#!/usr/bin/env python
"""High-level implementation of the dbprocessing processing queue."""
from __future__ import absolute_import
from __future__ import print_function
import datetime
import imp
import os
import shutil
import sys
import tempfile
import time
import traceback
from operator import itemgetter
from . import DBfile
from . import DBlogging
from . import DBqueue
from . import DButils
from . import Utils
from . import runMe
from .Utils import strargs_to_args
from sqlalchemy.exc import IntegrityError
[docs]class ProcessQueue(object):
"""Main code used to process the Queue.
Looks in incoming and builds all possible files
Warnings
--------
As this object holds a reference to
:class:`~dbprocessing.DButils.DButils`, that database should be
closed before the program terminates. Deleting this object will
ordinarily suffice.
"""
def __init__(self,
             mission, dryrun=False, echo=False):
    """Set up the processing queue and its database connection.

    Parameters
    ----------
    mission : str
        Mission database, as in :class:`.DButils`. May also be
        an existing instance of :class:`.DButils`.
    dryrun : bool, default False
        Treat all functionality as "dry run", make no changes.
    echo : bool, default False
        Log all SQL statements (as in
        :class:`~dbprocessing.DButils.DButils`).
    """
    self.dryrun = dryrun
    # Full path to the file currently being processed (str); set later
    # via set_filename().
    self.filename = None
    # Reuse an existing database connection if one was handed in;
    # otherwise open a fresh one for the named mission.
    if isinstance(mission, DButils.DButils):
        database = mission
        self.mission = database.mission
    else:
        self.mission = mission
        database = DButils.DButils(self.mission, echo=echo)
    self.tempdir = None
    self.runme_list = []
    self.dbu = database
    # Work queues used by the various processing stages.
    self.childrenQueue = DBqueue.DBqueue()
    self.moved = DBqueue.DBqueue()
    self.depends = DBqueue.DBqueue()
    self.queue = DBqueue.DBqueue()
    self.findChildren = DBqueue.DBqueue()
    DBlogging.dblogger.debug("Entering ProcessQueue")
def __del__(self):
"""Clean up (remove temporary files and close database)"""
self.rm_tempdir()
try:
del self.dbu
except AttributeError:
pass
[docs] def set_filename(self, filename):
"""
Setter for filename, this is cleaner than just random sets
Parameters
----------
filename : :class:`str`
filename to set to :data:`filename`
"""
self.filename = filename
self.basename = os.path.basename(self.filename)
[docs] def rm_tempdir(self):
"""
remove the temp directory
"""
try:
if self.tempdir != None:
name = self.tempdir
shutil.rmtree(self.tempdir)
self.tempdir = None
DBlogging.dblogger.debug("Temp dir deleted: {0}".format(name))
except AttributeError:
pass
[docs] def mk_tempdir(self, suffix='_dbprocessing_{0}'.format(os.getpid())):
"""
create a secure temp directory
"""
self.tempdir = tempfile.mkdtemp(suffix)
def checkIncoming(self, glb='*'):
    """
    Goes out to incoming and grabs all files there adding them to self.queue

    Parameters
    ----------
    glb : :class:`str`, optional
        Glob pattern that files must match.
    """
    DBlogging.dblogger.debug("Entered checkIncoming:")
    self.queue.extendleft(self.dbu.checkIncoming(glb=glb))
    # step through and remove duplicates
    # if python 2.7 deque has a .count() otherwise have to use
    # this workaround
    # NOTE(review): when an element occurs more than once, .remove()
    # drops the LEFTMOST copy on each pass, so the last occurrence is
    # the one kept. O(n^2), acceptable for typical incoming sizes.
    for i in range(len(self.queue)):
        try:
            if list(self.queue).count(self.queue[i]) != 1:
                self.queue.remove(self.queue[i])
        except IndexError:
            pass # this means it was shortened
    DBlogging.dblogger.debug("Queue contains (%d): %s" % (len(self.queue),
                                                          self.queue))
def moveToError(self, fname):
    """
    Move a file out of incoming and into the error directory.

    Parameters
    ----------
    fname : :class:`str`
        Full path to file to move to error.
    """
    DBlogging.dblogger.debug("Entered moveToError: {0}".format(fname))
    path = self.dbu.getErrorPath()
    if os.path.islink(fname):
        # A symlink carries no data of its own: just delete the link,
        # nothing is moved into the error directory.
        os.unlink(fname)
        DBlogging.dblogger.info("moveToError file {0} was a link, so link removed not moved to error".format(fname))
        return
    destination = os.path.join(path, os.path.basename(fname))
    try:
        shutil.move(fname, destination)
    except IOError:
        DBlogging.dblogger.error(
            "file {0} was not successfully moved to error".format(destination))
    else:
        DBlogging.dblogger.info("moveToError {0} moved to {1}".format(fname, path))
