#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Functions supporting radiation belt diffusion codes
Authors: Josef Koller
Institution: Los Alamos National Laboratory
Contact: jkoller@lanl.gov
Copyright 2010 Los Alamos National Security, LLC.
"""
import ctypes
import datetime
from spacepy import help
import numpy as np
import spacepy.time as st
import pdb
__contact__ = 'Josef Koller, jkoller@lanl.gov'
# -----------------------------------------------
# RBmodel class
# -----------------------------------------------
[docs]class RBmodel(object):
"""1-D Radial diffusion class
This module contains a class for performing and visualizing 1-D radial
diffusion simulations of the radiation belts.
Here is an example using the default settings of the model.
Each instance must be initialized with (assuming import radbelt as rb):
>>> rmod = rb.RBmodel()
Next, set the start time, end time, and the size of the timestep:
>>> import datetime
>>> start = datetime.datetime(2003,10,14)
>>> end = datetime.datetime(2003,12,26)
>>> delta = datetime.timedelta(hours=1)
>>> rmod.setup_ticks(start, end, delta, dtype='UTC')
Now, run the model over the entire time range using the evolve method:
>>> rmod.evolve()
Finally, visualize the results:
>>> rmod.plot_summary()
.. currentmodule:: spacepy.radbelt
.. autosummary::
~RBmodel.Gaussian_source
~RBmodel.add_Lmax
~RBmodel.add_Lpp
~RBmodel.add_PSD_obs
~RBmodel.add_PSD_twin
~RBmodel.add_omni
~RBmodel.add_source
~RBmodel.assimilate
~RBmodel.evolve
~RBmodel.get_DLL
~RBmodel.plot
~RBmodel.plot_obs
~RBmodel.set_lgrid
~RBmodel.setup_ticks
.. automethod:: Gaussian_source
.. automethod:: add_Lmax
.. automethod:: add_Lpp
.. automethod:: add_PSD_obs
.. automethod:: add_PSD_twin
.. automethod:: add_omni
.. automethod:: add_source
.. automethod:: assimilate
.. automethod:: evolve
.. automethod:: get_DLL
.. automethod:: plot
.. automethod:: plot_obs
.. automethod:: set_lgrid
.. automethod:: setup_ticks
"""
def __init__(self, grid='L', NL=91, const_kp=False):
"""
format for grid e.g., L-PA-E
"""
import spacepy.lib
self.const_kp=const_kp
# Initialize the code to use to advance the solutionp.
if spacepy.lib.have_libspacepy and spacepy.lib.solve_cn:
self.advance=spacepy.lib.solve_cn
else:
self.advance=get_modelop_L
grid = grid.upper()
for type in grid.split():
if type == 'L':
self.MU = 2083
self.K = 0.03
self.DLL_model = 'BA2000'
self.Lmax_model = 'JKemp'
self.Lpp_model = 'CA1992'
#self.SRC_model = 'JK1'
#self.SRCmagn = datetime.timedelta(days=1e-1) # relative acceleration per day
self.MPloss = datetime.timedelta(minutes=0.1) # minutes time scale
self.PPloss = datetime.timedelta(days=10.) # days time scale
self.set_lgrid(NL)
self.MODerr = 5. # relative factor * PSD
#self.PSDerr = 0.3 # relative observational error
self.PSDerr = 0.25 # relative observational error
self.MIN_PSD = 1e-99
# source term flag set to false
self.source = False
# -----------------------------------------------
def __str__(self):
return '<RB Model; mu=%f, k=%f, DLL_model=%s >' % \
(self.MU, self.K, self.DLL_model)
__repr__ = __str__
# -----------------------------------------------
def __getitem__(self, idx):
"""
"""
return self.PSD[:,idx]
# -----------------------------------------------
[docs] def set_lgrid(self, NL=91):
'''
Using NL grid points, create grid in L.
Default number of points is 91 (dL=0.1).
'''
from numpy import linspace, zeros
self.NL = NL
self.Lgrid = linspace(1,10,self.NL)
self.PSDinit = zeros(self.NL, dtype=ctypes.c_double)
# -----------------------------------------------
[docs] def setup_ticks(self, start, end, delta, dtype='ISO'):
"""
Add time information to the simulation by specifying
a start and end time, timestep, and time type (optional).
Examples
========
>>> start = datetime.datetime(2003,10,14)
>>> end = datetime.datetime(2003,12,26)
>>> delta = datetime.timedelta(hours=1)
>>> rmod.setup_ticks(start, end, delta, dtype='UTC')
"""
self.ticks = st.tickrange(start, end, delta, dtype)
# -----------------------------------------------
[docs] def add_omni(self, keylist=None):
"""
add omni data to instance according to the tickrange in ticks
"""
import spacepy.omni as om
assert 'ticks' in self.__dict__ , \
"Provide tick range with 'setup_ticks'"
omni = om.get_omni(self.ticks)
if 'params' not in self.__dict__:
self.params = {}
if keylist is None:
# add all keys
self.params = omni
else:
# just add requested keys
for key in keylist:
self.params[key] = omni[key]
# -----------------------------------------------
[docs] def add_Lmax(self, Lmax_model):
"""
add last closed drift shell Lmax
"""
import spacepy.empiricals as em
if 'params' not in self.__dict__:
self.params = {}
assert self.ticks, "Provide tick range with 'setup_ticks'"
self.params['Lmax'] = em.get_Lmax(self.ticks, Lmax_model)
# -----------------------------------------------
[docs] def add_Lpp(self, Lpp_model):
"""
add last closed drift shell Lmax
"""
import spacepy.empiricals as em
if 'params' not in self.__dict__:
self.params = {}
assert self.ticks, "Provide tick range with 'setup_ticks'"
self.params['Lpp'] = em.get_plasma_pause(self.ticks, Lpp_model)
# -----------------------------------------------
[docs] def add_PSD_obs(self, time=None, PSD=None, Lstar=None, satlist=None):
"""
add PSD observations
Parameters
----------
time : Ticktock datetime array
array of observation times
PSD : list of numpy arrays
PSD observational data for each time. Each entry in the list is a
numpy array with the observations for the corresponding time
Lstar : list of numpy arrays
Lstar location of each PSD observations. Each entry in the list is
a numpy array with the location of the observations for the
corresponding time
satlist : list of satellite names
Returns
-------
out : list of dicts
Information of the observational data, where each entry
contains the observations and locations of observations for each
time specified in the time array. Each list entry is a dictionary
with the following information:
Ticks : Ticktock array
time of observations
Lstar : numpy array
location of observations
PSD : numpy array
PSD observation values
sat : list of strings
satellite names
MU : scalar value
Mu value for the observations
K : scalar value
K value for the observations
"""
import pdb
assert 'ticks' in self.__dict__ , \
"Provide tick range with 'setup_ticks'"
Tgrid = self.ticks
nTAI = len(Tgrid)
# initialize PSDdata list
self.PSDdata = ['']*(nTAI-1)
if (PSD == None):
# PSD data not provided,
# extract from database
import spacepy.sandbox.PSDdata as PD
for i, Tnow, Tfut in zip(np.arange(nTAI-1), Tgrid[:-1], Tgrid[1:]):
start_end = spacepy.time.Ticktock([Tnow.UTC[0], Tfut.UTC[0]], 'UTC')
self.PSDdata[i] = PD.get_PSD(start_end, self.MU, self.K, satlist)
else:
# PSD data arrays provided
# model grid
Lgrid = self.Lgrid
itime = 0
# loop over time defined for the model integration
for i, Tnow, Tfut in zip(np.arange(nTAI-1), Tgrid[:-1], Tgrid[1:]):
# empty array
lstar = np.array([],dtype=float)
time_idx = np.array([],dtype=int)
# loop over observation time
for itime in np.arange(len(time)):
if (Tnow <= time[itime] and time[itime] <= Tfut):
#print 'match!! '
#print itime
#print i
#print time[itime]
#print Tnow
#print Tfut
# concatenate to lstar
lstar = np.concatenate((lstar,Lstar[itime]))
lstar = np.unique(lstar)
#idx = lstar.argsort()
#pdb.set_trace()
# add time index
time_idx = np.append(time_idx,itime)
#end loop
#pdb.set_trace()
if (time_idx.shape[0] > 0):
# initialize PSD array
psd = np.zeros_like(lstar)
# initialize number of obs array
num_obs = np.zeros_like(lstar)
# sort time index
time_idx = np.unique(time_idx)
# loop over time index
for itime in time_idx:
# sort observations
idx = Lstar[itime].argsort()
tmplstar = Lstar[itime][idx]
tmppsd = PSD[itime][idx]
# run through all unique grid-points and compute
# average observation
for j, iL in enumerate(lstar):
# identify idex of grid-point
idx = np.where(np.isclose(iL,tmplstar))
# assign observation for grid-point
psd[j] = psd[j] + tmppsd[idx]
# add for number of observations
num_obs[j] = num_obs[j] + 1.0
psd = psd/num_obs
# assign time for observations
#Ticks = time[itime]
Ticks = Tfut
# determine position of observations
#lstar = Lstar[itime]
# determine observations PSD
#psd = PSD[itime]
# provide MU
MU = self.MU*np.ones_like(lstar)
# provide K
K = self.K*np.ones_like(lstar)
# empy satellite
sat = ['']
# add to dictionary
self.PSDdata[i] = {'Ticks':Ticks, 'Lstar':lstar, \
'PSD':psd, 'sat':sat, \
'MU':MU, 'K':K}
# adjust initial conditions to these PSD values
mval = np.mean(self.PSDdata[0]['PSD'])
self.PSDinit = mval*np.exp(-(self.Lgrid - 5.5)**2/0.8)
return
# -----------------------------------------------
### def add_PSD(self, satlist=None):
###
### """
### add observations from PSD database using the ticks list
### """
###
### import spacepy.sandbox.PSDdata as PD
### import spacepy.time
###
### assert 'ticks' in self.__dict__ , \
### "Provide tick range with 'setup_ticks'"
### Tgrid = self.ticks
### nTAI = len(Tgrid)
###
### self.PSDdata = ['']*(nTAI-1)
### for i, Tnow, Tfut in zip(np.arange(nTAI-1), Tgrid[:-1], Tgrid[1:]):
### start_end = spacepy.time.Ticktock([Tnow.UTC[0], Tfut.UTC[0]], 'UTC')
### self.PSDdata[i] = PD.get_PSD(start_end, self.MU, self.K, satlist)
###
### # adjust initial conditions to these PSD values
### mval = np.mean(self.PSDdata[0]['PSD'])
### #self.PSDinit = mval*np.exp(-(self.Lgrid - 5.5)**2/0.2)
### self.PSDinit = mval*np.exp(-(self.Lgrid - 5.5)**2/0.8)
###
### return
### # -----------------------------------------------
[docs] def add_PSD_twin(self,dt=0,Lt=1):
"""
add observations from PSD database using the ticks list
the arguments are the following:
dt = observation time delta in seconds
Lt = observation space delta
"""
assert 'ticks' in self.__dict__ , \
"Provide tick range with 'setup_ticks'"
Tgrid = self.ticks
nTAI = len(Tgrid)
# compute time delta
delta = Tgrid[1].UTC[0]-Tgrid[0].UTC[0]
delta = delta.seconds
# compute observations time delta
#dt = 2*60*60
self.PSDdata = ['']*(nTAI-1)
for i, Tnow, Tfut in zip(np.arange(nTAI-1), Tgrid[:-1], Tgrid[1:]):
# get observations every dt
if (np.mod(i*delta,dt) == 0):
# assign time for observations
Ticks = Tnow
# determine position of observations
lstar = self.Lgrid[0:len(self.Lgrid):Lt]
# determine observations PSD
psd = self.PSD[0:len(self.Lgrid):Lt,i]
# provide MU
MU = self.MU*np.ones_like(lstar)
# provide K
K = self.K*np.ones_like(lstar)
# empy satellite
sat = np.zeros_like(lstar)
# add to dictionary
self.PSDdata[i] = {'Ticks':Ticks, 'Lstar':lstar, \
'PSD':psd, 'sat':sat, \
'MU':MU, 'K':K}
else:
self.PSDdata[i] = {}
# adjust initial conditions to these PSD values
mval = np.mean(self.PSDdata[0]['PSD'])
self.PSDinit = mval*np.exp(-(self.Lgrid - 5.5)**2/0.2)
return
# -----------------------------------------------
[docs] def add_source(self,source=True,A=1.0e-8,mu=5.0,sigma=0.5):
"""
add source parameters A, mu, and sigma for the Gaussian source function
"""
# set source term flag
self.source = source
# define height of Gaussian function
self.source_A = A
# define center of Gaussian function
self.source_mu = mu
# define amplitude
self.source_sigma = sigma
return
# -----------------------------------------------
[docs] def Gaussian_source(self):
"""
Gaussian source term added to radiation belt model. The source term is
given by the equation:
S = A exp{-(L-mu)^2/(2*sigma^2)}
with A=10^(-8), mu=5.0, and sigma=0.5 as default values
"""
Lgrid = self.Lgrid
# determine whether to include source or not
if self.source:
# define height of Gaussian function
A = self.source_A
# define center of Gaussian function
mu = self.source_mu
# define amplitude
sigma = self.source_sigma
else:
A = 0.0
mu = 1.0
sigma = 1.0
# Gaussian function
S = A*np.exp(-(Lgrid-mu)**2/(2.0*sigma**2))
return S
# -----------------------------------------------
[docs] def evolve(self):
"""
calculate the diffusion in L at constant mu,K coordinates
"""
from . import radbelt as rb
assert 'ticks' in self.__dict__ , \
"Provide tick range with 'setup_ticks'"
f = self.PSDinit
Tgrid = self.ticks.TAI
nTAI = len(Tgrid)
Lgrid = self.Lgrid
self.PSD = np.zeros( (len(f),len(Tgrid)), dtype=ctypes.c_double)
self.PSD[:,0] = f.copy()
# add omni if not already given
if 'omni' not in self.__dict__:
self.add_omni(keylist=['Kp', 'Dst'])
# run Lmax model
self.add_Lmax(self.Lmax_model)
# run plasma pause model
self.add_Lpp(self.Lpp_model)
# setup params dictionary
params = {}
params['DLL_model'] = self.DLL_model
#params['SRC_model'] = self.SRC_model
#params['SRCmagn'] = self.SRCmagn
params['MPloss'] = self.MPloss
params['PPloss'] = self.PPloss
if 'SRCartif' in self.__dict__:
params['SRCartif'] = self.SRCartif
keylist = ['Kp', 'Dst', 'Lmax', 'Lpp']
# start with the first one since 0 is the initial condition
for i, Tnow, Tfut in zip(np.arange(nTAI-1)+1, Tgrid[:-1], Tgrid[1:]):
Tdelta = Tfut - Tnow
Telapse= Tnow - Tgrid[0]
# copy over parameter list
for key in keylist:
params[key] = self.params[key][i]
# now integrate from Tnow to Tfut
f = diff_LL(self, Lgrid, f, Tdelta, Telapse, params=params)
# add Gaussian source term
f = f + self.Gaussian_source()
# copy to PSD
self.PSD[:,i] = f.copy()
# -----------------------------------------------
[docs] def assimilate(self, method='EnKF',inflation=0):
"""
Assimilates data for the radiation belt model using the Ensemble
Kalman Filter. The algorithm used is the SVD method presented by
Evensen in 2003 (Evensen, G., Ocean dynamics, 53, pp.343--367, 2003).
To compensate for model errors, three inflation algorithms are
implemented. The inflation methodology is specified by the
'inflation' argument, and the options are the following:
inflation == 0: Add model error (perturbation for the ensemble)
around model state values only where observations are available
(DEFAULT).
inflation == 1: Add model error (perturbation for the ensemble)
around observation values only where observations are available.
inflation == 2: Inflate around ensemble average for EnKF.
Prior to assimilation, a set of data values has to be speficied by
setting the start and end dates, and time step, using the setup_ticks
funcion of the radiation belt model:
>>> import spacepy
>>> import datetime
>>> from spacepy import radbelt
>>> start = datetime.datetime(2002,10,23)
>>> end = datetime.datetime(2002,11,4)
>>> delta = datetime.timedelta(hours=0.5)
>>> rmod.setup_ticks(start, end, delta, dtype='UTC')
Once the dates and time step are specified, the data is added using the
add_PSD function:
>>> rmod.add_PSD()
The observations are averaged over the time windows, whose interval is
give by the time step.
Once the dates and data are set, the assimiation is performed using the
'assimilate' function:
>>> rmod.assimilate(inflation=1)
This function will add the PSDa values, which are the analysis state of
the radiation belt using the observations within the dates. To plot the
analysis simply use the plot funtion:
>>> rmod.plot(values=rmod.PSDa,clims=[-10,-6],Lmax=False,Kp=False,Dst=False)
"""
import spacepy.data_assimilation
import spacepy.sandbox.PSDdata as PD
import copy as c
# add PSD observations with add_PSD,
# this has to be done to the class
# module when running the RBmodel.
# add_PSD will add the PSDdata to the class
# which is a dictionary for the data that has been added
# debugging command
#pdb.set_trace()
# setup method
assert method in ['EnKF','insert'], 'data assimilation method='+method+' not implemented'
nTAI = len(self.ticks)
# enKF method
if method == 'EnKF':
da = spacepy.data_assimilation.ensemble()
# initialize A with initial condition
# the initial condition is ones, have to change this to initialize
# the ensemble with perturbed reference state
A = np.ones( (self.NL, da.Nens) )*self.PSDinit[:,np.newaxis]
self.PSDf = np.zeros( (self.PSDinit.shape[0], nTAI) )
self.PSDa = np.zeros( (self.PSDinit.shape[0], nTAI) )
self.PSDa[:,0] = self.PSDinit
self.PSDf[:,0] = self.PSDinit
# diagnostic tools:
# observations-minus-background
self.PSD_omb = ['']*(nTAI)
# observations-minus-analysis
self.PSD_oma = ['']*(nTAI)
# analysis-minus-background
self.PSD_amb = np.zeros( (self.PSDinit.shape[0], nTAI) )
# add model error (perturbation) in the ensemble initial conditionp.
#A = da.add_model_error(self, A, self.PSDdata[0])
std = 0.35
normal = np.random.randn( da.Nens )
for iens in np.arange(da.Nens):
A[:,iens] = A[:,iens] + std*normal[iens]*A[:,iens]
A[np.where(A < self.MIN_PSD)] = self.MIN_PSD
# ==========================================
# DEBUG
#np.savetxt('ensemble_IC.dat',A)
#np.savetxt('model_IC.dat',self.PSDinit)
#np.savetxt('model_grid_IC.dat',self.Lgrid)
# ==========================================
# create temporary RB class instance
rbtemp = c.copy(self)
# time loop
for i, Tnow, Tfut in zip(np.arange(nTAI-1)+1, self.ticks[:-1], self.ticks[1:]):
# make forcast and add model error
# make forecast using all ensembles in A
iens = 0
for f in A.T:
rbtemp.ticks = st.Ticktock([Tnow.UTC[0], Tfut.UTC[0]], 'UTC')
rbtemp.PSDinit = f.copy()
rbtemp.evolve()
#rbtemp.PSDdata = self.PSDdata[i-1]
A[:,iens] = rbtemp.PSD[:,1].copy()
iens += 1
# save result in ff
Tnow = Tfut
self.PSDf[:,i] = np.mean(A, axis=1)
# verify that there are data points within the interval, if
# there are data points then extract average observations
# within the window, if not return empty observation array y
# and Lobs.
if len(self.PSDdata[i-1]) > 0:
# get observations for time window ]Tnow-Twindow,Tnow]
Lobs, y = spacepy.data_assimilation.average_window(self.PSDdata[i-1], self.Lgrid)
else:
y = np.array([])
Lobs = np.array([])
print(Lobs)
print(y)
# ==========================================
# DEBUG
#np.savetxt('obs_location_IC.dat',Lobs)
#np.savetxt('obs_IC.dat',y)
#pdb.set_trace()
# ==========================================
# then assimilate otherwise do another forcast
if len(y) > 0:
### check for minimum PSD values
### A[np.where(A<self.MIN_PSD)] = self.MIN_PSD
### # insert observations directly
### A = da.add_model_error_obs(self, A, Lobs, y)
### # dictionary
### self.PSDa[:,i] = np.mean(A, axis=1)
# INFLATION SCHEMES
if inflation == 0:
print('inflation around model state values at observation locations')
# Add model error (perturbation for the ensemble) around model
# state values. This acts as an inflation scheme for EnKF
A = da.add_model_error(self, A, self.PSDdata[i-1])
elif inflation == 1:
print('inflation around observation values at observation locations')
# Add model error (perturbation for the ensemble) around
# observation values. This acts as an inflation scheme for EnKF
A = da.add_model_error_obs(self, A, Lobs, y)
elif inflation == 2:
print('inflation around ensemble average')
# Inflate around ensemble average for EnKF
# ensemble average
ens_avg = np.mean(A, axis=1)
# inflation factor
inflation_factor = 1.8
# loop over ensemble members and inflate
iens = 0
for ens in A.T:
# inflate ensemble
A[:,iens] = inflation_factor*(ens - ens_avg) + ens_avg
iens += 1
A[np.where(A < self.MIN_PSD)] = self.MIN_PSD
# prepare assimilation analysis
# project ensemble states to obs. grid
HA = da.getHA(self, Lobs, A)
# measurement perturbations ensemble
Psi = da.getperturb(self, y)
# ensemble of innovation vectors
Inn = da.getInnovation(y, Psi, HA)
# calculate ensemble perturbation HA' = HA-HA_mean
HAp = da.getHAprime(HA)
# calculate prior diagnostics
# observation minus background
omb = y-np.average(HA,axis=1)
self.PSD_omb[i] = {
'Lobs':Lobs,
'y':y,
'omb':omb,
}
xf_avg = np.average(A,axis=1)
# now call the main analysis routine
if len(y) == 1:
A = da.EnKF_oneobs(A, Psi, Inn, HAp)
else:
A = da.EnKF(A, Psi, Inn, HAp)
# check for minimum PSD values
A[np.where(A<self.MIN_PSD)] = self.MIN_PSD
# average A from analysis step and save in results
# dictionary
self.PSDa[:,i] = np.mean(A, axis=1)
# calculate posterior diagnostics
# observation minus analysis
Hanalysis = da.getHA(self, Lobs, A)
oma = y-np.average(Hanalysis,axis=1)
self.PSD_oma[i] = {
'Lobs':Lobs,
'y':y,
'omb':oma,
}
# analysis minus background
xa_avg = np.average(A,axis=1)
self.PSD_amb[:,i] = xa_avg - xf_avg
#pdb.set_trace()
# print assimilated result
Hx = np.zeros_like(y)
for iL,Lstar in enumerate(Lobs):
idx = np.where(np.isclose(self.Lgrid,Lstar))
Hx[iL] = self.PSDa[idx,i]
print(Hx)
print(HA)
elif len(y) == 0:
print('no observations within this window')
self.PSDa[:,i] = self.PSDf[:,i]
continue #
# print message
print('Tnow: ', self.ticks[i].ISO)
# insert obsrvations for data assimilation
elif method == 'insert':
da = spacepy.data_assimilation.ensemble()
A = np.ones( (self.NL, 1) )*self.PSDinit[:,np.newaxis]
self.PSDf = np.zeros( (self.PSDinit.shape[0], nTAI) )
self.PSDa = np.zeros( (self.PSDinit.shape[0], nTAI) )
self.PSDa[:,0] = self.PSDinit
self.PSDf[:,0] = self.PSDinit
# add model error (perturbation) in the ensemble initial condition.
std = 0.15
normal = np.random.randn( self.NL )
A[:,0] = (1.0 + std*normal)*A[:,0]
A[np.where(A < self.MIN_PSD)] = self.MIN_PSD
rbtemp = c.copy(self)
# time loop
for i, Tnow, Tfut in zip(np.arange(nTAI-1)+1, self.ticks[:-1], self.ticks[1:]):
# evolve solution
rbtemp.ticks = st.Ticktock([Tnow.UTC[0], Tfut.UTC[0]], 'UTC')
rbtemp.PSDinit = A[:,0]
rbtemp.evolve()
A[:,0] = rbtemp.PSD[:,1]
# save result in ff
Tnow = Tfut
self.PSDf[:,i] = A[:,0]
# verify that there are data points within the interval, if
# there are data points then extract average observations
# within the window, if not return empty observation array y
# and Lobs.
if len(self.PSDdata[i-1]) > 0:
Lobs, y = spacepy.data_assimilation.average_window(self.PSDdata[i-1], self.Lgrid)
else:
y = np.array([])
Lobs = np.array([])
print(Lobs)
print(y)
#pdb.set_trace()
# then assimilate otherwise do another forcast
if len(y) > 0:
# check for minimum PSD values
A[np.where(A<self.MIN_PSD)] = self.MIN_PSD
# create index of obs location
idx = np.array([],dtype=int)
for Lval in Lobs:
Lidx = np.where( fp_equality.eq(Lval,self.Lgrid) )[0]
idx = np.append(idx,np.array([int(Lidx)]))
# insert observations directly
A[idx,0] = y
#A = da.add_model_error_obs(self, A, Lobs, y)
# dictionary
self.PSDa[:,i] = A[:,0]
elif len(y) == 0:
print('no observations within this window')
self.PSDa[:,i] = self.PSDf[:,i]
continue #
# print message
print('Tnow: ', self.ticks[i].ISO)
# -----------------------------------------------
[docs] def plot(self, Lmax=True, Lpp=False, Kp=True, Dst=True,
clims=[0,10], title=None, values=None):
"""
Create a summary plot of the RadBelt object distribution function.
For reference, the last closed drift shell, Dst, and Kp are all
included. These can be disabled individually using the corresponding
Boolean kwargs.
The clims kwarg can be used to manually set the color bar range.
To use, set it equal to a two-element list containing minimum and
maximum Log_10 value to plot. Default action is to use [0,10] as
the log_10 of the color range. This is good enough for most
applications.
The title of the top most plot defaults to 'Summary Plot' but can be
customized using the title kwarg.
The figure object and all three axis objects (PSD axis, Dst axis,
and Kp axis) are all returned to allow the user to further customize
the plots as necessary. If any of the plots are excluded, None is
returned in their stead.
Examples
========
>>> rb.plot(Lmax=False, Kp=False, clims=[2,10], title='Good work!')
This command would create the summary plot with a color bar range
of 100 to 10^10. The Lmax line and Kp values would be excluded.
The title of the topmost plot (phase space density) would be set to
'Good work!'.
"""
import matplotlib.pyplot as p
from matplotlib.colors import LogNorm
from matplotlib.ticker import (LogLocator, LogFormatter,
LogFormatterMathtext)
import pdb
# debugging command
#pdb.set_trace()
# test for default values
if values is None:
values = self.PSD
# Initialize axis variables so that they can be returned,
# even if not used.
ax1 = None
ax2 = None
ax3 = None
fig = p.figure()
fig.subplots_adjust(left=0.10, right=0.999, top=0.92)
# PLOT PSD
if Kp or Dst:
ax1 = p.subplot(2,1,1)
else:
ax1 = p.subplot(1,1,1)
# Plot phase space density, masking out values of 0.
map = ax1.pcolorfast(self.ticks.eDOY, self.Lgrid,
np.where(values > 0.0, self.PSD, 10.0**-39),
vmin=10.0**clims[0], vmax=10.0**clims[1],
norm=LogNorm())
ax1.set_ylabel('L*')
if title is not None:
ax1.set_title(title)
# Add color bar.
cbar = p.colorbar(map, pad=0.01, shrink=.85, ticks=LogLocator(),
format=LogFormatterMathtext())
cbar.set_label('Phase Space Density')
# add Lmax line
if Lmax:
p.plot(self.ticks.eDOY, self.params['Lmax'], 'w')
# Minimize time range.
ax1.set_xlim([self.ticks.eDOY[0], self.ticks.eDOY[-1]])
# Finally, save the position of the plot to make next plot match.
pos = ax1.get_position()
if Dst is True:
ax2 = p.subplot(2,1,2)
pos2 = ax2.get_position()
pos2.x1 = pos.x1
ax2.set_position(pos2)
ax2.plot(self.ticks.eDOY, self.params['Dst'], color='r')
ax2.set_xlabel('DOY in '+str(self.ticks.UTC[0].year))
ax2.set_ylabel('Dst', color='r')
for tl in ax2.get_yticklabels():
tl.set_color('r')
ax2.set_xlim([self.ticks.eDOY[0], self.ticks.eDOY[-1]])
if Kp is True:
if Dst is True:
p.subplot(2,1,2)
ax3 = p.twinx()
else:
ax3 = p.subplot(2,1,2)
pos3 = ax3.get_position()
pos3.x1 = pos.x1
ax3.set_position(pos3)
ax3.plot(self.ticks.eDOY, self.params['Kp'], 'k:')
if Dst is True:
ax3.yaxis.tick_right()
ax3.set_xlabel('DOY in '+str(self.ticks.UTC[0].year))
ax3.set_ylabel('Kp')
ax3.set_xlim([self.ticks.eDOY[0], self.ticks.eDOY[-1]])
#p.show()
return fig, ax1, ax2, ax3
# -----------------------------------------------
[docs] def plot_obs(self, Lmax=True, Lpp=False, Kp=True, Dst=True,
clims=[0,10], title=None, values=None):
"""
Create a summary plot of the observations. For reference, the last
closed drift shell, Dst, and Kp are all included. These can be
disabled individually using the corresponding boolean kwargs.
The clims kwarg can be used to manually set the color bar range.
To use, set it equal to a two-element list containing minimum and
maximum Log_10 value to plot. Default action is to use [0,10] as
the log_10 of the color range. This is good enough for most
applications.
The title of the top most plot defaults to 'Summary Plot' but can be
customized using the title kwarg.
The figure object and all three axis objects (PSD axis, Dst axis,
and Kp axis) are all returned to allow the user to further customize
the plots as necessary. If any of the plots are excluded, None is
returned in their stead.
Examples
========
>>> rb.plot_obs(Lmax=False, Kp=False, clims=[2,10], title='Observations Plot')
This command would create the summary plot with a color bar range
of 100 to 10^10. The Lmax line and Kp values would be excluded.
The title of the topmost plot (phase space density) would be set to
'Good work!'.
"""
import spacepy.data_assimilation
import matplotlib.pyplot as p
from matplotlib.colors import LogNorm
from matplotlib.ticker import (LogLocator, LogFormatter,
LogFormatterMathtext)
import pdb
# debugging command
#pdb.set_trace()
# test for default values
if values is None:
values = self.PSDdata
# Initialize axis variables so that they can be returned,
# even if not used.
ax1 = None
ax2 = None
ax3 = None
fig = p.figure()
fig.subplots_adjust(left=0.10, right=0.999, top=0.92)
# compute time-window average observation value
y = np.array([],dtype=float)
Lobs = np.array([],dtype=float)
eDOYobs = np.array([],dtype=float)
nTAI = len(self.ticks)
# time loop
for i, Tnow, Tfut in zip(np.arange(nTAI-1)+1, self.ticks[:-1], self.ticks[1:]):
if len(values[i-1]) > 0:
# get observations for time window ]Tnow-Twindow,Tnow]
Lobs_tmp, y_tmp = spacepy.data_assimilation.average_window(values[i-1], self.Lgrid)
y = np.append(y,y_tmp)
Lobs = np.append(Lobs,Lobs_tmp)
eDOYobs = np.append(eDOYobs,np.ones(len(y_tmp))*self.ticks.eDOY[i-1])
# PLOT PSDdata
if Kp or Dst:
ax1 = p.subplot(2,1,1)
else:
ax1 = p.subplot(1,1,1)
# Plot phase space density observations.
map = ax1.scatter(eDOYobs,Lobs,c=y,norm=LogNorm(),
vmin=10.0**clims[0], vmax=10.0**clims[1],
edgecolor='none')
ax1.set_ylabel('L*')
if title is not None:
ax1.set_title(title)
ax1.set_ylim(self.Lgrid[0],self.Lgrid[len(self.Lgrid)-1])
# Add color bar.
cbar = p.colorbar(map, pad=0.01, shrink=.85, ticks=LogLocator(),
format=LogFormatterMathtext())
cbar.set_label('Phase Space Density')
# add Lmax line
if Lmax:
p.plot(self.ticks.eDOY, self.params['Lmax'], 'w')
# Minimize time range.
ax1.set_xlim([self.ticks.eDOY[0], self.ticks.eDOY[-1]])
# Finally, save the position of the plot to make next plot match.
pos = ax1.get_position()
if Dst is True:
ax2 = p.subplot(2,1,2)
pos2 = ax2.get_position()
pos2.x1 = pos.x1
ax2.set_position(pos2)
ax2.plot(self.ticks.eDOY, self.params['Dst'], color='r')
ax2.set_xlabel('DOY in '+str(self.ticks.UTC[0].year))
ax2.set_ylabel('Dst', color='r')
for tl in ax2.get_yticklabels():
tl.set_color('r')
ax2.set_xlim([self.ticks.eDOY[0], self.ticks.eDOY[-1]])
if Kp is True:
if Dst is True:
p.subplot(2,1,2)
ax3 = p.twinx()
else:
ax3 = p.subplot(2,1,2)
pos3 = ax3.get_position()
pos3.x1 = pos.x1
ax3.set_position(pos3)
ax3.plot(self.ticks.eDOY, self.params['Kp'], 'k:')
if Dst is True:
ax3.yaxis.tick_right()
ax3.set_xlabel('DOY in '+str(self.ticks.UTC[0].year))
ax3.set_ylabel('Kp')
ax3.set_xlim([self.ticks.eDOY[0], self.ticks.eDOY[-1]])
#p.show()
return fig, ax1, ax2, ax3
[docs] def get_DLL(self, Lgrid, params, DLL_model='BA2000'):
"""
Calculate DLL as a simple power law function (alpha*L**Bbta)
using alpha/beta values from popular models found in the
literature and chosen with the kwarg "DLL_model".
The calculated DLL is returned, as is the alpha and beta
values used in the calculationp.
The output DLL is in units of units/day.
"""
if DLL_model == 'BA2000': # Brautigam and Albert (2000)
if type(self.const_kp) == type(0.0):
Kp=self.const_kp
else:
Kp = params['Kp']
alpha = 10.0**(0.506*Kp-9.325)
beta = 10.0
elif DLL_model == 'FC2006': # Fei and Chan (2006)
alpha = 1.5e-6
beta = 8.5
elif DLL_model == 'U2008': # Ukhorskiy (2008)
alpha = 7.7e-6
beta = 6.0
elif DLL_model == 'S1997': # Selesnick (1997)
alpha = 1.9e-10
beta = 11.7
elif DLL_model == 'const': # Constant DLL.
alpha= 1.0
beta = 1.0
DLL = np.zeros(len(Lgrid), dtype=ctypes.c_double)+10.
# approximately BA2000 for Kp=1, L=1.
else:
raise ValueError("Radial diffusion model %s not implemented" % DLL_model)
if (DLL_model!='const'): DLL = alpha * Lgrid ** beta
return DLL, alpha, beta
# -----------------------------------------------
[docs]def get_modelop_L(f, L, Dm_old, Dm_new, Dp_old, Dp_new, Tdelta, NL):
"""
Advance the distribution function, f, discretized into the Lgrid, L, forward
in time by a timestep, Tdelta. The off-grid current and next diffusion
coefficients, D[m,p]_[old,new] will be used. The number of grid points is set
by NL.
This function performs the same calculation as the C-based code,
spacepy.lib.solve_cnp. This code is very slow and should only be used when
the C code fails to compile.
"""
import numpy.linalg as nlin
# get grid and setup centered grid Lm=L_i-1/2, Lp=L_i+1/2
Lmin = L[0]
Lmax = L[-1]
dL = L[1] - L[0]
Lp = L + 0.5*dL
Lm = L - 0.5*dL
# setup the diffusion coefficient on each grid and center grid
#Dllm = np.zeros(NL); Dllp = np.zeros(NL)
betam = np.zeros(NL); betap = np.zeros(NL)
for i in range(1,int(NL)-1,1):
Dllp[i] = 0.5*(DLL[i]+DLL[i+1])
Dllm[i] = 0.5*(DLL[i]+DLL[i-1])
betam[i] = Dllm[i]*dt / (2*dL*dL*Lm[i]*Lm[i])
betap[i] = Dllp[i]*dt / (2*dL*dL*Lp[i]*Lp[i])
# initialize some arrays
A = np.zeros( (NL,NL) )
B = np.zeros( (NL,NL) )
C = np.zeros( (NL,NL) )
# off diagonal elements
for i in np.arange(1,int(NL)-1,1):
# diagonal elements
A[i,i] = 1 + betam[i]*L[i]*L[i] + betap[i]*L[i]*L[i]
B[i,i] = 1 - betam[i]*L[i]*L[i] - betap[i]*L[i]*L[i]
# off diagonal elements
A[i,i+1] = -betap[i]*L[i]*L[i]
A[i,i-1] = -betam[i]*L[i]*L[i]
B[i,i+1] = betap[i]*L[i]*L[i]
B[i,i-1] = betam[i]*L[i]*L[i]
# boundary condition
A[0,0] = 1.
A[-1,-1] = 1.
B[0,0] = 1.
B[-1,-1] = 1.
# get inverse and multipy
Ai = nlinp.inv(A)
C = np.dot(Ai,B)
return np.dot(C, f)
# -----------------------------------------------
[docs]def diff_LL(r, grid, f, Tdelta, Telapsed, params=None):
"""
calculate the diffusion in L at constant mu,K coordinates
time units
"""
import ctypes as ct
# prepare some parameter variables
if params:
DLL_model = params['DLL_model']
Kp = params['Kp']
Dst = params['Dst']
Lmax = params['Lmax']
Lpp = params['Lpp']
#else: # use standard config
# but which one?
Lgrid = grid
NL = len(Lgrid)
dL = Lgrid[1]-Lgrid[0]
# get DLL and model operator
(DLL, alpha, beta) = r.get_DLL(Lgrid, params, DLL_model)
(DLLp, alpha, beta) = r.get_DLL(Lgrid + dL/2.0, params, DLL_model)
(DLLm, alpha, beta) = r.get_DLL(Lgrid - dL/2.0, params, DLL_model)
DLL = DLL/86400.; DLLp = DLLp/86400.; DLLm = DLLm/86400.
# Set default of NO sources:
src = np.zeros(NL)
# Create source operators (source splitting) using implicit
# trapezoidal method to solve source ODE.
# Add artificial sources
if 'SRCartif' in params:
# Call the artificial source function, sending info as
# key word arguments. Note that such functions should be
# able to handle extra kwargs through the use of **kwargs!
sfunc = params['SRCartif']
# Apply using correct CN-method.
src=0.5*Tdelta*(
sfunc(Telapsed, Lgrid, alpha, DLL, beta) +
sfunc(Telapsed+Tdelta, Lgrid, alpha, DLL, beta))
src[0], src[-1] = 0.,0.
else:
src1 = np.zeros(NL)
src2 = np.zeros(NL)
# Apply solution operators to f.
dptr = ctypes.POINTER(ctypes.c_double)
r.advance(f.ctypes.data_as(dptr),
Lgrid.ctypes.data_as(dptr),
DLLm.ctypes.data_as(dptr),
DLLm.ctypes.data_as(dptr),
DLLp.ctypes.data_as(dptr),
DLLp.ctypes.data_as(dptr),
ct.c_double(Tdelta), NL,
src.ctypes.data_as(dptr))
# add source according to values in SRC...
# if params['SRC_model']:
# # setup source vector
# S = get_local_accel(Lgrid, params, SRC_model='JK1')
# f = f + S*Tdelta
# add losses through magnetopause shadowing, time scale taken from MPloss
mp_sec = params['MPloss'].days * 86400.0 + params['MPloss'].seconds + \
params['MPloss'].microseconds / 1000000.0
if mp_sec > 0.0:
# setup loss vector
LSS = np.zeros(NL)
LSS[np.where(Lgrid>params['Lmax'])] = -1./mp_sec
f = f*np.exp(LSS*Tdelta)
# add losses inside plasma pause, time scale taken from PPloss
pp_sec = params['PPloss'].days * 86400.0 + params['PPloss'].seconds + \
params['PPloss'].microseconds / 1000000.0
if pp_sec > 0.0:
# calculate plasma pause location after Carpenter & Anderson 1992
LSS = np.zeros(NL)
LSS[np.where(Lgrid<params['Lpp'])] = -1./pp_sec
f = f*np.exp(LSS*Tdelta)
return f
# -----------------------------------------------
[docs]def get_local_accel(Lgrid, params, SRC_model='JK1'):
"""
calculate the diffusion coefficient D_LL
"""
if SRC_model == 'JK1':
magn = params['SRCmagn'].seconds
Lcenter = 5.6
Lwidth = 0.3
Kp = params['Kp']
S = magn*np.exp(-(Lgrid-Lcenter)**2/(2*Lwidth**2))*Kp*Kp
return S