# Source code for spacepy.data_assimilation

```#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from numpy import shape, dot, eye, set_printoptions, diag, sum
import numpy as np
from numpy.linalg import svd
import pdb

__contact__ = 'Josef Koller, jkoller@lanl.gov'

"""
Data assimilation functions used for several projects

Authors: Josef Koller, Humberto Godinez
nstitution: Los Alamos National Laboratory
Contact: jkoller@lanl.gov

Copyright 2010 Los Alamos National Security, LLC.
"""

# -----------------------------------------------
# Data Assimilation class: enKF
# -----------------------------------------------
[docs]class ensemble(object):
"""
Ensemble-based data assimilation subroutines for the Radiation Belt Model

.. currentmodule:: spacepy.data_assimilation
.. autosummary::
~ensemble.EnKF
~ensemble.EnKF_oneobs
~ensemble.getHA
~ensemble.getHAprime
~ensemble.getHPH
~ensemble.getInnovation
~ensemble.getperturb
.. automethod:: EnKF
.. automethod:: EnKF_oneobs
.. automethod:: getHA
.. automethod:: getHAprime
.. automethod:: getHPH
.. automethod:: getInnovation
.. automethod:: getperturb
"""

def __init__(self, ensembles = 50):
self.Nens = ensembles # number of ensemble members

return None

# -----------------------------------------------
def __str__(self):
return '<enKF instance, Nens='+self.Nens+'>'

__repr__ = __str__

# -----------------------------------------------
[docs]    def EnKF(self, A, Psi, Inn, HAp):
"""
analysis subroutine after code example in
Evensen 2003 this will take the prepared
matrices and calculate the analysis most
efficiently, A will be returned

Parameters
==========
A :

Psi :

Inn :

HAp :

Returns
=======
out :

Examples
========
"""
# A   = ensemble of model states
# Psi = E = observation perturbations
# Inn = D = innovations
# HAp = S = HA'

# debugging command
#pdb.set_trace()

nobs, nens = HAp.shape
nrmin = min(nobs,nens)
NL = A.shape[0]

# compute HAp + Psi
ES = HAp + Psi

# Compute SVD of HAp + Psi  ->  U and sig,
# using Eispack
[U,sig,V] = np.linalg.svd(ES)
U = U[:,0:nrmin]

# convert eigenvalues
sig = sig**2

# Compute number of significant eigenvalues
sigsum = np.sum(sig)
sigsum1 = 0.0
nrsigma = 0
for i in range(nrmin):
if (sigsum1/sigsum < 0.999):
nrsigma = nrsigma+1
sigsum1=sigsum1+sig[i]
sig[i]=1.0/sig[i]
else:
sig[i:nrmin]=0.0
break
#pdb.set_trace()

# Compute X1
X1 = (sig*U).T

# Compute X2=X1*D
X2 = np.dot(X1,Inn)

# Compute X3=U*X2
X3 = np.dot(U,X2)

# Compute representers Reps=A'*S^T
Reps =np.dot(A,HAp.T)

# Compute A=A+Reps*X3
C = A + np.dot(Reps,X3)

return C

# -----------------------------------------------
[docs]    def EnKF_oneobs(self, A, Psi, Inn, HAp):
"""
analysis subroutine for a single observations
with the EnKF. This is a special case.

Parameters
==========
A :

Psi :

Inn :

HAp :

Returns
=======
out :

Examples
========
"""

# A   = ensemble of model states
# Psi = E = observation perturbations
# Inn = D = innovations
# HAp = S = HA'

# debugging command
#pdb.set_trace()

nobs, nens = HAp.shape
nrmin = min(nobs,nens)
NL = A.shape[0]

# compute HPH^t
HPHt = np.sum(HAp**2)

# compute R
R = np.sum(Psi**2)

# get A'
N1 = np.ones( (nens,nens) )/nens
I = np.eye(nens)
Ap = np.dot(A,I-N1)

# compute PHt
PHt = np.dot(Ap,HAp.T)

# compute K
K = 1/(HPHt + R)*PHt

# Compute A=A+K*Inn
C = A + np.outer(K,Inn)

return C

# -----------------------------------------------
[docs]    def getHAprime(self, HA):
"""
calculate ensemble perturbation of HA
HA' = HA-HA_mean

Parameters
==========
HA :

Returns
=======
out :

Examples
========
"""
m, nens = np.shape(HA)
N1 = np.ones( (nens,nens) )/nens
I = np.eye(nens)
S = np.dot(HA,I-N1)
return S

# -----------------------------------------------
[docs]    def getInnovation(self, y, Psi, HA):
"""
compute innovation ensemble D'

Parameters
==========
y :

Psi :

HA :

Returns
=======
out :

Examples
========
"""
m = np.size(y)
nens = self.Nens
# broadcoast y over ensembles to get Y
Yens = np.ones( (m,nens) ) * y[:,np.newaxis]
D = Yens + Psi - HA
return D

# -----------------------------------------------
[docs]    def getperturb(self, model, y):
"""
compute perturbations of observational vector

Parameters
==========
model :

y :

Returns
=======
out :

Examples
========
"""
m = np.size(y)
nens = self.Nens
relstd = model.PSDerr
Psi = np.random.standard_normal((m,nens)) * y[:,np.newaxis] * relstd
#Psi = np.random.standard_normal((m,nens)) * np.mean(y)*relstd
#Psi = np.zeros((m,nens))
return Psi

# -----------------------------------------------
[docs]    def getHA(self, model, Lobs, A):
"""
compute HA provided L vector of observations
and ensemble matrix A

Parameters
==========
model :

Lobs :

A :

Returns
=======
out :

Examples
========
"""
# number of observations
m = np.size(Lobs)
# number of gridpoint and ensembles
NL, nens = np.shape(A)
# extract the grid
Lgrid = model.Lgrid

# dimension of HA will be (m x NL)(NL x nens) = m x nens
HA = np.zeros( (m,nens) )
i = 0
for Lpos in Lobs:
# closest grid point
index = np.argmin(np.abs(Lpos - Lgrid))
if np.size(index) == 0:
print('\nERROR in getHA\n')
sys.exit(1)
HA[i,:] = A[index,:]
i += 1
return HA

# -----------------------------------------------
[docs]    def getHPH(self,Lobs,Pfxx):
"""
compute HPH

Parameters
==========
Lobs

Pfxx

Returns
=======
out

Examples
========
"""
# dimension(L) = nobs(t_i)
# dimension(Pfxx) = (NL,NL)

Lgrid = dd['model']['Lgrid']
NL = dd['model']['ngrid']

HP = np.zeros((len(Lobs),NL))
PH = np.zeros((NL,len(Lobs)))
HPH = np.zeros((NL,NL))

idx = np.zeros(len(Lobs), dtype=int)

for Lval,iL in zip(Lobs, range(len(Lobs))):
try:
idx[iL] = np.argmin(np.abs(Lval - Lgrid))[0]
except:
print('\nERROR in getHPH\n')
sys.exit(1)
HP[iL,:] = Pfxx[:,idx[iL]]

HPH = HP[:,idx]
PH = Pfxx[:,idx]
return PH, HPH

# -----------------------------------------------
[docs]    def add_model_error(self, model, A, PSDdata):
"""
this routine will add a standard error to the ensemble states

Parameters
==========
model :

A :

PSDdata :

Returns
=======
out :

Examples
========
"""
Lgrid = model.Lgrid
dL = Lgrid[1]-Lgrid[0]
nens = int(self.Nens)
#print y, L
# in units of L (should not be smaller than dL)

# create L vector where random noise will be added
#       erridx = np.array([False]*len(Lgrid))
#       for Lval in PSDdata['Lstar']:
#           Lidx = np.where( (Lgrid > L1) & (L2 > Lgrid) )[0]
#           erridx[Lidx] = True

# calculate the rel. average uncertainty based on (obs -
# modelfcst)/obs
#       fcst = np.mean(A, axis=1)
#       rst = np.zeros( len(PSDdata['Lstar']) )
#       for Lval, yval, i in zip(PSDdata['Lstar'], PSDdata['PSD'], range(len(PSDdata['Lstar'])) ):
#           idx = np.argmin(np.abs(Lval - Lgrid))
#           rst[i] = np.abs( yval - fcst[idx] )

#       stdev = np.mean(rst)
#       for iens in range(nens):
#           f = A[erridx, iens]
#           A[erridx, iens] = np.random.normal( f, scale = stdev )

##TODO: Check why above code was commented... this block was unindented for consistency, but should it be here?
# create L vector where random noise will be added
erridx = np.array([],dtype=int)
for Lval in PSDdata['Lstar']:
Lidx = np.where( np.isclose(Lval,Lgrid) )[0]
erridx = np.append(erridx,np.array([int(Lidx)]))

# calculate the rel. average uncertainty based on (obs -
# modelfcst)/obs
fcst = np.mean(A, axis=1)
rst = np.zeros( (len(PSDdata['Lstar']),nens) )
for iens, member in enumerate(A.T):
rst[:,iens] = np.abs( PSDdata['PSD'] - member[erridx] )

stdev = 2.0*np.std(rst,axis=1)

# debug output files
#np.savetxt('Lobs_init.dat',PSDdata['Lstar'])
#np.savetxt('y_init.dat',PSDdata['PSD'])
#np.savetxt('Lgrid_init.dat',Lgrid)
#np.savetxt('A_init.dat',A)
#np.savetxt('erridx_init.dat',erridx)
#np.savetxt('rst_init.dat',rst)
#np.savetxt('stdev_init.dat',stdev)

for i, idx in enumerate(erridx):
if (stdev[i] > 0.0):
A[idx, :] = np.random.normal( fcst[i], stdev[i], nens )
return A

# -----------------------------------------------
[docs]    def add_model_error_obs(self, model, A, Lobs, y):
"""
this routine will add a standard error to the ensemble states

Parameters
==========
model :

A :

Lobs :

y :

Returns
=======
out :

Examples
========
"""
# debugging command
#pdb.set_trace()

Lgrid = model.Lgrid
dL = Lgrid[1]-Lgrid[0]
nens = A.shape[1]
#nens = int(self.Nens)
#print y, L

# in units of L (should not be smaller than dL)

# create L vector where random noise will be added
#erridx = np.array([False]*len(Lgrid))
erridx = np.array([],dtype=int)
for Lval in Lobs:
Lidx = np.where( np.isclose(Lval,Lgrid) )[0]
erridx = np.append(erridx,np.array([int(Lidx)]))

# compute residual
# calculate the rel. average uncertainty based on (obs -
# ensemble)/obs
rst = np.zeros( (len(Lobs),nens) )
for iens, member in enumerate(A.T):
rst[:,iens] = np.abs( y - A[erridx,iens] )

# perturb with normal distribution
# around observation values
stdev = np.std(rst,axis=1)

# debug output files
#np.savetxt('Lobs.dat',Lobs)
#np.savetxt('y.dat',y)
#np.savetxt('Lgrid.dat',Lgrid)
#np.savetxt('A.dat',A)
#np.savetxt('erridx.dat',erridx)
#np.savetxt('rst.dat',rst)
#np.savetxt('stdev.dat',stdev)

pert_obs = (1.0 + 0.30*np.random.normal(size=y.shape[0]))*y

for i, idx in enumerate(erridx):
if (stdev[i] > 0.0):
A[idx,:] = np.random.normal( y[i], stdev[i], nens )
#A[idx,:] = np.random.normal( pert_obs[i], stdev[i], nens )
print('done')
return A

# -----------------------------------------------
[docs]def average_window(PSDdata, Lgrid):
"""
combine observations on same L shell in

Parameters
==========
model :

PSDdata :

HAp :

Returns
=======
out :

Examples
========
"""
# sort observations first in L
idx = PSDdata['Lstar'].argsort()
Lobs = PSDdata['Lstar'][idx]
y = PSDdata['PSD'][idx]

# map Lobs onto closest grid point
for i, obs in enumerate(y):
Lobs[i] = Lgrid[ np.argmin(np.abs(Lobs[i]-Lgrid)) ]

# identify unique grid-points of Lobs
tmpLobs = np.unique(Lobs)

# declare average observation array
tmpy = np.zeros_like(tmpLobs)

# run through all unique grid-points and compute average observation
for i, iL in enumerate(tmpLobs):
# identify idex of each unique grid-point
idx = np.where(np.isclose(iL,Lobs))
# compute average observation for each unique grid-point
tmpy[i] = np.average(y[idx])

# walk through all grid points and find how many obs available
#for i, iL in enumerate(Lgrid):
#   idx = np.where( iL == Lobs[:])[0]

# assign Lobs and y
Lobs = tmpLobs
y = tmpy
return Lobs, y

# -----------------------------------------------
# -----------------------------------------------
# -----------------------------------------------
[docs]def getobs4window(dd, Tnow):
"""
get observations in time window [Tnow - Twindow, Tnow]
from all satellites lumped together into one y vector

Parameters
==========
model :

PSDdata :

HAp :

Returns
=======
out :

Examples
========
"""
Tstart = Tnow - dd['model']['Twindow']
Tend = Tnow

# read all PSD observation and append into y,L
y = []; L = []
for satkey in dd['data']['satids']: # for every s/c
PSD = dd['data'][satkey]['PSD']
TAI = dd['data'][satkey]['TAI']
Lvals = dd['data'][satkey]['L']
indices = np.where( (Tstart < TAI) & (TAI <= Tend) )
y = np.append(y, PSD[indices])
L = np.append(L, Lvals[indices])

# sort them and average if they fall into same L range
ind = L.argsort() # return indices to a sorted y
y = y[ind]
L = L[ind]

#check if observation available
if len(y) == 0: return dd, np.array([]), np.array([])

i = 0
Lmean = []; ymean = []
Lgrid = dd['model']['Lgrid']
for iL in Lgrid[1:-1]:
i += 1
L_lower = np.mean([ Lgrid[i], Lgrid[i-1] ])
L_upper = np.mean([ Lgrid[i+1], Lgrid[i] ])
indices = np.where( (L_lower < L) & (L <= L_upper) )
if np.size(indices) == 0: continue

Lmean = np.append(Lmean, iL)
ymean = np.append(ymean, np.mean(y[indices]))

# add Lmean, ymean to dd
try: # first time executing this
dd['data']['dwindow'].append( (Tnow, Lmean, ymean) )
except:
dd['data']['dwindow'] = []
dd['data']['dwindow'].append( (Tnow, Lmean, ymean) )

# these line are only for averaging over L* as well!

Lmean = np.mean(Lmean)
# move to actual grid point
print(Lmean)
idx,  = np.where(Lgrid>Lmean)

Lmean = Lgrid[idx[0]]
ymean = np.mean(ymean)
return dd, np.array([Lmean]), np.array([ymean])

# -----------------------------------------------
[docs]def output(init,result):
"""
write results to file and be done

Parameters
==========
model :

PSDdata :

HAp :

Returns
=======
out :

Examples
========
"""
return

# -----------------------------------------------
# ----- older stuff (not used) ------------------
# -----------------------------------------------
[docs]def assimilate_JK(dd):
"""
this version is currently not working
main function to assimilate all data provided in init

Parameters
==========
model :

PSDdata :

HAp :

Returns
=======
out :

Examples
========
"""
np.random.seed(123)

Lgrid = dd['model']['Lgrid']
dd['model']['initPSD'] = np.array( 4e-7*np.exp( -(Lgrid-5)**2/2) )

nens = dd['kalman']['nens']
NL = dd['model']['ngrid']
f0 = np.array(dd['model']['initPSD'])
Tgrid = dd['model']['Tgrid']

dd['results'] = {}
dd['results']['fa'] = {}
dd['results']['ff'] = {}

# --- initialize some stuff
# broadcast all f0 into A and randomize
A = np.ones( (NL,nens) )*f0[:,np.newaxis]
#A = 10**np.random.normal(np.log10(A),0.3)  # watch out for negative values

dd['results']['fa']['Tgrid'] = np.zeros(len(Tgrid))
dd['results']['ff']['Tgrid'] = np.zeros(len(Tgrid))
dd['results']['fa']['PSD'] = np.zeros( (f0.shape[0],Tgrid.shape[0]) )
dd['results']['fa']['PSD'][:,0] = f0
dd['results']['fa']['Tgrid'][0] = Tgrid[0]
dd['results']['ff']['PSD'] = np.zeros( (f0.shape[0],Tgrid.shape[0]) )
dd['results']['ff']['PSD'][:,0] = f0
dd['results']['ff']['Tgrid'][0] = Tgrid[1]

minPSD = np.min(A)/100

# time loop to assimilate everything
for Tnow, tcnt in zip(Tgrid, range(len(Tgrid)-1)):

# make forecast using all ensembles in A
icnt = 0
for f in A.T:
fnew = forecast(dd, f, Tnow)
A[:,icnt] = fnew[:]
icnt += 1

#print(np.log10(np.mean(A,axis=1)))
Tnow = Tnow + dd['model']['Twindow']
tcnt = tcnt + 1
dd['results']['ff']['PSD'][:,tcnt] = np.mean(A, axis=1)
dd['results']['ff']['Tgrid'][tcnt] = Tnow

# get observations for time window ]Tnow-Twindow,Tnow]
dd, L,y = getobs4window(dd, Tnow)

# check if len(y) ==0
if len(y) ==0:
dd['results']['fa']['PSD'][:,tcnt] = np.mean(A, axis=1)
dd['results']['fa']['Tgrid'][tcnt] = Tnow
continue

# perturb observations so that ensemble get sufficient spread
HA = getHA(dd,L,A) # project ensemble states to obs. grid: HA(nobs,nens)
e = np.zeros( (len(y),nens) )  # this is Psi in Evensen 2003
D = np.zeros( (len(y),nens) )
err_z = 0.3
for yval, iobs in zip(y, range(len(y))):
relstd = yval*err_z
rnd = np.random.normal( yval*np.ones(nens), relstd)
rnd[np.where(rnd<minPSD)] = minPSD
e[iobs,:] = rnd - yval
D[iobs,:] = yval + e[iobs,:]

relstd = np.zeros(nens)
for yval, iobs in zip(y, range(len(y))):
idx = np.where( np.isclose(L[iobs],Lgrid) )
relstd[:] = 0.5*( yval - HA[iobs,:] )
A[idx,:] = np.random.normal( HA[iobs,:], np.abs(relstd))
idx2 = np.where(A[idx,:] < minPSD); A[idx,idx2] = minPSD # add min flux here

# get residual or innovation in the classical sense
Res = D - HA # dim = (nobs,nens)

# error of covariant matrix
ffmean = np.mean(A, axis=1)
V = np.zeros((NL,nens))
for iens in range(nens):
V[:,iens]  = A[:,iens] - ffmean

Pfxx = np.dot(V,V.T)/float(nens)
Pfyy = np.dot(e,e.T)/float(nens)

# get Kalman denominator (H*Pfxx*HT + Pfyy)^-1 by inversion
PH, HPH = getHPH(dd,L,Pfxx)
Rinv = np.linalg.inv(HPH+Pfyy)

# update all ensemble members

# check for negative fluxes
idx = np.where(A<minPSD)
A[idx] = minPSD

#
# average A from analysis step and save in results dictionary
#print y, np.mean(HA,axis=1), np.mean(getHA(init,L,A),axis=1)
dd['results']['fa']['PSD'][:,tcnt] = np.mean(A, axis=1)
dd['results']['fa']['Tgrid'][tcnt] = Tnow

# print message
print('Tnow: ', rbtools.TAInum2ISOdate(Tnow))

#print np.log10(np.mean(A,axis=1))[30]

# copy some results into ['kalman'] key
dd['kalman']['fa'] =  dd['results']['fa']
dd['kalman']['ff'] =  dd['results']['ff']
return dd

# -----------------------------------------------
"""
this routine will add a standard error to the ensemble states
"""
from numpy import where, random, log10, mean, reshape, abs, max, arange
from numpy import linspace, min

Lgrid = dd['model']['Lgrid']
dL = Lgrid[1]-Lgrid[0]
nens = int(dd['kalman']['nens'])
#print y, L

for Lcenter, yval in zip(L, y):
NLs = int(round( (L2-L1)/dL ) + 1 )
for Lpos in linspace(L1, L2, NLs):
#print dL, L1, L2, NLs
index = where( np.isclose(Lpos,Lgrid) ) # use float point comparison
stdev = 0.5*abs( yval - mean(A[index,:]) )
#print Lpos
center = reshape( A[index,:], (nens) )
A[index,:] = random.normal( center, scale=stdev)
# now check if any below 1e-99 and repeat
for i in range(nens):
icnt = 0
while A[index,i] < 1e-99:
A[index,i] = random.normal( center[i], scale=stdev)
icnt += 1
if icnt > 1000: print('too many iterations')
return A

# -----------------------------------------------
"""
this routine will add a standard error to the ensemble states
"""
Lgrid = dd['model']['Lgrid']
dL = Lgrid[1]-Lgrid[0]
nens = int(dd['kalman']['nens'])
#print(y, L)

for Lcenter, yval in zip(L, y):
NLs = int(round( (L2-L1)/dL ) + 1 )
for Lpos in np.linspace(L1, L2, NLs):
#print dL, L1, L2, NLs
index = np.where( np.isclose(Lpos,Lgrid) ) # use float comparison
stdev = 0.5*np.abs( yval - np.mean(A[index,:]) )
#print Lpos
center = np.reshape( A[index,:], (nens) )
A[index,:] = np.random.normal( center, scale=stdev)
# now check if any below 1e-99 and repeat
for i in range(nens):
icnt = 0
while A[index,i] < 1e-99:
#A[index,i] = np.random.normal( center[i], scale=stdev)
A[index,i] = 10**np.random.normal( np.log10(center[i]), scale=0.001)
icnt += 1
if icnt > 1000: print('too many iterations')
return A

# -----------------------------------------------
[docs]def forecast(dd, f, Tnow):
"""
call the model to calculate update and return result
fnew = f(Tnow+Twindow)
"""
import diff1d
fnew = diff1d.diff_LL(dd, f, Tnow)
return fnew
```