def diskfileToDB(self, df):
    """
    Given a diskfile go through and do all the steps to add it into the db.

    Parameters
    ----------
    df : :class:`.Diskfile` or None
        File to add to database. ``None`` means no inspector claimed the
        file; the current incoming file is moved to error instead.

    Returns
    -------
    :class:`int` or None
        file_id of the file entered into the database, or None on
        failure or in a dry run.

    Raises
    ------
    DButils.DBError
        If the commit after moving the file hits an IntegrityError.
    """
    if df is None:
        # No product matched -- get the file out of incoming.
        DBlogging.dblogger.info("Found no product moving to error, {0}".format(self.basename))
        if not self.dryrun:
            self.moveToError(self.filename)
        else:
            print('<dryrun> Found no product moving to error, {0}'.format(self.basename))
        return None
    # create the DBfile
    dbf = DBfile.DBfile(df, self.dbu)
    try:
        if not self.dryrun:
            f_id = dbf.addFileToDB()
            DBlogging.dblogger.info("File {0} entered in DB, f_id={1}".format(df.filename, f_id))
        else:
            print('<dryrun> File {0} entered in DB'.format(df.filename))
    except (ValueError, DButils.DBError) as errmsg:
        if not self.dryrun:
            DBlogging.dblogger.warning("Except adding file to db so" + \
                                       " moving to error: %s" % (errmsg))
            self.moveToError(os.path.join(df.path, df.filename))
        else:
            print('<dryrun> Except adding file to db so' +
                  ' moving to error: %s' % (errmsg))
        return None
    # move the file to its correct home
    if not self.dryrun:
        dbf.move()
        try:
            self.dbu.session.commit()
        except IntegrityError as IE:
            # BUG FIX: was self.session.rollback() -- ProcessQueue has no
            # 'session' attribute, so the rollback itself would raise
            # AttributeError. Roll back on the DButils session.
            self.dbu.session.rollback()
            raise DButils.DBError(IE)
        # add to processqueue for later processing
        self.dbu.ProcessqueuePush(f_id)
        return f_id
    else:
        return None
def importFromIncoming(self):
    """
    Import a file from incoming into the database

    Walks the queue of incoming paths; files already in the database are
    moved to error, otherwise inspectors determine the product and the
    file is ingested via :meth:`diskfileToDB`.
    """
    DBlogging.dblogger.debug("Entering importFromIncoming, {0} to import".format(len(self.queue)))
    # In a dry run, iterate WITHOUT consuming so the queue is untouched;
    # otherwise pop entries off the left as we go.
    if not self.dryrun:
        vals = self.queue.popleftiter()
    else:
        vals = self.queue
    T0 = time.time()  # per-file wall-clock timing for the progress prints
    for ii, val in enumerate(vals, 1):
        self.set_filename(val)
        DBlogging.dblogger.debug("popped '{0}' from the queue: {1} left".format(self.basename, len(self.queue)))
        # see if the file is in the db, if so then don't call the inspectors
        try:
            id = self.dbu.getFileID(self.basename)
            DBlogging.dblogger.info(
                'File {0}:{1} was already in DB, not inspecting'.format(id, self.basename))
            self.moveToError(self.filename)
            T1 = time.time() - T0
            print('{1}:{2} Removed from incoming: {0} - already present {3:.2f}s'.format(self.basename, ii, len(self.queue), T1))
            T0 = time.time()
            continue
        except DButils.DBNoData:
            # Not in the database yet: run the inspectors to find a product.
            DBlogging.dblogger.info('File {0} was not in DB, inspecting'.format(self.basename))
            # NOTE(review): figureProduct is defined elsewhere in this file;
            # presumably returns None, [] or a Diskfile -- confirm.
            df = self.figureProduct()
            if df != []:
                self.diskfileToDB(df)
            T1 = time.time() - T0
            print('{1}:{2} Removed from incoming: {0} - ingested {3:.2f}s'.format(self.basename, ii, len(self.queue), T1))
            T0 = time.time()
