# -*- coding: utf-8 -*-
'''
Module for reading, manipulating, and visualizing BATS-R-US and SWMF output.
For more information on the SWMF, please visit the
`Center for Space Environment Modeling <http://csem.engin.umich.edu>`_.
For additional documentation, see :doc:`../pybats`.
Copyright ©2010 Los Alamos National Security, LLC.
'''
__contact__ = 'Dan Welling, daniel.welling@uta.edu'
# Global imports (used ubiquitously throughout this module):
import os.path
from functools import wraps
from spacepy.datamodel import dmarray, SpaceData
import numpy as np
# Pybats-related decorators:
def calc_wrapper(meth):
'''
This is a decorator for ``self.calc_*`` object methods. It establishes
    ``self._calcs``, a list of calculation functions that have been called, and
adds the called function to that list to keep track of which calculations
have already been performed for the object. If a calculation has already
been performed, it is skipped in order to reduce effort.
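
    A minimal usage sketch (``Example`` and ``calc_temp`` are hypothetical
    names used for illustration only):

    >>> class Example(object):
    ...     @calc_wrapper
    ...     def calc_temp(self):
    ...         print('Performing expensive calculation...')
    >>> data = Example()
    >>> data.calc_temp()
    Performing expensive calculation...
    >>> data.calc_temp()  # Already performed; silently skipped.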
'''
@wraps(meth)
def wrapped(self, *args, **kwargs):
# Establish list of calculations:
if not hasattr(self, '_calcs'):
self._calcs = {}
# Check to see if we're in the list, return if true.
if meth.__name__ in self._calcs:
return
# If not, add to list and stash args/kwargs:
self._calcs[meth.__name__] = [args, kwargs]
# call method:
return meth(self, *args, **kwargs)
# Return decorated function:
return wrapped
# Some common, global functions.
def parse_filename_time(filename):
"""
Given an SWMF file whose name follows the usual standards (see below),
attempt to parse the file name to extract information about the iteration,
runtime and date/time at which the file was written. All three are
returned to the caller in that order. If any cannot be found in the
file name, "None" is returned in its place.
    For ``*.outs`` files, ranges of times and iterations can be given in the
file names. If this is the case, a list of values will be returned
for that entry (see third example below).
Information on the format of time strings contained within SWMF output
file names can be found in the `SWMF User Manual
<http://herot.engin.umich.edu/~gtoth/SWMF/doc/HTML/SWMF/node225.html>`_.
Parameters
==========
filename : string
An SWMF output file name.
Returns
=======
i_iter : integer, list of integers, or None
The iteration at which the file was written, if found.
runtime : float, list of floats, or None
The run time (in seconds) at which the file was written, if found.
time : datetime.datetime, list of datetimes, or None
Either a datetime object or *None*, depending on the success of the
file name parsing.
Examples
========
>>> from spacepy.pybats import parse_filename_time
>>> parse_filename_time('z=0_mhd_2_t00050000_n00249620.out')
    (249620, 18000.0, None)
>>> parse_filename_time('mag_grid_e20130924-232600.out')
(None, None, datetime.datetime(2013, 9, 24, 23, 26))
>>> parse_filename_time('z=0_mhd_2_e20140410-000000-000_20140410-000300-000.outs')
(None, None, [datetime.datetime(2014, 4, 10, 0, 0), datetime.datetime(2014, 4, 10, 0, 3)])
"""
from dateutil.parser import parse
import re
filename = os.path.basename(filename)
# Look for date/time:
if '_e' in filename:
subname = re.search(r'_e((\d{8}\-\d{6}(\-\d{3})?\_?)+)',
filename).groups()[0]
t_string = re.findall(r'(\d{8}\-\d{6})', subname)
time = [parse(x) for x in t_string]
if len(time) == 1:
time = time[0] # Reduce to scalar if necessary.
else:
time = None
# Look for run time (can be time from start OR datetime):
if '_t' in filename:
subname = re.search(r'_t((\d+\_?)+)', filename).groups()[0]
        # Determine whether these are "old style" timestamps, i.e., time
        # in seconds from the start of the simulation, or new style
        # date/time strings; the number of digits tells the difference.
groups = re.findall(r'\d+', subname)
if len(groups[0]) == 14:
time = [parse(x) for x in groups]
if len(time) == 1:
time = time[0]
runtime = None
else:
runtime = [3600*float(x[:-4])+60*float(x[-4:-2])+float(x[-2:])
for x in groups]
if len(runtime) == 1:
runtime = runtime[0]
else:
runtime = None
# Look for file iteration:
if '_n' in filename:
subname = re.search(r'_n((\d+\_?)+)', filename).groups()[0]
i_iter = [int(x) for x in re.findall(r'\d+', subname)]
# Reduce to scalar if necessary.
if len(i_iter) == 1:
i_iter = i_iter[0]
else:
i_iter = None
return i_iter, runtime, time
def mhdname_to_tex(varname):
'''
    Convert common MHD variable and unit names into LaTeX-formatted strings.
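
    A few representative conversions:

    >>> mhdname_to_tex('ux')
    '$U_{x}$'
    >>> mhdname_to_tex('bz')
    '$B_{z}$'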
'''
import re
match_u = re.search('(.*)u([xyz])', varname)
match_b = re.match('([bj])([xyz])', varname)
if 'rho' in varname.lower():
out = r'$\rho_{'+varname.replace('rho', '')+'}$'
elif varname.lower() == 'nt':
out = '$nT$'
elif varname[:2] == 'dB':
subscript = varname[2]+', '*(len(varname) > 3)+varname[3:]
out = r'$\Delta B _{'+subscript+'}$'
elif varname.lower()[-1] == 'p':
out = '$P_{'+varname[:-1]+'}$'
elif match_u:
out = '$U_{'+match_u.group(2)+', '*bool(match_u.group(1)) \
+ match_u.group(1)+'}$'
elif match_b:
out = '$'+match_b.group(1).upper()+'_{'+match_b.group(2)+'}$'
elif 'j' == varname[0].lower():
out = '$J_{'+varname[1:]+'}$'
else:
out = varname
return out
def parse_tecvars(line):
'''
Parse the VARIABLES line from a TecPlot-formatted ascii data file. Create
a list of name-unit tuples for each variable.
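
    Example, using a representative VARIABLES line:

    >>> parse_tecvars('VARIABLES = "X [R]", "Y [R]", "P [nPa]"')
    [('x', 'R'), ('y', 'R'), ('p', 'nPa')]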
'''
import re
# Create return list.
ret = []
# Ensure that the input line is a TecPlot variable line.
if "VARIABLES" not in line:
raise ValueError('Input line is not a TecPlot VARIABLES line.')
# Strip out "VARIABLES = "
line = re.sub(r'(^\s*VARIABLES\s*\=\s*)|(")', '', line)
# break into individual vars using commas.
for s in line.split(','):
m = re.match(r'\s*(\w+)\s*(\[(.*)\])*\s*', s)
ret.append((m.group(1).lower(), m.group(3)))
return ret
def add_planet(ax, rad=1.0, ang=0.0, add_night=True, zorder=1000,
**extra_kwargs):
'''
    Creates a circle of radius *rad* centered on the origin, adds it to
    axes *ax*, and returns the resulting Matplotlib patch objects.
    Unlike :func:`add_body`, the circle is colored half white (dayside)
    and half black (nightside) to coincide with the direction of the
    sun. Because the size of the planet is not intrinsically known to
    the MHD file, the kwarg *rad*, defaulting to 1.0, sets the size of
    the planet. Setting *add_night* to **False** turns off the night
    hemisphere. Extra keywords are handed to the patch (Circle and
    Wedge) generator functions.
Parameters
==========
ax : Matplotlib Axes object
Set the axes on which to place planet.
Other Parameters
================
rad : float
Set radius of planet. Defaults to 1.
ang : float
Set the rotation of the day-night terminator from the y-axis, in degrees
Defaults to zero (terminator is aligned with Y-axis.)
add_night : boolean
Add night hemisphere. Defaults to **True**
zorder : int
Set the matplotlib zorder of the patch to set how other plot
elements order with the inner boundary patch. Defaults to 1000,
nightside patch is given zorder of *zorder+5*.
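
    Example, assuming ``ax`` is a pre-existing Matplotlib axes object:

    >>> from spacepy.pybats import add_planet
    >>> body, arch = add_planet(ax, rad=1.0, ang=0.0)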
'''
from matplotlib.patches import Circle, Wedge
body = Circle((0, 0), rad, fc='w', zorder=zorder, **extra_kwargs)
arch = Wedge((0, 0), rad, 90+ang, -90+ang, fc='k',
zorder=zorder+5, **extra_kwargs)
ax.add_artist(body)
if add_night:
ax.add_artist(arch)
return body, arch
def add_body(ax, rad=2.5, facecolor='lightgrey', show_planet=True,
ang=0.0, add_night=True, zorder=1000, **extra_kwargs):
'''
    Creates a circle of radius *rad* centered on the origin, representing
    the inner boundary of the MHD domain, and adds it to axes *ax*.
    Default face color is light grey; extra keywords are handed to the
    Ellipse generator function.
    Because the inner boundary is rarely the size of the planet at the
    center of the modeling domain, :func:`add_planet` is automatically
    called. This can be disabled via the *show_planet* kwarg.
Parameters
==========
ax : Matplotlib Axes object
Set the axes on which to place planet.
Other Parameters
================
rad : float
Set radius of the inner boundary. Defaults to 2.5.
facecolor : string
Set color of face of inner boundary circle via Matplotlib color
selectors (name, hex, etc.) Defaults to 'lightgrey'.
show_planet : boolean
Turns on/off planet indicator inside inner boundary.
Defaults to ``True``
ang : float
Set the rotation of the day-night terminator from the y-axis, in degrees
Defaults to zero (terminator is aligned with Y-axis.)
add_night : boolean
Add night hemisphere. Defaults to ``True``
zorder : int
Set the matplotlib zorder of the patch to set how other plot
elements order with the inner boundary patch. Defaults to 1000.
If a planet is added, it is given a zorder of ``zorder`` + 5.
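
    Example, assuming ``ax`` is a pre-existing Matplotlib axes object:

    >>> from spacepy.pybats import add_body
    >>> add_body(ax, rad=2.5, facecolor='lightgrey')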
'''
from matplotlib.patches import Ellipse
dbody = 2.0 * rad
body = Ellipse((0, 0), dbody, dbody, facecolor=facecolor, zorder=zorder,
**extra_kwargs)
if show_planet:
add_planet(ax, ang=ang, add_night=add_night, zorder=zorder+5)
ax.add_artist(body)
def _read_idl_ascii(pbdat, header='units', start_loc=0, keep_case=True):
'''
Load a SWMF IDL ascii output file and load into a pre-existing PbData
object. This should only be called by :class:`IdlFile`.
The input object must have the name of the file to be opened and read
stored in its attributes list and named 'file'.
The kwarg *header* dictates how the header line will be handled. In some
output files, the header is simply the list of units. In others, it
contains other data. If *header* is set to 'units', the header is
assumed to be a list of units for each variable contained within the
file. If set to **None** or not recognized, the header will be saved
in the object's attribute list under 'header'.
Parameters
----------
pbdat : PbData object
The object into which the data will be loaded.
Other Parameters
----------------
header : string or **None**
A string indicating how the header line will be handled; see above.
start_loc : int
Starting location within file of data to read in number of characters.
Used when reading *.outs files with more than one data frame.
keep_case : boolean
If set to True, the case of variable names will be preserved. If
set to False, variable names will be set to all lower case.
Returns
-------
True : Boolean
Returns True on success.
'''
    # Open the file and jump to the requested start location:
    infile = open(pbdat.attrs['file'], 'r')
    infile.seek(start_loc)
# Read the top header line:
headline = infile.readline().strip()
# Read & convert iters, runtime, etc. from next line:
parts = infile.readline().split()
pbdat.attrs['iter'] = int(parts[0])
pbdat.attrs['runtime'] = float(parts[1])
pbdat.attrs['ndim'] = int(parts[2])
pbdat.attrs['nparam'] = int(parts[3])
pbdat.attrs['nvar'] = int(parts[4])
# Read & convert grid dimensions.
grid = [int(x) for x in infile.readline().split()]
pbdat['grid'] = dmarray(grid)
    # Data on generalized (structured but irregular) grids are flagged
    # by a negative ndim value. Unstructured grids (e.g., BATS, AMRVAC)
    # are flagged by a negative ndim value AND a grid size of [x, 1(, 1)].
    # Set the grid type attribute to Regular, Generalized, or
    # Unstructured accordingly.
pbdat['grid'].attrs['gtype'] = 'Regular'
pbdat['grid'].attrs['npoints'] = abs(pbdat['grid'].prod())
if pbdat.attrs['ndim'] < 0:
if any(pbdat['grid'][1:] > 1):
pbdat['grid'].attrs['gtype'] = 'Generalized'
else:
pbdat['grid'].attrs['gtype'] = 'Unstructured'
pbdat['grid'].attrs['npoints'] = pbdat['grid'][0]
pbdat.attrs['ndim'] = abs(pbdat.attrs['ndim'])
# Quick ref vars:
gtyp = pbdat['grid'].attrs['gtype']
npts = pbdat['grid'].attrs['npoints']
ndim = pbdat['grid'].size
nvar = pbdat.attrs['nvar']
npar = pbdat.attrs['nparam']
# Read parameters stored in file.
para = np.zeros(npar)
if npar > 0:
para[:] = infile.readline().split()
# Read variable names. Preserve or destroy case
# via keyword argument selection:
if keep_case:
names = infile.readline().split()
else:
names = infile.readline().lower().split()
# Now that we know the number of variables, we can properly handle
# the headline and units based on the kwarg *header*:
pbdat.attrs['header'] = headline
if header == 'units':
# If headline is just units:
units = headline.split()
else:
# If headline is NOT just units, create blank units:
units = [''] * (len(names)-npar)
# For some reason, there are often more units than variables
# in these files. It looks as if there are more grid units
# than grid vectors (e.g. 'R R R' implies X, Y, and Z data
# in file but only X and Y are present.) Let's try to work
# around this rather egregious error.
nSkip = len(units)+npar-len(names)
if nSkip < 0:
nSkip = 0
# Save grid names (e.g. 'x' or 'r') and save associated params.
pbdat['grid'].attrs['dims'] = tuple(names[0:ndim])
for name, para in zip(names[(nvar+ndim):], para):
pbdat.attrs[name] = para
# Create containers for the rest of the data:
for v, u in zip(names, units[nSkip:]):
pbdat[v] = dmarray(np.zeros(npts), {'units': u})
# Load grid points and data:
for i, line in enumerate(infile.readlines()):
parts = line.split()
for j, p in enumerate(parts):
pbdat[names[j]][i] = p
# Close the file:
infile.close()
    # Arrange data into multidimensional arrays if necessary.
    if gtyp == 'Generalized':
for v in names:
if v not in pbdat:
continue
pbdat[v] = dmarray(np.reshape(pbdat[v], pbdat['grid'], order='F'),
attrs=pbdat[v].attrs)
elif gtyp == 'Regular':
# Put coords into vectors:
prod = [1]+pbdat['grid'].cumprod().tolist()
for i, x in enumerate(pbdat['grid'].attrs['dims']):
pbdat[x] = dmarray(pbdat[x][0:prod[i+1]-prod[i]+1:prod[i]],
attrs=pbdat[x].attrs)
for v in names:
if v not in pbdat.keys():
continue
if v not in pbdat['grid'].attrs['dims']:
pbdat[v] = dmarray(np.reshape(pbdat[v], pbdat['grid'],
order='F'), attrs=pbdat[v].attrs)
def readarray(f, dtype=np.float32, inttype=np.int32):
'''
Read an array from an unformatted binary file written out by a
Fortran program.
Parameters
----------
f : Binary file object
The file from which to read the array of values.
Other Parameters
----------------
dtype : data type
The data type used for data conversion.
inttype : Numpy integer type
Set the precision for the integers that store the size of each
entry. Defaults to numpy.int32
Returns
-------
numpy.array : Numpy array
Return the data read from file as a 1-dimensional array.
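
    Example: a minimal sketch, assuming ``data.bin`` is an existing
    Fortran unformatted (record-wrapped) file of 32-bit floats (the
    file name here is hypothetical):

    >>> import numpy as np
    >>> with open('data.bin', 'rb') as f:
    ...     arr = readarray(f, dtype=np.dtype(np.float32))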
'''
    if dtype is str:
        dtype_size_bytes = 1
    else:
        # np.dtype() accepts both dtype instances and scalar types
        # (e.g., np.float32), so both work for the *dtype* argument:
        dtype_size_bytes = np.dtype(dtype).itemsize
# Get the record length
    rec_len = np.fromfile(f, dtype=inttype, count=1)
    if len(rec_len) == 0:
        # Nothing was read; the file may be truncated.
        raise EOFError('Zero-length read at start marker')
    # Check that the record length is consistent with the data type:
    if rec_len % dtype_size_bytes != 0:
        raise ValueError(
            'Read error: Data type inconsistent with record length (data ' +
            'type size is {0:d} bytes, record length is {1:d} bytes).'.format(
                int(dtype_size_bytes), int(rec_len)))
try:
startpos = f.tell()
except IOError:
seekable = False
else:
seekable = True
if seekable:
f.seek(0, 2)
endpos = f.tell()
if endpos - startpos < (int(rec_len[0]) + np.dtype(inttype).itemsize):
raise EOFError('File is shorter than expected data')
f.seek(startpos + int(rec_len[0]), 0)
rec_len_end = np.fromfile(f, dtype=inttype, count=1)
f.seek(startpos, 0)
        if rec_len_end != rec_len:
            raise ValueError((
                'Read error: End marker length ({0:d}) does not match start '
                'marker length ({1:d}). ').format(
                    rec_len_end[0], rec_len[0]) +
                'This indicates incorrect endianness, wrong file type, '
                'or a corrupt file.')
# Read the data
if dtype is str:
A = f.read(rec_len[0])
else:
A = np.fromfile(f, dtype=dtype, count=int(rec_len[0]/dtype_size_bytes))
# Check the record length marker at the end
rec_len_end = np.fromfile(f, dtype=inttype, count=1)
if len(rec_len_end) == 0:
# Couldn't read, file may be truncated
raise EOFError('Zero-length read at end marker')
if rec_len_end != rec_len:
# End marker is inconsistent with start marker. Something is wrong.
        raise ValueError(
            'Read error: End marker does not match start marker ' +
            '({0:d} vs. {1:d} bytes).'.format(int(rec_len), int(rec_len_end)) +
            ' This indicates incorrect endianness, wrong file type, ' +
            'or a corrupt file.')
return A
def _skip_entry(f, inttype):
'''
    Given a binary IDL-formatted file opened as a file object, *f*,
and whose file pointer is positioned at the start of an entry,
skip to the end of the entry and return the total number of bytes skipped.
Fortran90+ binary files wrap entries with integers that state the number
of bytes of the entry. This function reads the number of bytes before
the data entry, skips over the entry, and reads the trailing byte wrapper
before returning.
Parameters
==========
f : Binary file object
The file from which to read the array of values.
inttype : Numpy integer type
Set the precision for the integers that store the size of each
        entry with the correct endianness.
'''
# Read preceding entry size, in bytes:
rec_len = np.fromfile(f, dtype=inttype, count=1)[0]
# Fast-forward over the entry:
f.seek(rec_len, 1)
# Read the trailing entry size, check for match:
rec_len_end = np.fromfile(f, dtype=inttype, count=1)[0]
if rec_len_end != rec_len:
raise IOError('Record length mismatch.')
# Return total number of bytes skipped:
return rec_len+2*inttype.itemsize
def _scan_bin_header(f, endchar, inttype, floattype):
'''
    Given a binary IDL-formatted file opened as a file object, *f*,
and whose file pointer is positioned at the start of the header,
gather some header information and return to caller.
The file object's pointer will be set to the end of the whole entry
(header plus data will be skipped.)
Parameters
----------
f : Binary file object
The file from which to read the array of values.
endchar : str
        Endian character: '<' (little endian) or '>' (big endian).
inttype : Numpy integer type
Set the precision for the integers that store the size of each
        entry with the correct endianness.
floattype : numpy float type
        The data type used for data conversion with the correct endianness.
Returns
-------
info : dict
A dictionary with the *start* and *end* bytes of the record, time
information including the *iter*ation, *ndim* number of dimensions,
*npar* number of parameters, *nvar* number of variables, and
simulation *runtime* in seconds.
'''
# Get starting location in file:
rec_start = f.tell()
# Create a dictionary to store the info from the file:
info = {'start': rec_start}
# Read initial header:
headline = readarray(f, str, inttype)
headline = headline.decode('utf-8')
# Construct size of header entries:
header_fields_dtype = np.dtype([
('it', np.int32), ('t', floattype), ('ndim', np.int32),
('npar', np.int32), ('nvar', np.int32)])
    # newbyteorder() returns a new dtype, so reassign to apply it:
    header_fields_dtype = header_fields_dtype.newbyteorder(endchar)
# From header, get iteration, runtime (seconds), number of dimensions,
# number of parameters, and number of variables in that order.
# Stash relevant values into the "info" dict:
vals = readarray(f, dtype=header_fields_dtype, inttype=inttype)[0]
for v, x in zip(['iter', 'runtime', 'ndim', 'nparam', 'nvar'], vals):
info[v] = x
# Dimensionality may be negative to indicate non-uniform grid:
info['ndim'] = abs(info['ndim'])
# Get gridsize:
grid = dmarray(readarray(f, inttype, inttype))
npoints = abs(grid.prod())
# Set the size of floating points in bytes:
nbytes = floattype.itemsize
# Skip the rest of the header:
if info['nparam'] > 0:
_skip_entry(f, inttype) # Skip parameters.
_skip_entry(f, inttype) # Skip variable names.
# Get header size in bytes:
head_size = f.tell() - rec_start
# Calculate the end point of the data frame
# end point = start + header + wrapper bytes + data size):
info['end'] = info['start'] + head_size + \
2*inttype.itemsize*(1+info['nvar']) + \
nbytes*(info['nvar']+info['ndim'])*npoints
# Jump to end of current data frame:
f.seek(info['end'], 0)
return info
def _probe_idlfile(filename):
'''
For an SWMF IDL-formatted output file, probe the header to determine if the
file is ASCII or binary formatted. If binary, attempt to determine the
    byte ordering (i.e., endianness) and size of floating point values.
Parameters
----------
filename : str
String path of file to probe.
Returns
-------
fmt : str
Format of file, either "asc" or "bin" for ASCII or binary,
respectively.
endian : str
        Binary byte ordering, either '<' or '>' for little or big endianness,
respectively (None for ASCII).
inttype : numpy.dtype
A numpy data type representing the size of the file's integers with
the appropriate byte ordering. Returns None for ASCII files.
floattype : numpy.dtype
A numpy data type representing the size of the file's floating point
values with the appropriate byte ordering. Returns None for ASCII
files.
'''
# Set default guesses:
endian = '<'
inttype = np.dtype(np.int32)
floattype = np.dtype(np.float32)
    with open(filename, 'rb') as f:
        # On the first try, we may fail because of wrong endianness.
        # If that is the case, swap the endianness and try again.
        inttype = inttype.newbyteorder(endian)
        try:
            # Try to parse with little endian byte ordering:
            headline = readarray(f, str, inttype)
        except (ValueError, EOFError):
            # On fail, rewind and try with big endian byte ordering:
            endian = '>'
            inttype = inttype.newbyteorder(endian)
            f.seek(0)
            # Attempt to read and parse the header line again; if this
            # also fails, the file is almost certainly ascii:
            try:
                headline = readarray(f, str, inttype)
                headline = headline.decode('utf-8')
            except (ValueError, EOFError):
                return 'asc', None, None, None
        # Detect double-precision files: peek at the record length of the
        # next entry (20 bytes when floats are single precision, 24 when
        # they are double precision):
        pos = f.tell()
        RecLen = np.fromfile(f, dtype=inttype, count=1)
        f.seek(pos)
        # Set data types:
        if RecLen[0] > 20:
            floattype = np.dtype(np.float64)
        floattype = floattype.newbyteorder(endian)
return 'bin', endian, inttype, floattype
def _read_idl_bin(pbdat, header='units', start_loc=0, keep_case=True,
headeronly=False):
'''Load a SWMF IDL binary output file and load into a pre-existing PbData
object. This should only be called by :class:`IdlFile`, which will
    include information on endianness and size of integers & floating point
values.
The input object must have the name of the file to be opened and read
stored in its attributes list and named 'file'.
The kwarg *header* dictates how the header line will be handled. In some
output files, the header is simply the list of units. In others, it
contains other data. If *header* is set to 'units', the header is
assumed to be a list of units for each variable contained within the
file. If set to **None** or not recognized, the header will be saved
in the object's attribute list under 'header'.
.. versionchanged:: 0.5.0
Unstructured data are presented as in the files. When reading
3D magnetosphere files, this preserves the 3D block structure,
as required for the BATSRUS interpolator in the `Kamodo
Heliophysics model readers package
<https://github.com/nasa/kamodo>`_. Before 0.5.0, binary
unstructured data were sorted in an attempt to put nearby
positions close to each other in the data arrays. This sorting
was nondeterministic and has been removed; see
`bats.Bats2d.extract` and `qotree.QTree` for processing
adjacent cells.
Parameters
----------
pbdat : PbData object
The object into which the data will be loaded.
Other Parameters
----------------
header : string or **None**
A string indicating how the header line will be handled; see above.
start_loc : int
Location to start reading inside the file as number of bytes. This is
used to set the starting position of a given data frame for *.outs
files. Default is zero (read at beginning of file).
keep_case : boolean
If set to True, the case of variable names will be preserved. If
set to False, variable names will be set to all lower case.
Returns
-------
True : Boolean
Returns True on success.
'''
# Some convenience variables:
endchar, inttype, floattype = pbdat._endchar, pbdat._int, pbdat._float
# Open, read, and parse the file into numpy arrays.
# Note that Fortran writes integer buffers around records, so
# we must parse those as well.
with open(pbdat.attrs['file'], 'rb') as infile:
# Jump to start_loc:
infile.seek(start_loc, 0)
# Read header information.
headline = readarray(infile, str, inttype)
headline = headline.decode('utf-8')
# Parse rest of header
header_fields_dtype = np.dtype([
('it', np.int32), ('t', floattype), ('ndim', np.int32),
('npar', np.int32), ('nvar', np.int32)])
        # newbyteorder() returns a new dtype, so reassign to apply it:
        header_fields_dtype = header_fields_dtype.newbyteorder(endchar)
(pbdat.attrs['iter'], pbdat.attrs['runtime'],
pbdat.attrs['ndim'], pbdat.attrs['nparam'], pbdat.attrs['nvar']) = \
readarray(infile,
dtype=header_fields_dtype,
inttype=inttype)[0]
# Get gridsize
pbdat['grid'] = dmarray(readarray(infile, inttype, inttype))
        # Data on generalized (structured but irregular) grids are flagged
        # by a negative ndim value. Unstructured grids (e.g., BATS, AMRVAC)
        # are flagged by a negative ndim value AND a grid size of [x, 1(, 1)].
        # Set the grid type attribute to Regular, Generalized, or
        # Unstructured accordingly.
pbdat['grid'].attrs['gtype'] = 'Regular'
pbdat['grid'].attrs['npoints'] = abs(pbdat['grid'].prod())
if pbdat.attrs['ndim'] < 0:
if any(pbdat['grid'][1:] > 1):
pbdat['grid'].attrs['gtype'] = 'Generalized'
else:
pbdat['grid'].attrs['gtype'] = 'Unstructured'
pbdat['grid'].attrs['npoints'] = pbdat['grid'][0]
pbdat.attrs['ndim'] = abs(pbdat.attrs['ndim'])
# Quick ref vars:
gtyp = pbdat['grid'].attrs['gtype']
npts = pbdat['grid'].attrs['npoints']
ndim = pbdat['grid'].size
nvar = pbdat.attrs['nvar']
npar = pbdat.attrs['nparam']
# Read parameters stored in file.
para = np.zeros(npar)
if npar > 0:
para[:] = readarray(infile, floattype, inttype)
names = readarray(infile, str, inttype).decode('utf-8')
# Preserve or destroy original case of variable names:
if not keep_case:
names = names.lower()
        names = names.strip()
names = names.split()
# Now that we know the number of variables, we can properly handle
# the headline and units based on the kwarg *header*:
pbdat.attrs['header'] = headline
if header == 'units':
# If headline is just units:
units = headline.split()
else:
# If headline is NOT just units, create blank units:
units = [''] * (len(names) - npar)
# For some reason, there are often more units than variables
# in these files. It looks as if there are more grid units
# than grid vectors (e.g. 'R R R' implies X, Y, and Z data
# in file but only X and Y are present.) Let's try to work
        # around this curiosity:
nSkip = len(units) + npar - len(names)
if nSkip < 0:
nSkip = 0
# Save grid names (e.g. 'x' or 'r') and save associated params.
pbdat['grid'].attrs['dims'] = tuple(names[0:ndim])
for name, para in zip(names[(nvar+ndim):], para):
pbdat.attrs[name] = para
# Get the grid points...
prod = [1] + pbdat['grid'].cumprod().tolist()
# Read the data into a temporary array
griddata = readarray(infile, floattype, inttype)
for i in range(0, ndim):
# Get the grid coordinates for this dimension
tempgrid = griddata[npts*i:npts*(i+1)]
            # Unstructured grids get loaded as vectors.
if gtyp == 'Unstructured':
pbdat[names[i]] = dmarray(tempgrid)
# Irregularly gridded items need multidimensional grid arrays:
            elif gtyp == 'Generalized':
pbdat[names[i]] = dmarray(np.reshape(tempgrid, pbdat['grid'],
order='F'))
# Regularly gridded ones need vector grid arrays:
elif gtyp == 'Regular':
pbdat[names[i]] = dmarray(np.zeros(pbdat['grid'][i]))
for j in range(int(pbdat['grid'][i])):
pbdat[names[i]][j] = tempgrid[j*int(prod[i])]
            else:
                raise ValueError('Unknown grid type: {0}'.format(gtyp))
# Add units to grid.
if units:
pbdat[names[i]].attrs['units'] = units.pop(nSkip)
# Get the actual data and sort.
for i in range(ndim, nvar+ndim):
pbdat[names[i]] = dmarray(readarray(infile, floattype, inttype))
if units:
pbdat[names[i]].attrs['units'] = units.pop(nSkip)
if gtyp != 'Unstructured':
# Put data into multidimensional arrays.
pbdat[names[i]] = pbdat[names[i]].reshape(
pbdat['grid'], order='F')
class PbData(SpaceData):
'''
The base class for all PyBats data container classes. Inherits from
:class:`spacepy.datamodel.SpaceData` but has additional methods for quickly
exploring an SWMF dataset.
Just like :class:`spacepy.datamodel.SpaceData` objects, *PbData* objects
work just like dictionaries except they have special **attr** dictionary
attributes for both the top-level object and most values. This means that
the following syntax can be used to explore a generic *PbData* object:
    >>> print(obj.keys())
    >>> print(obj.attrs)
    >>> value = obj[key]
Printing *PbData* objects will produce a tree of contents and attributes;
calling ``self.listunits()`` will print all values that have the 'units'
attribute and the associated units. Hence, it is often most instructive
to use the following two lines to quickly learn a *PbData*'s contents:
    >>> print(obj)
    >>> obj.listunits()
*PbData* is the main organizational tool for Pybats datasets, so the
information here is applicable to nearly all Pybats classes.
'''
def __init__(self, *args, **kwargs):
super(PbData, self).__init__(*args, **kwargs) # Init as SpaceData.
def __repr__(self):
return 'PyBats data object'
def __str__(self):
'''
Display contents of container.
'''
print(type(self))
self.tree(attrs=True, verbose=True)
return ''
def listunits(self):
'''
List all variables and associated units.
'''
keys = list(self.keys())
keys.sort()
length = 0
for key in keys:
if len(key) > length:
length = len(key)
form = "%%%is:%%s" % length
for key in keys:
if 'units' in self[key].attrs:
print(form % (key, self[key].attrs['units']))
def timeseries_append(self, obj):
'''
If *obj* is of the same type as self, and both have a 'time'
entry, append all arrays within *obj* that have the same size as
'time' to the corresponding arrays within self. This is useful
        for combining time series data from consecutive runs.
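
        Example, assuming ``log1`` and ``log2`` are :class:`LogFile`
        objects from two consecutive simulation runs:

        >>> log1.timeseries_append(log2)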
'''
# check to make sure that the append appears feasible:
if type(obj) != type(self):
raise TypeError('Cannot append type {} to type {}.'.format(
type(obj), type(self)))
if 'time' not in obj:
raise ValueError('No time vector in {} object'.format(type(obj)))
if 'time' not in self:
            raise ValueError('No time vector in {} object'.format(type(self)))
npts = self['time'].size
for v in self:
# Only combine vectors that are the same size as time and are
# in both objects.
if v not in obj:
continue
if self[v].size != npts:
continue
# Append away.
self[v] = dmarray(np.append(self[v], obj[v]), self[v].attrs)
class IdlFile(PbData):
'''
An object class that reads/parses an IDL-formatted output file from the
SWMF and places it into a :class:`spacepy.pybats.PbData` object.
Parameters
----------
filename : string
A ``*.out`` or ``*.outs`` SWMF output file name.
Other Parameters
----------------
header : str or ``None``
Determine how to interpret the additional header information.
Defaults to 'units'.
keep_case : bool
If set to True, the case of variable names will be preserved. If
set to False, variable names will be set to all lower case.
Notes
-----
PyBats assumes little endian byte ordering because
this is what most machines use. However, there is an autodetect feature
    such that, if PyBats cannot make sense of the first read (a record length
entry, or RecLen), it will proceed using big endian ordering. If this
doesn't work, the error will manifest itself through the "struct" package
as an "unpack requires a string of argument length 'X'".
.. versionchanged:: 0.5.0
Unstructured data are presented as in the files. When reading
3D magnetosphere files, this preserves the 3D block structure,
as required for the BATSRUS interpolator in the `Kamodo
Heliophysics model readers package
<https://github.com/nasa/kamodo>`_. Before 0.5.0, binary
unstructured data were sorted in an attempt to put nearby
positions close to each other in the data arrays. This sorting
was nondeterministic and has been removed; see
:meth:`~spacepy.pybats.bats.Bats2d.extract` and
:class:`~spacepy.pybats.qotree.QTree` for processing
adjacent cells. (ASCII data were never sorted.)
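
    Example, assuming ``z=0_mhd_1_n00000000.out`` is an existing SWMF
    IDL-format output file (the file name here is hypothetical):

    >>> from spacepy.pybats import IdlFile
    >>> mhd = IdlFile('z=0_mhd_1_n00000000.out')
    >>> mhd.listunits()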
'''
def __init__(self, filename, iframe=0, header='units',
keep_case=True, *args, **kwargs):
super(IdlFile, self).__init__(*args, **kwargs) # Init as PbData.
        # Gather information about the file: format, endianness (if needed),
# number of picts/frames, etc.:
fmt, endchar, inttype, floattype = _probe_idlfile(filename)
self.attrs['file'] = filename # Save file name.
self.attrs['format'] = fmt # Save file format.
# Gather information about time range of file from name:
t_info = list(parse_filename_time(filename))
for i in range(len(t_info)):
if type(t_info[i]) != list:
t_info[i] = [t_info[i]]
self.attrs['iter_range'] = t_info[0]
self.attrs['runtime_range'] = t_info[1]
self.attrs['time_range'] = t_info[2]
# For binary files, store information about file:
self._endchar, self._int, self._float = endchar, inttype, floattype
# Save file interpretation options tucked as protected attributes:
self._header, self._keep_case = header, keep_case
# Collect information about the number of epoch frames in the file:
if fmt == 'bin':
self._scan_bin_frames()
else:
self._scan_asc_frames()
# Read one entry of the file (defaults to first frame):
self.read(iframe=iframe)
# Update information about the currently loaded frame.
self.attrs['iframe'] = iframe
self.attrs['time'] = self.attrs['times'][iframe]
# Create string representation of run time:
time = self.attrs['runtime'] # Convenience variable.
self.attrs['strtime'] = "{:04.0f}h{:02.0f}m{:06.3f}s".format(
np.floor(time//3600), np.floor(time % 3600//60.0), time % 60.0)
def _scan_bin_frames(self):
'''
Open the binary-formatted file associated with *self* and scan all
headers to count the number of epoch frames and details of each.
Results are stored within *self*.
'''
from datetime import timedelta as tdelt
# Create some variables to store information:
nframe = 0 # Number of epoch frames in file.
iters, runtimes = [], [] # Lists of time information.
offset = [] # Byte offset from beginning of file of each frame.
with open(self.attrs['file'], 'rb') as f:
f.seek(0, 2) # Jump to end of file (in Py3, this returns location)
file_size = f.tell() # Get number of bytes in file.
f.seek(0) # Rewind to file start.
# Loop over all data frames and collect information:
while f.tell() < file_size:
info = _scan_bin_header(f, self._endchar,
self._int, self._float)
# Stash information into lists:
offset.append(info['start'])
iters.append(info['iter'])
runtimes.append(info['runtime'])
nframe += 1
# Store everything as file-level attrs; convert to numpy arrays.
self.attrs['nframe'] = nframe
self.attrs['iters'] = np.array(iters)
self.attrs['runtimes'] = np.array(runtimes)
# Use times info to build datetimes and update file-level attributes.
if self.attrs['time_range'] != [None]:
self.attrs['times'] = np.array(
[self.attrs['time_range'][0]+tdelt(seconds=int(x-runtimes[0]))
for x in runtimes])
else:
self.attrs['times'] = np.array(nframe*[None])
# Ensure all ranges are two-element arrays and update using
# information we gathered from the header.
if self.attrs['iter_range'] == [None]:
self.attrs['iter_range'] = [iters[0], iters[-1]]
if self.attrs['runtime_range'] == [None]:
self.attrs['runtime_range'] = [runtimes[0], runtimes[-1]]
# Stash the offset of frames as a private attribute:
self._offsets = np.array(offset)
def _scan_asc_frames(self):
'''
Open the ascii-formatted file associated with *self* and scan all
headers to count the number of epoch frames and details of each.
Results are stored within *self*.
This is a placeholder only until ascii-formatted .outs files are
fully supported.
'''
# Only use top-level frame as of now.
self._offsets = np.array([0])
self.attrs['nframe'] = 1
self.attrs['iters'] = [0, 0]
self.attrs['runtimes'] = [0, 0]
self.attrs['times'] = [0, 0]
def switch_frame(self, iframe):
'''
For files that have more than one data frame (i.e., ``*.outs`` files),
load data from the *iframe*-th frame into the object replacing what is
currently loaded.
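
        Example, assuming ``mhd`` is an :class:`IdlFile` object created
        from a multi-frame ``*.outs`` file:

        >>> mhd.switch_frame(2)  # Load the third frame into the object.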
'''
# Ensure that given frame exists within object:
if iframe < -self.attrs['nframe'] or iframe >= self.attrs['nframe']:
            raise IndexError("iframe {} is outside the range [{}, {})".format(
                iframe, -self.attrs['nframe'], self.attrs['nframe']))
self.read(iframe)
# Update information about the current frame:
self.attrs['iframe'] = self.attrs['nframe'] \
+ iframe if iframe < 0 else iframe
self.attrs['iter'] = self.attrs['iters'][iframe]
self.attrs['runtime'] = self.attrs['runtimes'][iframe]
self.attrs['time'] = self.attrs['times'][iframe]
# Update string time:
time = self.attrs['runtime'] # Convenience variable.
self.attrs['strtime'] = "{:04.0f}h{:02.0f}m{:06.3f}s".format(
np.floor(time//3600), np.floor(time % 3600//60.0), time % 60.0)
# Re-do calculations if any have been done.
if hasattr(self, '_calcs'):
# Get dictionary of name-method pairs for called calculations:
calcs = self._calcs
# Reset dict of called calculations:
self._calcs = {}
# Call all calculation methods:
for name in calcs:
args, kwargs = calcs[name]
getattr(self, name)(*args, **kwargs)
def __repr__(self):
return 'SWMF IDL-Binary file "%s"' % (self.attrs['file'])
def read(self, iframe=0):
'''
This method reads an IDL-formatted BATS-R-US output file and places
the data into the object. The file read is self.filename which is
set when the object is instantiated.
'''
# Get location of frame that we wish to read:
loc = self._offsets[iframe]
if self.attrs['format'] == 'asc':
_read_idl_ascii(self, header=self._header, start_loc=loc,
keep_case=self._keep_case)
elif self.attrs['format'] == 'bin':
_read_idl_bin(self, header=self._header, start_loc=loc,
keep_case=self._keep_case)
        else:
            raise ValueError('Unrecognized file format: {}'.format(
                self.attrs['format']))
class LogFile(PbData):
''' An object to read and handle SWMF-type logfiles.
*LogFile* objects read and hold all information in an SWMF
ascii time-varying logfile. The file is read upon instantiation.
Many SWMF codes
produce flat ascii files that can be read by this class; the most
frequently used ones have their own classes that inherit from this.
See :class:`spacepy.pybats.PbData` for information on how to explore
data contained within the returned object.
Usage:
    >>> data = spacepy.pybats.LogFile('filename.log')
========= ===========================================
kwarg Description
--------- -------------------------------------------
starttime Manually set the start time of the data.
========= ===========================================
Time is handled by Python's datetime package. Given that time
may or may not be given in the logfile, there are three options
for how time is returned:
1. if the full date and time are listed in the file, ``self['time']``
is an array of datetime objects corresponding to the entries. The
starttime kwarg is ignored.
2. If only the runtime (seconds from start of simulation) is given,
self['time'] is an array of datetime objects that starts from
the given *starttime* kwarg which defaults to 1/1/1 00:00UT.
3. If neither of the above are given, the time is assumed to
advance one second per line starting from either the starttime kwarg
or from 2000/1/1 00:00UT + the first iteration (if given in file.)
As you can imagine, this is sketchy at best.
This time issue is output dependent: some SWMF models place the full date
and time into the log by default while others will almost never include the
full date and time. The variable ``self['runtime']`` contains the more
generic seconds from simulation start values.
Example usage:
>>> import spacepy.pybats as pb
>>> import pylab as plt
>>> import datetime as dt
>>> time1 = dt.datetime(2009,11,30,9,0)
    >>> file1 = pb.LogFile('satfile_n000000.dat', starttime=time1)
>>> plt.plot(file1['time'], file1['dst'])
'''
import datetime as dt
def __init__(self, filename, starttime=(2000, 1, 1, 0, 0, 0),
keep_case=True, *args, **kwargs):
super(LogFile, self).__init__(*args, **kwargs)
self.attrs['file'] = filename
self.read(starttime, keep_case)
def read(self, starttime, keep_case=True):
'''
Load the ascii logfile located at self.filename.
This method is automatically called upon instantiation.
'''
import numpy as np
import datetime as dt
# Convert starttime from tuple to datetime object.
if type(starttime) != dt.datetime:
if len(starttime) != 6:
raise ValueError('starttime must be a length 6 Tuple ' +
'or a datetime.datetime object')
starttime = dt.datetime(starttime[0], starttime[1], starttime[2],
starttime[3], starttime[4], starttime[5])
# Slurp in entire file.
infile = open(self.attrs['file'], 'r')
raw = infile.readlines()
infile.close()
# Parse the header.
self.attrs['descrip'] = raw.pop(0)
raw_names = raw.pop(0)
if not keep_case:
raw_names = raw_names.lower()
names = raw_names.split()
loc = {}
# Keep track of in which column each data vector lies.
for i, name in enumerate(names):
loc[name] = i
# Use the header info to set up arrays
# for time, iteration, and the data.
npts = len(raw)
# If opening an incomplete file, we must skip the last line.
if len(raw[-1].split()) < len(names):
npts = npts-1
self.attrs['npts'] = npts
# Pop time/date/iteration names off of Namevar.
for key in ['year', 'mo', 'dy', 'hr', 'mn', 'sc', 'msc',
't', 'it', 'yy', 'mm', 'dd', 'hh', 'ss', 'ms']:
if key in loc:
names.pop(names.index(key))
# Create containers for data:
time = dmarray(np.zeros(npts, dtype=object))
runtime = dmarray(np.zeros(npts), attrs={'units': 's'})
self['iter'] = dmarray(np.zeros(npts))
for name in names:
self[name] = dmarray(np.zeros(npts))
for i in range(npts):
vals = raw[i].split()
# Set time:
if 'year' in loc or 'yy' in loc:
# If "year" or "yy" is listed, we have the full datetime.
if 'year' in loc:
# BATS date format
time[i] = (dt.datetime(
int(vals[loc['year']]), # Year
int(vals[loc['mo']]), # Month
int(vals[loc['dy']]), # Day
int(vals[loc['hr']]), # Hour
int(vals[loc['mn']]), # Minute
int(vals[loc['sc']]), # Second
int(vals[loc['msc']]) * 1000 # microsec
))
elif 'yy' in loc:
# RIM date format
time[i] = (dt.datetime(
int(vals[1]), # Year
int(vals[2]), # Month
int(vals[3]), # Day
int(vals[4]), # Hour
int(vals[5]), # Minute
int(vals[6]), # Second
int(vals[7]) * 1000 # microsec
))
diffT = time[i] - time[0]
runtime[i] = diffT.days*24.0*3600.0 + \
diffT.seconds + \
diffT.microseconds*1E-6
elif 't' in loc:
# If "t" but no "year" is listed, we only have runtime.
# Check to ensure number of seconds doesn't
# exceed 24 hours.
nowsecs = float(vals[loc['t']])
nowdays = int(nowsecs / (24.0 * 3600.0))
nowsecs = nowsecs - (24.0 * 3600.0 * nowdays)
delta = dt.timedelta(days=nowdays, seconds=nowsecs)
newtime = starttime + delta
time[i] = newtime
runtime[i] = nowdays*24.0*3600.0 + nowsecs
elif 'it' in loc:
# Check to ensure number of seconds doesn't
# exceed 24 hours.
nowsecs = float(vals[loc['it']])
nowdays = int(nowsecs / (24.0 * 3600.0))
nowsecs = nowsecs - (24.0 * 3600.0 * nowdays)
delta = dt.timedelta(days=nowdays, seconds=nowsecs)
newtime = starttime + delta
time[i] = newtime
runtime[i] = nowdays*24.*3600.+nowsecs
            else:
                time[i] = starttime + dt.timedelta(seconds=float(i))
# Set iteration:
if 'it' in loc:
if '*' in vals[loc['it']]:
self['iter'][i] = -1
else:
self['iter'][i] = int(vals[loc['it']])
else:
self['iter'][i] = i
# Collect data
for j, name in enumerate(names):
self[name][i] = float(vals[loc[name]])
# Convert time and runtime to dmarrays.
self['time'] = time
self['runtime'] = runtime
class NgdcIndex(PbData):
'''
Many models incorporated into the SWMF rely on National Geophysical Data
Center (NGDC) provided index files (especially F10.7 and Kp). These
files, albeit simple ascii, have a unique format and expansive header
that can be cumbersome to handle. Data files can be obtained from
http://spidr.ngdc.noaa.gov .
NgdcIndex objects aid in reading, creating, and visualizing these files.
Creating an :class:`NgdcIndex` object is simple:
>>> from spacepy import pybats
>>> obj=pybats.NgdcIndex(filename='ngdc_example.dat')
Upon instantiation, if *filename* is a valid file AND kwarg *load* is set
to boolean True, the contents of *filename* are loaded into the object
and no other work needs to be done.
If *filename* is False or *load* is False, a blank :class:`NgdcIndex`
is created for the user to manipulate.
    The user can set the time array and the associated data values to any values
desired and use the method *obj.write()* to dump the contents to an NGDC
formatted input file. See the documentation for the write method for
more details.
This class is a work-in-progress. It is especially tuned for SWMF-needs
and cannot be considered a general function for the handling of generic
NGDC files.
=========== ============================================================
Kwarg Description
----------- ------------------------------------------------------------
filename Set the input/output file name.
load Read file upon instantiation? Defaults to **True**
=========== ============================================================
'''
def __init__(self, filename=None, load=True, *args, **kwargs):
super(NgdcIndex, self).__init__(*args, **kwargs)
# Set key information.
self.attrs['file'] = filename
self.attrs['comments'] = []
if load:
self._read()
def _read(self):
'''
Read an existing NGDC file into memory. Should only be called upon
instantiation.
'''
from dateutil.parser import parse
# Start by reading the header.
infile = open(self.attrs['file'], 'r')
temp = infile.readline()
while temp != '#'+50*'-'+'\n': # Detect start of file.
# Skip blank comments, save substantive ones.
if temp == '#\n':
temp = infile.readline()
continue
self.attrs['comments'].append(temp)
temp = infile.readline()
# Skip "data start" marker.
infile.readline()
# Continue to read and sort all data in file.
def read_one_var():
attrs = {}
# Get current variable. Strip out all but name.
varname = infile.readline().split(':')[-1].strip()
temp = infile.readline()
while temp != '#>\n':
if temp != '#\n':
parts = temp[1:].split(':')
attrs[parts[0].strip()] = parts[-1].strip()
temp = infile.readline()
# Create some dummy arrays.
t = []
d = []
# Skip the sub-header.
temp = infile.readline()
temp = infile.readline()
# Parse data until next variable or EOF.
while (temp != '#>\n') and (temp != ''):
parts = temp.split()
t.append(parse(' '.join(parts[0:2])))
d.append(float(parts[2]))
temp = infile.readline()
# Put the variable together.
self[varname] = dmarray([t, d], attrs=attrs)
# Make judgement as to whether there is more data or not.
return (temp == '#>\n')
IsData = True
while IsData:
IsData = read_one_var()
def write(self, outfile=False):
'''
Write the :class:`NgdcIndex` object to file. Kwarg *outfile* can be
used to specify the path of the output file; if it is not set,
*self.attrs['file']* is used. If this is not set, default to
"ngdc_index.dat".
'''
if not outfile:
if self.attrs['file'] is not None:
outfile = self.attrs['file']
else:
outfile = 'ngdc_index.dat'
out = open(outfile, 'w')
# Write header.
for c in self.attrs['comments']:
out.write(c)
out.write(2*'#\n'+'#'+50*'-'+'\n')
# Write variables.
for k in self:
out.write('#>\n')
out.write('#%s: %s\n' % ('Element', k))
for a in self[k].attrs:
out.write('#%s: %s\n' % (a, self[k].attrs[a]))
out.write('#>\n#yyyy-MM-dd HH:mm value qualifier description\n')
for i in range(len(self[k][0, :])):
t = self[k][0, i]
d = self[k][1, i]
out.write('%04i-%02i-%02i %02i:%02i%7.1f\t""\t""\n' %
(t.year, t.month, t.day, t.hour, t.minute, d))
class SatOrbit(PbData):
'''
    A class to load, read, write, and handle BATS-R-US satellite orbit
input files. These files are used to fly virtual satellites through
the MHD domain. Note that the output files should be handled by
the :class:`LogFile` and not this satorbit object. The object's
required and always present attributes are:
============ ==============================================================
Attribute Description
============ ==============================================================
head A list of header lines for the file that contain comments.
coor The three-letter code (see SWMF doc) of the coord system.
file Location of the file to read/write.
============ ==============================================================
The object should always have the following two data keys:
============ ==============================================================
Key Description
============ ==============================================================
time A list or numpy vector of datetime objects
xyz A 3 x len(time) numpy array of x,y,z coordinates associated
with the time vector.
============ ==============================================================
A "blank" instantiation will create an empty object for the user to fill.
This is desired if the user wants to create a new orbit, either from
data or from scratch, and save it in a properly formatted file. Here's
an example with a "stationary probe" type orbit where the attributes are
filled and then dropped to file:
>>> from spacepy.pybats import SatOrbit
>>> import datetime as dt
>>> import numpy as np
>>> sat = SatOrbit()
>>> sat['time'] = [ dt.datetime(2000,1,1), dt.datetime(2000,1,2) ]
>>> pos = np.zeros( (3,2) )
>>> pos[:,0]=[6.6, 0, 0]
>>> pos[:,1]=[6.6, 0, 0]
>>> sat['xyz'] = pos
>>> sat.attrs['coor'] = 'SMG'
>>> sat.attrs['file'] = 'noon_probe.dat'
>>> sat.write()
If instantiated with a file name, the name is loaded into the object.
For example,
>>> sat=SatOrbit('a_sat_orbit_file.dat')
...will populate all fields with the data in *a_sat_orbit_file.dat*.
'''
def __init__(self, filename=None, *args, **kwargs):
        '''
        Create a SatOrbit object. If *filename* is not given, an empty
        object is instantiated for the user to fill with values of
        their own creation.
        '''
import numpy as np
super(SatOrbit, self).__init__(*args, **kwargs)
self.attrs['file'] = filename
self.attrs['head'] = []
self.attrs['coor'] = 'GSM'
self['time'] = dmarray(np.zeros(0, dtype=object))
if filename:
try:
self.read()
except IOError as reason:
self['xyz'] = dmarray(np.zeros((3, 1)))
raise IOError(reason)
else:
# fill with empty stuff.
self['xyz'] = dmarray(np.zeros((3, 1)))
def read(self):
'''
Read and parse a satellite orbit input file into the satorbit object.
'''
import numpy as np
import datetime as dt
# Try to open the file. If we fail, pass the exception
# to the caller.
infile = open(self.attrs['file'], 'r')
# Slurp contents
raw = infile.readlines()
infile.close()
# Parse header. Save coordinate system and any comments.
while 1:
line = raw.pop(0).strip()
if line:
if line == '#START':
break
elif line == '#COOR':
self.attrs['coor'] = raw.pop(0).strip()
else:
self.attrs['head'].append(line)
# Read and store all values.
npts = len(raw)
self['xyz'] = dmarray(np.zeros((3, npts)))
self['time'] = dmarray(np.zeros(npts, dtype=object))
for i, line in enumerate(raw):
parts = line.split()
self['time'][i] = dt.datetime(
int(parts[0]), # year
int(parts[1]), # month
int(parts[2]), # day
int(parts[3]), # hour
int(parts[4]), # min
int(parts[5]), # sec
int(parts[6]) * 1000 # micro seconds
)
self['xyz'][:, i] = parts[7:]
def write(self):
        '''Write the :class:`SatOrbit` object to file using the
        correct SWMF-input format. The file will be written to the
        location specified by ``self.attrs['file']``. An error is
        thrown if this attribute is not set.
        '''
with open(self.attrs['file'], 'w') as outfile:
# Start by writing header, coordinate system, and then #START.
for line in self.attrs['head']:
outfile.write(line+'\n')
outfile.write('\n')
outfile.write('#COOR\n{}\n\n'.format(self.attrs['coor']))
outfile.write('#START\n')
# Write the rest of the orbit.
npts = len(self['time'])
for i in range(npts):
# Time:
outfile.write('{:%Y %m %d %H %M %S} {:03.0f} '.format(
self['time'][i], self['time'][i].microsecond/1000.0))
# Position:
outfile.write('{:13.7E} {:13.7E} {:13.7E}\n'.format(
self['xyz'][0, i], self['xyz'][1, i], self['xyz'][2, i]))