#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from numpy import shape, dot, eye, set_printoptions, diag, sum
import numpy as np
from numpy.linalg import svd
import pdb

__contact__ = 'Josef Koller,'

Data assimilation functions used for several projects

Authors: Josef Koller, Humberto Godinez
nstitution: Los Alamos National Laboratory

Copyright 2010 Los Alamos National Security, LLC.

# -----------------------------------------------
# Data Assimilation class: enKF
# -----------------------------------------------
[docs]class ensemble(object): """ Ensemble-based data assimilation subroutines for the Radiation Belt Model .. currentmodule:: spacepy.data_assimilation .. autosummary:: ~ensemble.EnKF ~ensemble.EnKF_oneobs ~ensemble.add_model_error ~ensemble.add_model_error_obs ~ensemble.getHA ~ensemble.getHAprime ~ensemble.getHPH ~ensemble.getInnovation ~ensemble.getperturb .. automethod:: EnKF .. automethod:: EnKF_oneobs .. automethod:: add_model_error .. automethod:: add_model_error_obs .. automethod:: getHA .. automethod:: getHAprime .. automethod:: getHPH .. automethod:: getInnovation .. automethod:: getperturb """ def __init__(self, ensembles = 50): self.Nens = ensembles # number of ensemble members return None # ----------------------------------------------- def __str__(self): return '<enKF instance, Nens='+self.Nens+'>' __repr__ = __str__ # -----------------------------------------------
[docs] def EnKF(self, A, Psi, Inn, HAp): """ analysis subroutine after code example in Evensen 2003 this will take the prepared matrices and calculate the analysis most efficiently, A will be returned Parameters ========== A : Psi : Inn : HAp : Returns ======= out : Examples ======== """ # A = ensemble of model states # Psi = E = observation perturbations # Inn = D = innovations # HAp = S = HA' # debugging command #pdb.set_trace() nobs, nens = HAp.shape nrmin = min(nobs,nens) NL = A.shape[0] # compute HAp + Psi ES = HAp + Psi # Compute SVD of HAp + Psi -> U and sig, # using Eispack [U,sig,V] = np.linalg.svd(ES) U = U[:,0:nrmin] # convert eigenvalues sig = sig**2 # Compute number of significant eigenvalues sigsum = np.sum(sig) sigsum1 = 0.0 nrsigma = 0 for i in range(nrmin): if (sigsum1/sigsum < 0.999): nrsigma = nrsigma+1 sigsum1=sigsum1+sig[i] sig[i]=1.0/sig[i] else: sig[i:nrmin]=0.0 break #pdb.set_trace() # Compute X1 X1 = (sig*U).T # Compute X2=X1*D X2 =,Inn) # Compute X3=U*X2 X3 =,X2) # Compute representers Reps=A'*S^T Reps,HAp.T) # Compute A=A+Reps*X3 C = A +,X3) return C
# -----------------------------------------------
[docs] def EnKF_oneobs(self, A, Psi, Inn, HAp): """ analysis subroutine for a single observations with the EnKF. This is a special case. Parameters ========== A : Psi : Inn : HAp : Returns ======= out : Examples ======== """ # A = ensemble of model states # Psi = E = observation perturbations # Inn = D = innovations # HAp = S = HA' # debugging command #pdb.set_trace() nobs, nens = HAp.shape nrmin = min(nobs,nens) NL = A.shape[0] # compute HPH^t HPHt = np.sum(HAp**2) # compute R R = np.sum(Psi**2) # get A' N1 = np.ones( (nens,nens) )/nens I = np.eye(nens) Ap =,I-N1) # compute PHt PHt =,HAp.T) # compute K K = 1/(HPHt + R)*PHt # Compute A=A+K*Inn C = A + np.outer(K,Inn) return C
# -----------------------------------------------
[docs] def getHAprime(self, HA): """ calculate ensemble perturbation of HA HA' = HA-HA_mean Parameters ========== HA : Returns ======= out : Examples ======== """ m, nens = np.shape(HA) N1 = np.ones( (nens,nens) )/nens I = np.eye(nens) S =,I-N1) return S
# -----------------------------------------------
[docs] def getInnovation(self, y, Psi, HA): """ compute innovation ensemble D' Parameters ========== y : Psi : HA : Returns ======= out : Examples ======== """ m = np.size(y) nens = self.Nens # broadcoast y over ensembles to get Y Yens = np.ones( (m,nens) ) * y[:,np.newaxis] D = Yens + Psi - HA return D
# -----------------------------------------------
[docs] def getperturb(self, model, y): """ compute perturbations of observational vector Parameters ========== model : y : Returns ======= out : Examples ======== """ m = np.size(y) nens = self.Nens relstd = model.PSDerr Psi = np.random.standard_normal((m,nens)) * y[:,np.newaxis] * relstd #Psi = np.random.standard_normal((m,nens)) * np.mean(y)*relstd #Psi = np.zeros((m,nens)) return Psi
# -----------------------------------------------
[docs] def getHA(self, model, Lobs, A): """ compute HA provided L vector of observations and ensemble matrix A Parameters ========== model : Lobs : A : Returns ======= out : Examples ======== """ # number of observations m = np.size(Lobs) # number of gridpoint and ensembles NL, nens = np.shape(A) # extract the grid Lgrid = model.Lgrid # dimension of HA will be (m x NL)(NL x nens) = m x nens HA = np.zeros( (m,nens) ) i = 0 for Lpos in Lobs: # closest grid point index = np.argmin(np.abs(Lpos - Lgrid)) if np.size(index) == 0: print('\nERROR in getHA\n') sys.exit(1) HA[i,:] = A[index,:] i += 1 return HA
# -----------------------------------------------
[docs] def getHPH(self,Lobs,Pfxx): """ compute HPH Parameters ========== Lobs Pfxx Returns ======= out Examples ======== """ # dimension(L) = nobs(t_i) # dimension(Pfxx) = (NL,NL) Lgrid = dd['model']['Lgrid'] NL = dd['model']['ngrid'] HP = np.zeros((len(Lobs),NL)) PH = np.zeros((NL,len(Lobs))) HPH = np.zeros((NL,NL)) idx = np.zeros(len(Lobs), dtype=int) for Lval,iL in zip(Lobs, range(len(Lobs))): try: idx[iL] = np.argmin(np.abs(Lval - Lgrid))[0] except: print('\nERROR in getHPH\n') sys.exit(1) HP[iL,:] = Pfxx[:,idx[iL]] HPH = HP[:,idx] PH = Pfxx[:,idx] return PH, HPH
# -----------------------------------------------
[docs] def add_model_error(self, model, A, PSDdata): """ this routine will add a standard error to the ensemble states Parameters ========== model : A : PSDdata : Returns ======= out : Examples ======== """ Lgrid = model.Lgrid dL = Lgrid[1]-Lgrid[0] nens = int(self.Nens) #print y, L # in units of L (should not be smaller than dL) radius = dL*0.9 radius = 1 # create L vector where random noise will be added # erridx = np.array([False]*len(Lgrid)) # for Lval in PSDdata['Lstar']: # L1 = np.max((Lval-radius, Lgrid[0])) # L2 = np.min((Lval+radius, Lgrid[-1])) # Lidx = np.where( (Lgrid > L1) & (L2 > Lgrid) )[0] # erridx[Lidx] = True # calculate the rel. average uncertainty based on (obs - # modelfcst)/obs # fcst = np.mean(A, axis=1) # rst = np.zeros( len(PSDdata['Lstar']) ) # for Lval, yval, i in zip(PSDdata['Lstar'], PSDdata['PSD'], range(len(PSDdata['Lstar'])) ): # idx = np.argmin(np.abs(Lval - Lgrid)) # rst[i] = np.abs( yval - fcst[idx] ) # stdev = np.mean(rst) # for iens in range(nens): # f = A[erridx, iens] # A[erridx, iens] = np.random.normal( f, scale = stdev ) ##TODO: Check why above code was commented... this block was unindented for consistency, but should it be here? # create L vector where random noise will be added erridx = np.array([],dtype=int) for Lval in PSDdata['Lstar']: Lidx = np.where( np.isclose(Lval,Lgrid) )[0] erridx = np.append(erridx,np.array([int(Lidx)])) # calculate the rel. average uncertainty based on (obs - # modelfcst)/obs fcst = np.mean(A, axis=1) rst = np.zeros( (len(PSDdata['Lstar']),nens) ) for iens, member in enumerate(A.T): rst[:,iens] = np.abs( PSDdata['PSD'] - member[erridx] ) stdev = 2.0*np.std(rst,axis=1) # debug output files #np.savetxt('Lobs_init.dat',PSDdata['Lstar']) #np.savetxt('y_init.dat',PSDdata['PSD']) #np.savetxt('Lgrid_init.dat',Lgrid) #np.savetxt('A_init.dat',A) #np.savetxt('erridx_init.dat',erridx) #np.savetxt('rst_init.dat',rst) #np.savetxt('stdev_init.dat',stdev) for i, idx in enumerate(erridx): if (stdev[i] > 0.0): A[idx, :] = np.random.normal( fcst[i], stdev[i], nens ) return A
# -----------------------------------------------
[docs] def add_model_error_obs(self, model, A, Lobs, y): """ this routine will add a standard error to the ensemble states Parameters ========== model : A : Lobs : y : Returns ======= out : Examples ======== """ # debugging command #pdb.set_trace() Lgrid = model.Lgrid dL = Lgrid[1]-Lgrid[0] nens = A.shape[1] #nens = int(self.Nens) #print y, L # in units of L (should not be smaller than dL) radius = dL*0.9 radius = 1 # create L vector where random noise will be added #erridx = np.array([False]*len(Lgrid)) erridx = np.array([],dtype=int) for Lval in Lobs: Lidx = np.where( np.isclose(Lval,Lgrid) )[0] erridx = np.append(erridx,np.array([int(Lidx)])) # compute residual # calculate the rel. average uncertainty based on (obs - # ensemble)/obs rst = np.zeros( (len(Lobs),nens) ) for iens, member in enumerate(A.T): rst[:,iens] = np.abs( y - A[erridx,iens] ) # perturb with normal distribution # around observation values stdev = np.std(rst,axis=1) # debug output files #np.savetxt('Lobs.dat',Lobs) #np.savetxt('y.dat',y) #np.savetxt('Lgrid.dat',Lgrid) #np.savetxt('A.dat',A) #np.savetxt('erridx.dat',erridx) #np.savetxt('rst.dat',rst) #np.savetxt('stdev.dat',stdev) pert_obs = (1.0 + 0.30*np.random.normal(size=y.shape[0]))*y for i, idx in enumerate(erridx): if (stdev[i] > 0.0): A[idx,:] = np.random.normal( y[i], stdev[i], nens ) #A[idx,:] = np.random.normal( pert_obs[i], stdev[i], nens ) print('done') return A
# -----------------------------------------------
[docs]def average_window(PSDdata, Lgrid): """ combine observations on same L shell in Parameters ========== model : PSDdata : HAp : Returns ======= out : Examples ======== """ # sort observations first in L idx = PSDdata['Lstar'].argsort() Lobs = PSDdata['Lstar'][idx] y = PSDdata['PSD'][idx] # map Lobs onto closest grid point for i, obs in enumerate(y): Lobs[i] = Lgrid[ np.argmin(np.abs(Lobs[i]-Lgrid)) ] # identify unique grid-points of Lobs tmpLobs = np.unique(Lobs) # declare average observation array tmpy = np.zeros_like(tmpLobs) # run through all unique grid-points and compute average observation for i, iL in enumerate(tmpLobs): # identify idex of each unique grid-point idx = np.where(np.isclose(iL,Lobs)) # compute average observation for each unique grid-point tmpy[i] = np.average(y[idx]) # walk through all grid points and find how many obs available #for i, iL in enumerate(Lgrid): # idx = np.where( iL == Lobs[:])[0] # assign Lobs and y Lobs = tmpLobs y = tmpy return Lobs, y
# ----------------------------------------------- # ----------------------------------------------- # -----------------------------------------------
[docs]def getobs4window(dd, Tnow): """ get observations in time window [Tnow - Twindow, Tnow] from all satellites lumped together into one y vector Parameters ========== model : PSDdata : HAp : Returns ======= out : Examples ======== """ Tstart = Tnow - dd['model']['Twindow'] Tend = Tnow # read all PSD observation and append into y,L y = []; L = [] for satkey in dd['data']['satids']: # for every s/c PSD = dd['data'][satkey]['PSD'] TAI = dd['data'][satkey]['TAI'] Lvals = dd['data'][satkey]['L'] indices = np.where( (Tstart < TAI) & (TAI <= Tend) ) y = np.append(y, PSD[indices]) L = np.append(L, Lvals[indices]) # sort them and average if they fall into same L range ind = L.argsort() # return indices to a sorted y y = y[ind] L = L[ind] #check if observation available if len(y) == 0: return dd, np.array([]), np.array([]) i = 0 Lmean = []; ymean = [] Lgrid = dd['model']['Lgrid'] for iL in Lgrid[1:-1]: i += 1 L_lower = np.mean([ Lgrid[i], Lgrid[i-1] ]) L_upper = np.mean([ Lgrid[i+1], Lgrid[i] ]) indices = np.where( (L_lower < L) & (L <= L_upper) ) if np.size(indices) == 0: continue Lmean = np.append(Lmean, iL) ymean = np.append(ymean, np.mean(y[indices])) # add Lmean, ymean to dd try: # first time executing this dd['data']['dwindow'].append( (Tnow, Lmean, ymean) ) except: dd['data']['dwindow'] = [] dd['data']['dwindow'].append( (Tnow, Lmean, ymean) ) # these line are only for averaging over L* as well! Lmean = np.mean(Lmean) # move to actual grid point print(Lmean) idx, = np.where(Lgrid>Lmean) Lmean = Lgrid[idx[0]] ymean = np.mean(ymean) return dd, np.array([Lmean]), np.array([ymean])
# -----------------------------------------------
[docs]def output(init,result): """ write results to file and be done Parameters ========== model : PSDdata : HAp : Returns ======= out : Examples ======== """ return
# ----------------------------------------------- # ----- older stuff (not used) ------------------ # -----------------------------------------------
[docs]def assimilate_JK(dd): """ this version is currently not working main function to assimilate all data provided in init Parameters ========== model : PSDdata : HAp : Returns ======= out : Examples ======== """ np.random.seed(123) Lgrid = dd['model']['Lgrid'] dd['model']['initPSD'] = np.array( 4e-7*np.exp( -(Lgrid-5)**2/2) ) nens = dd['kalman']['nens'] NL = dd['model']['ngrid'] f0 = np.array(dd['model']['initPSD']) Tgrid = dd['model']['Tgrid'] dd['results'] = {} dd['results']['fa'] = {} dd['results']['ff'] = {} # --- initialize some stuff # broadcast all f0 into A and randomize A = np.ones( (NL,nens) )*f0[:,np.newaxis] #A = 10**np.random.normal(np.log10(A),0.3) # watch out for negative values dd['results']['fa']['Tgrid'] = np.zeros(len(Tgrid)) dd['results']['ff']['Tgrid'] = np.zeros(len(Tgrid)) dd['results']['fa']['PSD'] = np.zeros( (f0.shape[0],Tgrid.shape[0]) ) dd['results']['fa']['PSD'][:,0] = f0 dd['results']['fa']['Tgrid'][0] = Tgrid[0] dd['results']['ff']['PSD'] = np.zeros( (f0.shape[0],Tgrid.shape[0]) ) dd['results']['ff']['PSD'][:,0] = f0 dd['results']['ff']['Tgrid'][0] = Tgrid[1] minPSD = np.min(A)/100 # time loop to assimilate everything for Tnow, tcnt in zip(Tgrid, range(len(Tgrid)-1)): # make forecast using all ensembles in A icnt = 0 for f in A.T: fnew = forecast(dd, f, Tnow) A[:,icnt] = fnew[:] icnt += 1 #print(np.log10(np.mean(A,axis=1))) Tnow = Tnow + dd['model']['Twindow'] tcnt = tcnt + 1 dd['results']['ff']['PSD'][:,tcnt] = np.mean(A, axis=1) dd['results']['ff']['Tgrid'][tcnt] = Tnow # get observations for time window ]Tnow-Twindow,Tnow] dd, L,y = getobs4window(dd, Tnow) # check if len(y) ==0 if len(y) ==0: dd['results']['fa']['PSD'][:,tcnt] = np.mean(A, axis=1) dd['results']['fa']['Tgrid'][tcnt] = Tnow continue # perturb observations so that ensemble get sufficient spread HA = getHA(dd,L,A) # project ensemble states to obs. grid: HA(nobs,nens) e = np.zeros( (len(y),nens) ) # this is Psi in Evensen 2003 D = np.zeros( (len(y),nens) ) err_z = 0.3 for yval, iobs in zip(y, range(len(y))): relstd = yval*err_z rnd = np.random.normal( yval*np.ones(nens), relstd) rnd[np.where(rnd<minPSD)] = minPSD e[iobs,:] = rnd - yval D[iobs,:] = yval + e[iobs,:] # add model error relstd = np.zeros(nens) for yval, iobs in zip(y, range(len(y))): idx = np.where( np.isclose(L[iobs],Lgrid) ) relstd[:] = 0.5*( yval - HA[iobs,:] ) A[idx,:] = np.random.normal( HA[iobs,:], np.abs(relstd)) idx2 = np.where(A[idx,:] < minPSD); A[idx,idx2] = minPSD # add min flux here # get residual or innovation in the classical sense Res = D - HA # dim = (nobs,nens) # error of covariant matrix ffmean = np.mean(A, axis=1) V = np.zeros((NL,nens)) for iens in range(nens): V[:,iens] = A[:,iens] - ffmean Pfxx =,V.T)/float(nens) Pfyy =,e.T)/float(nens) # get Kalman denominator (H*Pfxx*HT + Pfyy)^-1 by inversion PH, HPH = getHPH(dd,L,Pfxx) Rinv = np.linalg.inv(HPH+Pfyy) Adj =,Rinv), Res) # update all ensemble members A = A + Adj # check for negative fluxes idx = np.where(A<minPSD) A[idx] = minPSD # # average A from analysis step and save in results dictionary #print y, np.mean(HA,axis=1), np.mean(getHA(init,L,A),axis=1) dd['results']['fa']['PSD'][:,tcnt] = np.mean(A, axis=1) dd['results']['fa']['Tgrid'][tcnt] = Tnow # print message print('Tnow: ', rbtools.TAInum2ISOdate(Tnow)) #print np.log10(np.mean(A,axis=1))[30] # copy some results into ['kalman'] key dd['kalman']['fa'] = dd['results']['fa'] dd['kalman']['ff'] = dd['results']['ff'] return dd
# -----------------------------------------------
[docs]def addmodelerror_old2(dd, A, y, L): """ this routine will add a standard error to the ensemble states """ from numpy import where, random, log10, mean, reshape, abs, max, arange from numpy import linspace, min Lgrid = dd['model']['Lgrid'] dL = Lgrid[1]-Lgrid[0] nens = int(dd['kalman']['nens']) #print y, L radius = 1 for Lcenter, yval in zip(L, y): L1 = max((Lcenter-radius, Lgrid[0])) L2 = min((Lcenter+radius, Lgrid[-1])) #print Lcenter, Lcenter+radius, Lgrid[-1], min((Lcenter+radius,Lgrid[-1])), (L2-L1)/dL NLs = int(round( (L2-L1)/dL ) + 1 ) for Lpos in linspace(L1, L2, NLs): #print dL, L1, L2, NLs index = where( np.isclose(Lpos,Lgrid) ) # use float point comparison stdev = 0.5*abs( yval - mean(A[index,:]) ) #print Lpos center = reshape( A[index,:], (nens) ) A[index,:] = random.normal( center, scale=stdev) # now check if any below 1e-99 and repeat for i in range(nens): icnt = 0 while A[index,i] < 1e-99: A[index,i] = random.normal( center[i], scale=stdev) icnt += 1 if icnt > 1000: print('too many iterations') return A
# -----------------------------------------------
[docs]def addmodelerror_old(dd, A, y, L): """ this routine will add a standard error to the ensemble states """ Lgrid = dd['model']['Lgrid'] dL = Lgrid[1]-Lgrid[0] nens = int(dd['kalman']['nens']) #print(y, L) radius = 1 for Lcenter, yval in zip(L, y): L1 = np.max((Lcenter-radius, Lgrid[0])) L2 = np.min((Lcenter+radius, Lgrid[-1])) #print Lcenter, Lcenter+radius, Lgrid[-1], min((Lcenter+radius,Lgrid[-1])), (L2-L1)/dL NLs = int(round( (L2-L1)/dL ) + 1 ) for Lpos in np.linspace(L1, L2, NLs): #print dL, L1, L2, NLs index = np.where( np.isclose(Lpos,Lgrid) ) # use float comparison stdev = 0.5*np.abs( yval - np.mean(A[index,:]) ) #print Lpos center = np.reshape( A[index,:], (nens) ) A[index,:] = np.random.normal( center, scale=stdev) # now check if any below 1e-99 and repeat for i in range(nens): icnt = 0 while A[index,i] < 1e-99: #A[index,i] = np.random.normal( center[i], scale=stdev) A[index,i] = 10**np.random.normal( np.log10(center[i]), scale=0.001) icnt += 1 if icnt > 1000: print('too many iterations') return A
# -----------------------------------------------
[docs]def forecast(dd, f, Tnow): """ call the model to calculate update and return result fnew = f(Tnow+Twindow) """ import diff1d fnew = diff1d.diff_LL(dd, f, Tnow) return fnew