def _getRequiredProducts(self, process_id, file_id, utc_file_date, debug=False):
    """
    Gather the input files needed to run a process for a given date.

    Parameters
    ----------
    process_id : :class:`int`
        Process to gather inputs for.
    file_id : :class:`int`
        File that triggered the build (its process_keywords are used to
        cull candidates for FILE-timebase processes).
    utc_file_date : :class:`~datetime.date` or :class:`~datetime.datetime`
        Date the output file will cover.
    debug : :class:`bool`, default False
        Print timing/diagnostic output.

    Returns
    -------
    :class:`tuple`
        ``(files, input_product_id)``: candidate input file records (or
        ``None`` if a required, non-optional product has no files) and
        the list of (product_id, optional, yesterday, tomorrow) tuples.

    Raises
    ------
    NotImplementedError
        For timebases other than FILE, DAILY, RUN.
    """
    T0 = time.time()
    ## get all the input products for that process, and if they are optional
    # list of tuples (id, optional, yesterday, tomorrow)
    input_product_id = self.dbu.getInputProductID(process_id, True)
    if debug: print("21: {0}: self.dbu.getInputProductID: {1}".format(time.time() - T0, input_product_id))
    T0 = time.time()
    DBlogging.dblogger.debug(
        "Finding input files for file_id:{0} process_id:{1} date:{2}".format(file_id, process_id, utc_file_date))
    ## here decide how we build output and do it.
    timebase = self.dbu.getProcessTimebase(process_id)
    if debug: print("22: {0}: self.dbu.getProcessTimebase: {1}".format(time.time() - T0, timebase))
    T0 = time.time()
    DBlogging.dblogger.debug("Doing {0} based processing".format(timebase))
    if timebase in ['FILE', 'DAILY', 'RUN']:  # taking one file to the next file
        # For file based processing, look at "process_keywords" and cull
        # the returned files so they all match; if process_keywords is
        # None this falls back to prior behavior (all identical).
        files = []
        # All possible files, by date, that might feed this process.
        for iprod_id, opt, y, t in input_product_id:
            # accept a datetime.datetime or datetime.date
            dt = Utils.datetimeToDate(utc_file_date)
            start = dt - datetime.timedelta(days=y)
            end = dt + datetime.timedelta(days=t)
            # DAILY timebase queries by time, the others by date.
            if timebase in ('DAILY',):
                kwargs = {'startTime': start, 'endTime': end}
            else:
                kwargs = {'startDate': start, 'endDate': end}
            tmp_files = self.dbu.getFiles(
                product=iprod_id, newest_version=True, exists=True,
                **kwargs)
            if debug: print("23: {0}: self.dbu.getFiles, {1} {2} {3}".format(time.time() - T0, iprod_id, dt, tmp_files))
            T0 = time.time()
            if not tmp_files and not opt:
                # A required (non-optional) input has no files: cannot run.
                return None, input_product_id
            else:
                files.extend(tmp_files)
        # BUG FIX: format string was missing its {0} placeholder, so the
        # file list was never actually logged.
        DBlogging.dblogger.debug("buildChildren files: {0}".format(str(files)))
        ###############
        # BAL 30 March 2017, dropping this clean step as they should all be newest version per above
        # remove all the files that are not the newest version, they all should be
        # files = self.dbu.file_id_Clean(files)
        ###############
        if timebase == 'FILE':  # taking one file to the next file
            files_out = []
            # The process_keywords of every candidate must match those of
            # the triggering file for the file to be "the same".
            infile_process_keywords = self.dbu.getEntry('File', file_id).process_keywords
            try:
                files_process_keywords = [v.process_keywords for v in files]
            except AttributeError:
                files_process_keywords = []
            # Drop any candidate whose process_keywords differ.
            for ii, v in enumerate(files_process_keywords):
                if v == infile_process_keywords:
                    files_out.append(files[ii])
            files = files_out
    else:
        # (An unreachable "raise ValueError" that followed this raise was
        # removed as dead code.)
        raise NotImplementedError('Not implemented yet: {0} based processing'.format(timebase))
    return files, input_product_id
def buildChildren(self, file_id, debug=False, skip_run=False, run_procs=None):
    """
    go through and all the runMe's and add to the runme_list variable

    Parameters
    ----------
    file_id : :class:`tuple`
        Process-queue entry for the file whose children will be built.
        NOTE(review): despite being named like an int, this is indexed
        as ``file_id[0]`` (the file ID) and ``file_id[1]`` (handed to
        runMe; presumably a version-bump flag) -- confirm against the
        process-queue getter.
    debug : :class:`bool`, default False
        Print timing/diagnostic output.
    skip_run : :class:`bool`, default False
        Skip RUN timebase processes if True
    run_procs : :class:`str`, optional
        If provided, comma-separated list of process IDs
        or process names to run; other processes are
        ignored. (Default: all possible processes).
    """
    # if processes to run specified, turn into list of IDs
    # getProcessID accepts either ID or name and returns ID
    if run_procs is not None:
        run_procs = [self.dbu.getProcessID(rp)
                     for rp in run_procs.split(',')]
    T0 = time.time()
    DBlogging.dblogger.debug("Entered buildChildren: file_id={0}".format(file_id))
    if debug: print("Entered buildChildren: file_id={0}".format(file_id))
    # if this file is not a newest_version we do not want to run
    #print("{1}: Entered buildChildren: file_id={0}".format(file_id, time.time()-T0))
    T0 = time.time()
    if not self.dbu.fileIsNewest(file_id[0]):
        DBlogging.dblogger.debug("Was not newest version in buildChildren: file_id={0}".format(file_id))
        print(" Was not newest version in buildChildren: file_id={0}".format(file_id))
        return # do nothing
    if debug: print(" {1}: was newest moving on in buildChildren: file_id={0}".format(file_id, time.time()-T0))
    T0 = time.time()
    children = self.dbu.getChildrenProcesses(file_id[0]) # returns process
    if debug: print("11: {1}: done self.dbu.getChildrenProcesses buildChildren: file_id={0} : {2}".format(file_id, time.time()-T0, children))
    T0 = time.time()
    daterange = self.dbu.getFileDates(file_id[0]) # this is the dates that this file spans
    if debug: print("12: {1}: done self.dbu.getFileDates buildChildren: file_id={0} : {2}".format(file_id, time.time()-T0, daterange))
    T0 = time.time()
    if debug: print("children: {0}".format(children))
    for child_process in children:
        # iterate over all the days between the start and stop date from above (including stop date)
        for utc_file_date in Utils.expandDates(*daterange):
            if debug: print(" utc_file_date: {0}".format(utc_file_date))
            files, input_product_id = self._getRequiredProducts(child_process, file_id[0], utc_file_date)
            if debug: print("13: {0}: self._getRequiredProducts {1} {2}".format(time.time()-T0, files, input_product_id))
            T0 = time.time()
            if not files:
                # a required product had no files for this date: log the
                # missing products and move on
                DBlogging.dblogger.debug("For file: {0} date: {1} required files not present {2}"
                                         .format(file_id[0], utc_file_date, input_product_id))
                continue # go on to the next file
            # ==============================================================================
            # do we have the required files to do the build?
            # ==============================================================================
            ## if not self._requiredFilesPresent(files, input_product_id, process_id):
            ##     DBlogging.dblogger.debug("For file: {0} date: {1} required files not present".format(file_id[0], utc_file_date))
            ##     continue # go on to the next file
            try:
                input_files = [v.file_id for v in files]
            except AttributeError:
                # files entries are not File records; nothing usable here
                continue
            DBlogging.dblogger.debug("Input files found, {0}".format(input_files))
            # RUN-timebase processes are skippable on request
            if skip_run \
               and self.dbu.getProcessTimebase(child_process) == 'RUN':
                DBlogging.dblogger.info(
                    "Process: {} skipping because RUN timebase"
                    .format(self.dbu.getEntry('Process', child_process)
                            .process_name))
                continue
            # honor the run-only list, if one was given
            if run_procs is not None and child_process not in run_procs:
                DBlogging.dblogger.info(
                    "Process: {} skipping because not in run-only list"
                    .format(self.dbu.getEntry('Process', child_process)
                            .process_name))
                continue
            runme = runMe.runMe(self.dbu, utc_file_date, child_process, input_files, self, file_id[1])
            #print("{0}: runMe.runMe".format(time.time()-T0))
            #T0 = time.time()
            # only add to runme list if it can be run
            if runme.ableToRun and (runme not in self.runme_list):
                self.runme_list.append(runme)
                DBlogging.dblogger.info("Filename: {0} is not in the DB, can process".format(runme.filename))
[docs] def onStartup(self):
"""
Processes can be defined as output timebase "STARTUP" which means to run
them each time to processing chain is run
"""
proc = self.dbu.getAllProcesses(timebase='STARTUP')
# TODO just going to run there here for now. This should move to runMe
for p in proc: # run them all
code = self.dbu.getEntry('Code', p.process_id)
# print code.codename
# need to call a "runner" with these processes
######
##
# not sure how to deal with having to specify a filename and handle that in the DB
# things made here will also have to have inspectors
raise NotImplementedError('Not yet implemented')
def _reprocessBy(self,
                 startDate=None,
                 endDate=None,
                 level=None,
                 product=None,
                 code=None,
                 instrument=None,
                 incVersion=None):
    """
    Add all matching files to the process queue for reprocessing.

    All parameters are optional; if not specified, default is "all".
    Files added here will be reprocessed on the next ``-p`` run.

    Parameters
    ----------
    startDate : :class:`~datetime.datetime`, optional
        First date to add to process queue
    endDate : :class:`~datetime.datetime`, optional
        Last date to add to process queue (inclusive)
    level : :class:`float`, optional
        Only add files of this level.
    product : :class:`int`, optional
        :sql:column:`~product.product_id` of files to add
    code : :class:`int`, optional
        Only add files created by code with ID of
        :sql:column:`~code.code_id`
    instrument : :class:`int`, optional
        Only add files with instrument
        :sql:column:`~instrument.instrument_id`
    incVersion : :class:`int`, optional
        Force processing and increment this version number, {0}.{1}.{2}
    """
    # Normalize datetimes to dates before querying.
    first = Utils.datetimeToDate(startDate)
    last = Utils.datetimeToDate(endDate)
    matches = self.dbu.getFiles(startDate=first,
                                endDate=last,
                                level=level,
                                product=product,
                                code=code,
                                instrument=instrument,
                                newest_version=True)
    file_ids = [rec.file_id for rec in matches]
    return self.dbu.ProcessqueueRawadd(file_ids, incVersion)
[docs] def reprocessByCode(self, id_in, startDate=None, endDate=None, incVersion=None):
"""Add files made by a code to the queue for reprocessing.
Parameters
----------
id_in : :class:`str` or :class:`int`
ID or filename of code to reprocess
startDate : :class:`~datetime.datetime`, optional
First date to reprocess (default all)
endDate : :class:`~datetime.datetime`, optional
Last date to reprocess (default all)
incVersion : :class:`int` {0, 1, 2}, optional
Which version number to increment: major (0), minor (1),
subminor (2). Forces reprocessing. (default do not force).
"""
try:
code_id = self.dbu.getCodeID(id_in)
return self._reprocessBy(code=code_id, startDate=startDate, endDate=endDate,
incVersion=incVersion)
except DButils.DBNoData:
DBlogging.dblogger.error('No code_id {0} found in the DB'.format(id_in))
[docs] def reprocessByProduct(self, id_in, startDate=None, endDate=None, incVersion=None):
"""Add files of a particular product to the queue for reprocessing.
Parameters
----------
id_in : :class:`str` or :class:`int`
ID or name of code to reprocess
startDate : :class:`~datetime.datetime`, optional
First date to reprocess (default all)
endDate : :class:`~datetime.datetime`, optional
Last date to reprocess (default all)
incVersion : :class:`int` {0, 1, 2}, optional
Which version number to increment: major (0), minor (1),
subminor (2). Forces reprocessing (default do not force).
"""
try:
prod_id = self.dbu.getProductID(id_in)
return self._reprocessBy(product=prod_id, startDate=startDate, endDate=endDate,
incVersion=incVersion)
except DButils.DBNoData:
DBlogging.dblogger.error('No product_id {0} found in the DB'.format(id_in))
[docs] def reprocessByDate(self, startDate=None, endDate=None, incVersion=None, level=None):
"""Add files to the queue for reprocessing, by file date.
Parameters
----------
startDate : :class:`~datetime.datetime`, optional
First date to reprocess (default all)
endDate : :class:`~datetime.datetime`, optional
Last date to reprocess (default all)
incVersion : :class:`int` {0, 1, 2}, optional
Which version number to increment: major (0), minor (1),
subminor (2). Forces reprocessing (default do not force).
level : :class:`float`, optional
Only reprocess files of this level (default all)
"""
return self._reprocessBy(startDate=startDate, endDate=endDate,
incVersion=incVersion, level=level)
[docs] def reprocessByInstrument(self, id_in, level=None, startDate=None, endDate=None, incVersion=None):
"""Add files for an instrument to the queue for reprocessing
Parameters
----------
id_in : :class:`str` or :class:`int`
ID or name of instrument to reprocess
level : :class:`int`, optional
Only reprocess files of this level (default all)
startDate : :class:`~datetime.datetime`, optional
First date to reprocess (default all)
endDate : :class:`~datetime.datetime`, optional
Last date to reprocess (default all)
incVersion : :class:`int` {0, 1, 2}, optional
Which version number to increment: major (0), minor (1),
subminor (2). Forces reprocessing (default do not force).
"""
try:
inst_id = self.dbu.getInstrumentID(id_in)
return self._reprocessBy(instrument=inst_id, level=level, startDate=startDate, endDate=endDate,
incVersion=incVersion)
except DButils.DBNoData:
DBlogging.dblogger.error('No inst_id {0} found in the DB'.format(id_in))