Source code for Ska.engarchive.fetch

#!/usr/bin/env python
"""
Fetch values from the Ska engineering telemetry archive.
"""
__docformat__ = 'restructuredtext'
import sys
import os
import time
import contextlib
import cPickle as pickle
import logging
import operator
import fnmatch
import collections
import warnings

import numpy as np
import asciitable
import pyyaks.context

from . import file_defs
from .units import Units
from . import cache
from . import remote_access
from .version import __version__, __git_version__

from Chandra.Time import DateTime

# Module-level units, defaults to CXC units (e.g. Kelvins etc)
UNITS = Units(system='cxc')

# Module-level control of whether MSID.fetch will cache the last 30 results
CACHE = False

SKA = os.getenv('SKA') or '/proj/sot/ska'
ENG_ARCHIVE = os.getenv('ENG_ARCHIVE') or SKA + '/data/eng_archive'
IGNORE_COLNAMES = ('TIME', 'MJF', 'MNF', 'TLM_FMT')
DIR_PATH = os.path.dirname(os.path.abspath(__file__))

# Dates near the start of 2000 that demarcates the split between the 1999 data
# and post-2000 data.  The 1999 data goes out to at least 2000:005:13:00:00,
# while post-2000 data starts as late as 2000:001:11:58:59.  Dates between LO
# and HI get taken from either 1999 or post-2000.  The times are 4 millisec before
# a minor frame boundary to avoid collisions.
DATE2000_LO = DateTime('2000:001:00:00:00.090').date
DATE2000_HI = DateTime('2000:003:00:00:00.234').date

# Launch date (earliest possible date for telemetry)
LAUNCH_DATE = '1999:204'

# Maximum number of MSIDs that should ever match an input MSID spec
# (to prevent accidentally selecting a very large number of MSIDs)
MAX_GLOB_MATCHES = 10

# Special-case state codes that override those in the TDB
STATE_CODES = {'3TSCMOVE': [(0, 'F'), (1, 'T')],
               '3FAMOVE': [(0, 'F'), (1, 'T')],
               '3SEAID': [(0, 'SEA-A'), (1, 'SEA-B')],
               '3SEARSET': [(0, 'F'), (1, 'T')],
               '3SEAROMF': [(0, 'F'), (1, 'T')],
               '3SEAINCM': [(0, 'F'), (1, 'T')],
               '3STAB2EN': [(0, 'DISABLE'), (1, 'ENABLE')],
               '3SMOTPEN': [(0, 'ENABLE'), (1, 'DISABLE')],
               '3SMOTSEL': [(0, 'TSC'), (1, 'FA')],
               '3SHTREN': [(0, 'DISABLE'), (1, 'ENABLE')],
               '3SEARAMF': [(0, 'F'), (1, 'T')],
               }

# Cached version (by content type) of first and last available times in archive
CONTENT_TIME_RANGES = {}


def local_or_remote_function(remote_print_output):
    """
    Decorator maker so that a function gets run either locally or remotely
    depending on the state of remote_access.access_remotely.  This decorator
    maker takes an optional remote_print_output argument that will be
    be printed (locally) if the function is executed remotely,

    For functions that are decorated using this wrapper:

    Every path that may be generated locally but used remotely should be
    split with _split_path(). Conversely the functions that use
    the resultant path should re-join them with os.path.join. In the
    remote case the join will happen using the remote rules.
    """
    def the_decorator(func):
        def wrapper(*args, **kwargs):
            if remote_access.access_remotely:
                # If accessing a remote archive, establish the connection (if
                # necessary)
                if not remote_access.connection_is_established():
                    try:
                        if not remote_access.establish_connection():
                            raise remote_access.RemoteConnectionError(
                                "Unable to establish connection for remote fetch.")
                    except EOFError:
                        # An EOF error can be raised if the python interpreter is being
                        # called in such a way that input cannot be received from the
                        # user (e.g. when the python interpreter is called from MATLAB)
                        # If that is the case (and remote access is enabled), then
                        # raise an import error
                        raise ImportError("Unable to interactively get remote access "
                                          "info from user.")
                # Print the output, if specified
                if remote_access.show_print_output and not remote_print_output is None:
                    print remote_print_output
                    sys.stdout.flush()
                # Execute the function remotely and return the result
                return remote_access.execute_remotely(func, *args, **kwargs)
            else:
                return func(*args, **kwargs)
        return wrapper
    return the_decorator


def _split_path(path):
    """
    Return a tuple of the components for ``path``. Strip off the drive if
    it exists. This works correctly for the local OS (linux / windows).
    """
    drive, path = os.path.splitdrive(path)
    folders = []
    while True:
        path, folder = os.path.split(path)

        if folder != "":
            folders.append(folder)
        else:
            if path == "\\":
                folders.append("/")
            elif path != "":
                folders.append(path)
            break

    folders.reverse()
    return folders

# Context dictionary to provide context for msid_files
ft = pyyaks.context.ContextDict('ft')

# Global (eng_archive) definition of file names
msid_files = pyyaks.context.ContextDict('msid_files', basedir=ENG_ARCHIVE)
msid_files.update(file_defs.msid_files)

# Module-level values defining available content types and column (MSID) names
filetypes = asciitable.read(os.path.join(DIR_PATH, 'filetypes.dat'))
content = collections.OrderedDict()

# Get the list of filenames (an array is built to pass all the filenames at
# once to the remote machine since passing them one at a time is rather slow)
all_msid_names_files = dict()
for filetype in filetypes:
    ft['content'] = filetype['content'].lower()
    all_msid_names_files[str(ft['content'])] = \
        _split_path(msid_files['colnames'].abs)


# Function to load MSID names from the files (executed remotely, if necessary)
@local_or_remote_function("Loading MSID names from Ska eng archive server...")
def load_msid_names(all_msid_names_files):
    import cPickle as pickle
    all_colnames = dict()
    for k, msid_names_file in all_msid_names_files.iteritems():
        try:
            all_colnames[k] = pickle.load(open(os.path.join(*msid_names_file)))
        except IOError:
            pass
    return all_colnames
# Load the MSID names
all_colnames = load_msid_names(all_msid_names_files)

# Save the names
for k, colnames in all_colnames.iteritems():
    content.update((x, k) for x in sorted(colnames)
                   if x not in IGNORE_COLNAMES)

# Cache of the most-recently used TIME array and associated bad values mask.
# The key is (content_type, tstart, tstop).
times_cache = dict(key=None)


# Set up logging.
class NullHandler(logging.Handler):
    def emit(self, record):
        pass

logger = logging.getLogger('Ska.engarchive.fetch')
logger.addHandler(NullHandler())
logger.propagate = False

# Warn the user if ENG_ARCHIVE is set such that the data path is non-standard
if os.getenv('ENG_ARCHIVE'):
    print('fetch: using ENG_ARCHIVE={} for archive path'
          .format(os.getenv('ENG_ARCHIVE')))


[docs]def get_units(): """Get the unit system currently being used for conversions. """ return UNITS['system']
[docs]def set_units(unit_system): """Set the unit system used for output telemetry values. The default is "cxc". Allowed values for ``unit_system`` are: ==== ============================================================== cxc FITS standard units used in CXC archive files (basically MKS) sci Same as "cxc" but with temperatures in degC instead of Kelvins eng OCC engineering units (TDB P009, e.g. degF, ft-lb-sec, PSI) ==== ============================================================== :param unit_system: system of units (cxc, sci, eng) """ UNITS.set_units(unit_system)
[docs]def read_bad_times(table): """Include a list of bad times from ``table`` in the fetch module ``bad_times`` registry. This routine can be called multiple times with different tables and the bad times will be appended to the registry. The table can include any number of bad time interval specifications, one per line. A bad time interval line has three columns separated by whitespace, e.g.:: aogbias1 2008:292:00:00:00 2008:297:00:00:00 The MSID name is not case sensitive and the time values can be in any ``DateTime`` format. Blank lines and any line starting with the # character are ignored. """ bad_times = asciitable.read(table, Reader=asciitable.NoHeader, names=['msid', 'start', 'stop']) for msid, start, stop in bad_times: msid_bad_times.setdefault(msid.upper(), []).append((start, stop)) # Set up bad times dict
msid_bad_times = dict() read_bad_times(os.path.join(DIR_PATH, 'msid_bad_times.dat'))
[docs]def msid_glob(msid): """Get the archive MSIDs matching ``msid``. The function returns a tuple of (msids, MSIDs) where ``msids`` is a list of MSIDs that is all lower case and (where possible) matches the input ``msid``. The output ``MSIDs`` is all upper case and corresponds to the exact MSID names stored in the archive HDF5 files. :param msid: input MSID glob :returns: tuple (msids, MSIDs) """ MSID = msid.upper() # First try MSID or DP_<MSID>. If success then return the upper # case version and whatever the user supplied (could be any case). for match in (MSID, 'DP_' + MSID): if match in content: return [msid], [match] # Next try as a file glob. If there is a match then return a # list of matches, all lower case and all upper case. Since the # input was a glob the returned msids are just lower case versions # of the matched upper case MSIDs. for match in (MSID, 'DP_' + MSID): matches = fnmatch.filter(content.keys(), match) if matches: if len(matches) > MAX_GLOB_MATCHES: raise ValueError( 'MSID spec {} matches more than {} MSIDs. ' 'Refine the spec or increase fetch.MAX_GLOB_MATCHES' .format(msid, MAX_GLOB_MATCHES)) return [x.lower() for x in matches], matches raise ValueError('MSID {} is not in Eng Archive'.format(MSID))
def _get_table_intervals_as_list(table, check_overlaps=True): """ Determine if the input ``table`` looks like a table of intervals. This can either be a structured array / Table with datestart / datestop or tstart / tstop columns, OR a list of lists. If so, return a list of corresponding start/stop tuples, otherwise return None. If ``check_overlaps`` is True then a check is made to assure that the supplied intervals do not overlap. This is needed when reading multiple intervals with a single call to fetch, but not for bad times filtering. """ intervals = None if isinstance(table, (list, tuple)): try: intervals = [(DateTime(row[0]).secs, DateTime(row[1]).secs) for row in table] except: pass else: for prefix in ('date', 't'): start = prefix + 'start' stop = prefix + 'stop' try: intervals = [(DateTime(row[start]).secs, DateTime(row[stop]).secs) for row in table] except: pass else: break # Got an intervals list, now sort if check_overlaps and intervals is not None: from itertools import izip intervals = sorted(intervals, key=lambda x: x[0]) # Check for overlaps if any(i0[1] > i1[0] for i0, i1 in izip(intervals[:-1], intervals[1:])): raise ValueError('Input intervals overlap') return intervals
[docs]class MSID(object): """Fetch data from the engineering telemetry archive into an MSID object. The input ``msid`` is case-insensitive and can include linux file "glob" patterns, for instance ``orb*1*_x`` (ORBITEPHEM1_X) or ``*pcadmd`` (AOPCADMD). For derived parameters the initial ``DP_`` is optional, for instance ``dpa_pow*`` (DP_DPA_POWER). :param msid: name of MSID (case-insensitive) :param start: start date of telemetry (Chandra.Time compatible) :param stop: stop date of telemetry (current time if not supplied) :param filter_bad: automatically filter out bad values :param stat: return 5-minute or daily statistics ('5min' or 'daily') :returns: MSID instance """ units = UNITS fetch = sys.modules[__name__] def __init__(self, msid, start=LAUNCH_DATE, stop=None, filter_bad=False, stat=None): msids, MSIDs = msid_glob(msid) if len(MSIDs) > 1: raise ValueError('Multiple matches for {} in Eng Archive' .format(msid)) else: self.msid = msids[0] self.MSID = MSIDs[0] # Capture the current module units self.units = Units(self.units['system']) self.unit = self.units.get_msid_unit(self.MSID) self.stat = stat if stat: self.dt = {'5min': 328, 'daily': 86400}[stat] # If ``start`` is actually a table of intervals then fetch # each interval separately and concatenate the results intervals = _get_table_intervals_as_list(start, check_overlaps=True) if intervals is not None: start, stop = intervals[0][0], intervals[-1][1] self.tstart = DateTime(start).secs self.tstop = (DateTime(stop).secs if stop else DateTime(time.time(), format='unix').secs) self.datestart = DateTime(self.tstart).date self.datestop = DateTime(self.tstop).date try: self.content = content[self.MSID] except KeyError: raise ValueError('MSID %s is not in Eng Archive' % self.MSID) if self.datestart < DATE2000_LO and self.datestop > DATE2000_HI: intervals = [(self.datestart, DATE2000_HI), (DATE2000_HI, self.datestop)] # Get the times, values, bad values mask from the HDF5 files archive if intervals is None: self._get_data() else: self._get_data_over_intervals(intervals) # If requested filter out bad values and set self.bad = None if filter_bad: self.filter_bad() def _get_data_over_intervals(self, intervals): """ Fetch intervals separately and concatenate the results. """ msids = [] for start, stop in intervals: msids.append(self.fetch.MSID(self.msid, start, stop, filter_bad=False, stat=self.stat)) # No bad values column for stat='5min' or 'daily', but still need this attribute. if self.stat: self.bads = None self.colnames = msids[0].colnames for attr in self.colnames: vals = np.concatenate([getattr(msid, attr) for msid in msids]) setattr(self, attr, vals) def _get_data(self): """Get data from the Eng archive""" logger.info('Getting data for %s between %s to %s', self.msid, self.datestart, self.datestop) # Avoid stomping on caller's filetype 'ft' values with _cache_ft() with _cache_ft(): ft['content'] = self.content ft['msid'] = self.MSID with _set_msid_files_basedir(self.datestart): if self.stat: ft['interval'] = self.stat self._get_stat_data() else: gmd = (self._get_msid_data_cached if CACHE else self._get_msid_data) self.vals, self.times, self.bads, self.colnames = \ gmd(self.content, self.tstart, self.tstop, self.MSID, self.units['system']) def _get_stat_data(self): """Do the actual work of getting stats values for an MSID from HDF5 files""" filename = msid_files['stats'].abs logger.info('Opening %s', filename) @local_or_remote_function("Getting stat data for " + self.MSID + " from Ska eng archive server...") def get_stat_data_from_server(filename, dt, tstart, tstop): import tables h5 = tables.openFile(os.path.join(*filename)) table = h5.root.data times = (table.col('index') + 0.5) * dt row0, row1 = np.searchsorted(times, [tstart, tstop]) table_rows = table[row0:row1] # returns np.ndarray (structured array) h5.close() return (times[row0:row1], table_rows, row0, row1) times, table_rows, row0, row1 = \ get_stat_data_from_server(_split_path(filename), self.dt, self.tstart, self.tstop) logger.info('Closed %s', filename) self.bads = None self.times = times self.colnames = ['times'] for colname in table_rows.dtype.names: # Don't like the way columns were named in the stats tables. # Fix that here. colname_out = _plural(colname) if colname != 'n' else 'samples' if colname_out in ('vals', 'mins', 'maxes', 'means', 'p01s', 'p05s', 'p16s', 'p50s', 'p84s', 'p95s', 'p99s'): vals = self.units.convert(self.MSID, table_rows[colname]) elif colname_out == 'stds': vals = self.units.convert(self.MSID, table_rows[colname], delta_val=True) else: vals = table_rows[colname] setattr(self, colname_out, vals) self.colnames.append(colname_out) # Redefine the 'vals' attribute to be 'means' if it exists. This is a # more consistent use of the 'vals' attribute and there is little use # for the original sampled version. if hasattr(self, 'means'): # Create new attribute midvals and add as a column (fixes kadi#17) self.colnames.append('midvals') self.midvals = self.vals self.vals = self.means @staticmethod @cache.lru_cache(30) def _get_msid_data_cached(content, tstart, tstop, msid, unit_system): """Do the actual work of getting time and values for an MSID from HDF5 files and cache recent results. Caching is very beneficial for derived parameter updates but not desirable for normal fetch usage.""" return MSID._get_msid_data(content, tstart, tstop, msid, unit_system) @staticmethod def _get_msid_data(content, tstart, tstop, msid, unit_system): """Do the actual work of getting time and values for an MSID from HDF5 files""" # Get a row slice into HDF5 file for this content type that picks out # the required time range plus a little padding on each end. h5_slice = get_interval(content, tstart, tstop) # Read the TIME values either from cache or from disk. if times_cache['key'] == (content, tstart, tstop): logger.info('Using times_cache for %s %s to %s', content, tstart, tstop) times = times_cache['val'] # Already filtered on times_ok times_ok = times_cache['ok'] # For filtering MSID.val and MSID.bad times_all_ok = times_cache['all_ok'] else: ft['msid'] = 'time' filename = msid_files['msid'].abs logger.info('Reading %s', filename) @local_or_remote_function("Getting time data from Ska eng archive server...") def get_time_data_from_server(h5_slice, filename): import tables h5 = tables.openFile(os.path.join(*filename)) times_ok = ~h5.root.quality[h5_slice] times = h5.root.data[h5_slice] h5.close() return(times_ok, times) times_ok, times = get_time_data_from_server(h5_slice, _split_path(filename)) # Filter bad times. Last instance of bad times in archive is 2004 # so don't do this unless needed. Creating a new 'times' array is # much more expensive than checking for np.all(times_ok). times_all_ok = np.all(times_ok) if not times_all_ok: times = times[times_ok] times_cache.update(dict(key=(content, tstart, tstop), val=times, ok=times_ok, all_ok=times_all_ok)) # Extract the actual MSID values and bad values mask ft['msid'] = msid filename = msid_files['msid'].abs logger.info('Reading %s', filename) @local_or_remote_function("Getting msid data for " + msid + " from Ska eng archive server...") def get_msid_data_from_server(h5_slice, filename): import tables h5 = tables.openFile(os.path.join(*filename)) vals = h5.root.data[h5_slice] bads = h5.root.quality[h5_slice] h5.close() return(vals, bads) vals, bads = get_msid_data_from_server(h5_slice, _split_path(filename)) # Filter bad times rows if needed if not times_all_ok: logger.info('Filtering bad times values for %s', msid) bads = bads[times_ok] vals = vals[times_ok] # Slice down to exact requested time range row0, row1 = np.searchsorted(times, [tstart, tstop]) logger.info('Slicing %s arrays [%d:%d]', msid, row0, row1) vals = Units(unit_system).convert(msid.upper(), vals[row0:row1]) times = times[row0:row1] bads = bads[row0:row1] colnames = ['times', 'vals', 'bads'] return (vals, times, bads, colnames) @property
[docs] def state_codes(self): """List of state codes tuples (raw_count, state_code) for state-valued MSIDs """ if 'S' not in self.vals.dtype.str: self._state_codes = None if self.MSID in STATE_CODES: self._state_codes = STATE_CODES[self.MSID] if not hasattr(self, '_state_codes'): import Ska.tdb states = Ska.tdb.msids[self.MSID].Tsc if states is None or len(set(states['CALIBRATION_SET_NUM'])) != 1: warnings.warn('MSID {} has string vals but no state codes ' 'or multiple calibration sets'.format(self.msid)) self._state_codes = None else: states = np.sort(states.data, order='LOW_RAW_COUNT') self._state_codes = [(state['LOW_RAW_COUNT'], state['STATE_CODE']) for state in states] return self._state_codes
@property
[docs] def raw_vals(self): """Raw counts corresponding to the string state-code values that are stored in ``self.vals`` """ # If this is not a string-type value then there are no raw values if 'S' not in self.vals.dtype.str or self.state_codes is None: self._raw_vals = None if not hasattr(self, '_raw_vals'): self._raw_vals = np.zeros(len(self.vals), dtype='int8') - 1 # CXC state code telem all has same length with trailing spaces # so find max length for formatting below. max_len = max(len(x[1]) for x in self.state_codes) fmtstr = '{:' + str(max_len) + 's}' for raw_val, state_code in self.state_codes: ok = self.vals == fmtstr.format(state_code) self._raw_vals[ok] = raw_val return self._raw_vals
@property
[docs] def tdb(self): """Access the Telemetry database entries for this MSID """ import Ska.tdb return Ska.tdb.msids[self.MSID]
[docs] def interpolate(self, dt=None, start=None, stop=None, times=None): """Perform nearest-neighbor interpolation of the MSID to the specified time sequence. The time sequence steps uniformly by ``dt`` seconds starting at the ``start`` time and ending at the ``stop`` time. If not provided the times default to the first and last times for the MSID. The MSID ``times`` attribute is set to the common time sequence. In addition a new attribute ``times0`` is defined that stores the nearest neighbor interpolated time, providing the *original* timestamps of each new interpolated value for that MSID. If ``times`` is provided then this gets used instead of the default linear progression from ``start`` and ``dt``. :param dt: time step (sec, default=328.0) :param start: start of interpolation period (DateTime format) :param stop: end of interpolation period (DateTime format) :param times: array of times for interpolation (default=None) """ import Ska.Numpy if times is not None: if any(kwarg is not None for kwarg in (dt, start, stop)): raise ValueError('If "times" keyword is set then "dt", "start", ' 'and "stop" cannot be set') # Use user-supplied times that are within the range of telemetry. ok = (times >= self.times[0]) & (times <= self.times[-1]) times = times[ok] else: dt = 328.0 if dt is None else dt tstart = DateTime(start).secs if start else self.times[0] tstop = DateTime(stop).secs if stop else self.times[-1] # Legacy method for backward compatibility. Note that the np.arange() # call accrues floating point error. tstart = max(tstart, self.times[0]) tstop = min(tstop, self.times[-1]) times = np.arange(tstart, tstop, dt) logger.info('Interpolating index for %s', self.msid) indexes = Ska.Numpy.interpolate(np.arange(len(self.times)), self.times, times, method='nearest', sorted=True) logger.info('Slicing on indexes') for colname in self.colnames: colvals = getattr(self, colname) if colvals is not None: setattr(self, colname, colvals[indexes]) # Make a new attribute times0 that stores the nearest neighbor # interpolated times. Then set the MSID times to be the common # interpolation times. self.times0 = self.times self.times = times
def copy(self): from copy import deepcopy return deepcopy(self)
[docs] def filter_bad(self, bads=None, copy=False): """Filter out any bad values. After applying this method the "bads" column will be set to None to indicate that there are no bad values. :param bads: Bad values mask. If not supplied then self.bads is used. :param copy: return a copy of MSID object with bad values filtered """ obj = self.copy() if copy else self # If bad mask is provided then override any existing bad mask for MSID if bads is not None: obj.bads = bads # Nothing to do if bads is None (i.e. bad values already filtered) if obj.bads is None: return if np.any(obj.bads): logger.info('Filtering bad values for %s', obj.msid) ok = ~obj.bads colnames = (x for x in obj.colnames if x != 'bads') for colname in colnames: setattr(obj, colname, getattr(obj, colname)[ok]) obj.bads = None if copy: return obj
[docs] def filter_bad_times(self, start=None, stop=None, table=None, copy=False): """Filter out intervals of bad data in the MSID object. There are three usage options: - Supply no arguments. This will use the global list of bad times read in with fetch.read_bad_times(). - Supply both ``start`` and ``stop`` values where each is a single value in a valid DateTime format. - Supply an ``table`` parameter in the form of a 2-column table of start and stop dates (space-delimited) or the name of a file with data in the same format. The ``table`` parameter must be supplied as a table or the name of a table file, for example:: bad_times = ['2008:292:00:00:00 2008:297:00:00:00', '2008:305:00:12:00 2008:305:00:12:03', '2010:101:00:01:12 2010:101:00:01:25'] msid.filter_bad_times(table=bad_times) msid.filter_bad_times(table='msid_bad_times.dat') :param start: Start of time interval to exclude (any DateTime format) :param stop: End of time interval to exclude (any DateTime format) :param table: Two-column table (start, stop) of bad time intervals :param copy: return a copy of MSID object with bad times filtered """ if table is not None: bad_times = asciitable.read(table, Reader=asciitable.NoHeader, names=['start', 'stop']) elif start is None and stop is None: bad_times = [] for msid_glob, times in msid_bad_times.items(): if fnmatch.fnmatch(self.MSID, msid_glob): bad_times.extend(times) elif start is None or stop is None: raise ValueError('filter_times requires either 2 args ' '(start, stop) or no args') else: bad_times = [(start, stop)] obj = self.copy() if copy else self obj._filter_times(bad_times, exclude=True) if copy: return obj
[docs] def remove_intervals(self, intervals, copy=False): """ Remove telemetry points that occur within the specified ``intervals`` This method is the converse of select_intervals(). The ``intervals`` argument can be either a list of (start, stop) tuples or an EventQuery object from kadi. If ``copy`` is set to True then a copy of the MSID object is made prior to removing intervals, and that copy is returned. The default is to remove intervals in place. This example shows fetching the pitch component of the spacecraft rate. After examining the rates, the samples during maneuvers are then removed and the standard deviation is recomputed. This filters out the large rates during maneuvers:: >>> aorate2 = fetch.Msid('aorate2', '2011:001', '2011:002') >>> aorate2.vals.mean() * 3600 * 180 / np.pi # rate in arcsec/sec 3.9969393528801782 >>> figure(1) >>> aorate2.plot(',') >>> from kadi import events >>> aorate2.remove_intervals(events.manvrs) >>> aorate2.vals.mean() * 3600 * 180 / np.pi # rate in arcsec/sec -0.0003688639491030978 >>> figure(2) >>> aorate2.plot(',') :param intervals: EventQuery or iterable (N x 2) with start, stop dates/times :param copy: return a copy of MSID object with intervals removed """ obj = self.copy() if copy else self obj._filter_times(intervals, exclude=True) if copy: return obj
[docs] def select_intervals(self, intervals, copy=False): """ Select telemetry points that occur within the specified ``intervals`` This method is the converse of remove_intervals(). The ``intervals`` argument can be either a list of (start, stop) tuples or an EventQuery object from kadi. If ``copy`` is set to True then a copy of the MSID object is made prior to selecting intervals, and that copy is returned. The default is to selecte intervals in place. This example shows fetching the pitch component of the spacecraft rate. After examining the rates, the samples during maneuvers are then selected and the mean is recomputed. This highlights the large rates during maneuvers:: >>> aorate2 = fetch.Msid('aorate2', '2011:001', '2011:002') >>> aorate2.vals.mean() * 3600 * 180 / np.pi # rate in arcsec/sec 3.9969393528801782 >>> figure(1) >>> aorate2.plot(',') >>> from kadi import events >>> aorate2.select_intervals(events.manvrs) >>> aorate2.vals.mean() * 3600 * 180 / np.pi # rate in arcsec/sec 24.764309542605481 >>> figure(2) >>> aorate2.plot(',') :param intervals: EventQuery or iterable (N x 2) with start, stop dates/times :param copy: return a copy of MSID object with intervals selected """ obj = self.copy() if copy else self obj._filter_times(intervals, exclude=False) if copy: return obj
def _filter_times(self, intervals, exclude=True): """ Filter the times of self based on ``intervals``. :param intervals: iterable (N x 2) with tstart, tstop in seconds :param exclude: exclude intervals if True, else include intervals """ # Make an initial acceptance mask. If exclude is True then initially # all values are allowed (ok=True). If exclude is False (i.e. only # include the interval times) then ok=False everywhere. ok = np.empty(len(self.times), dtype=bool) ok[:] = exclude # See if the input intervals is actually a table of intervals intervals_list = _get_table_intervals_as_list(intervals, check_overlaps=False) if intervals_list is not None: intervals = intervals_list # Check if this is an EventQuery. Would rather not import EventQuery # because this is expensive (django), so just look at the names in # object MRO. if 'EventQuery' in (cls.__name__ for cls in intervals.__class__.__mro__): intervals = intervals.intervals(self.datestart, self.datestop) intervals = [(DateTime(start).secs, DateTime(stop).secs) for start, stop in intervals] for tstart, tstop in intervals: if tstart > tstop: raise ValueError("Start time %s must be less than stop time %s" % (tstart, tstop)) if tstop < self.times[0] or tstart > self.times[-1]: continue # Find the indexes of bad data. Using side=left,right respectively # will exclude points exactly equal to the bad_times values # (though in reality an exact tie is extremely unlikely). i0 = np.searchsorted(self.times, tstart, side='left') i1 = np.searchsorted(self.times, tstop, side='right') ok[i0:i1] = not exclude colnames = (x for x in self.colnames) for colname in colnames: attr = getattr(self, colname) if isinstance(attr, np.ndarray): setattr(self, colname, attr[ok])
[docs] def write_zip(self, filename, append=False): """Write MSID to a zip file named ``filename`` Within the zip archive the data for this MSID will be stored in csv format with the name <msid_name>.csv. :param filename: output zipfile name :param append: append to an existing zipfile """ import zipfile from itertools import izip colnames = self.colnames[:] if self.bads is None and 'bads' in colnames: colnames.remove('bads') if self.state_codes: colnames.append('raw_vals') # Indexes value is not interesting for output if 'indexes' in colnames: colnames.remove('indexes') colvals = tuple(getattr(self, x) for x in colnames) fmt = ",".join("%s" for x in colnames) f = zipfile.ZipFile(filename, ('a' if append and os.path.exists(filename) else 'w')) info = zipfile.ZipInfo(self.msid + '.csv') info.external_attr = 0664 << 16L # Set permissions info.date_time = time.localtime()[:7] info.compress_type = zipfile.ZIP_DEFLATED f.writestr(info, ",".join(colnames) + '\n' + '\n'.join(fmt % x for x in izip(*colvals)) + '\n') f.close()
[docs] def logical_intervals(self, op, val, complete_intervals=True, max_gap=None): """Determine contiguous intervals during which the logical comparison expression "MSID.vals op val" is True. Allowed values for ``op`` are:: == != > < >= <= If ``complete_intervals`` is True (default) then the intervals are guaranteed to be complete so that the all reported intervals had a transition before and after within the telemetry interval. If ``max_gap`` is specified then any time gaps longer than ``max_gap`` are filled with a fictitious False value to create an artificial interval boundary at ``max_gap / 2`` seconds from the nearest data value. Returns a structured array table with a row for each interval. Columns are: * datestart: date of interval start * datestop: date of interval stop * duration: duration of interval (sec) * tstart: time of interval start (CXC sec) * tstop: time of interval stop (CXC sec) Examples:: >>> dat = fetch.MSID('aomanend', '2010:001', '2010:005') >>> manvs = dat.logical_intervals('==', 'NEND') >>> dat = fetch.MSID('61PSTS02', '1999:200', '2000:001') >>> safe_suns = dat.logical_intervals('==', 'SSM', complete_intervals=False, max_gap=66) :param op: logical operator, one of == != > < >= <= :param val: comparison value :param complete_intervals: return only complete intervals (default=True) :param max_gap: max allowed gap between time stamps (sec, default=None) :returns: structured array table of intervals """ from . import utils ops = {'==': operator.eq, '!=': operator.ne, '>': operator.gt, '<': operator.lt, '>=': operator.ge, '<=': operator.le} try: op = ops[op] except KeyError: raise ValueError('op = "{}" is not in allowed values: {}' .format(op, sorted(ops.keys()))) # Do local version of bad value filtering if self.bads is not None and np.any(self.bads): ok = ~self.bads vals = self.vals[ok] times = self.times[ok] else: vals = self.vals times = self.times bools = op(vals, val) return utils.logical_intervals(times, bools, complete_intervals, max_gap)
[docs] def state_intervals(self): """Determine contiguous intervals during which the MSID value is unchanged. Returns a structured array table with a row for each interval. Columns are: * datestart: date of interval start * datestop: date of interval stop * duration: duration of interval (sec) * tstart: time of interval start (CXC sec) * tstop: time of interval stop (CXC sec) * val: MSID value during the interval Example:: dat = fetch.MSID('cobsrqid', '2010:001', '2010:005') obsids = dat.state_intervals() :param val: state value for which intervals are returned. :returns: structured array table of intervals """ from . import utils # Do local version of bad value filtering if self.bads is not None and np.any(self.bads): ok = ~self.bads vals = self.vals[ok] times = self.times[ok] else: vals = self.vals times = self.times if len(self.vals) < 2: raise ValueError('Filtered data length must be at least 2') return utils.state_intervals(times, vals)
[docs] def iplot(self, fmt='-b', fmt_minmax='-c', **plot_kwargs): """Make an interactive plot for exploring the MSID data. This method opens a new plot figure (or clears the current figure) and plots the MSID ``vals`` versus ``times``. This plot can be panned or zoomed arbitrarily and the data values will be fetched from the archive as needed. Depending on the time scale, ``iplot`` displays either full resolution, 5-minute, or daily values. For 5-minute and daily values the min and max values are also plotted. Once the plot is displayed and the window is selected by clicking in it, the following key commands are recognized:: a: autoscale for full data range in x and y m: toggle plotting of min/max values p: pan at cursor x y: toggle autoscaling of y-axis z: zoom at cursor x ?: print help Example:: dat = fetch.Msid('aoattqt1', '2011:001', '2012:001', stat='5min') dat.iplot() dat.iplot('.b', '.c', markersize=0.5) Caveat: the ``iplot()`` method is not meant for use within scripts, and may give unexpected results if used in combination with other plotting commands directed at the same plot figure. :param fmt: plot format for values (default="-b") :param fmt_minmax: plot format for mins and maxes (default="-c") :param plot_kwargs: additional plotting keyword args """ from .plot import MsidPlot self._iplot = MsidPlot(self, fmt, fmt_minmax, **plot_kwargs)
[docs] def plot(self, *args, **kwargs): """Plot the MSID ``vals`` using Ska.Matplotlib.plot_cxctime() This is a convenience function for plotting the MSID values. It is equivalent to:: plot_cxctime(self.times, self.vals, *args, **kwargs) where ``*args`` are additional arguments and ``**kwargs`` are additional keyword arguments that are accepted by ``plot_cxctime()``. Example:: dat = fetch.Msid('tephin', '2011:001', '2012:001', stat='5min') dat.plot('-r', linewidth=2) """ import matplotlib.pyplot as plt from Ska.Matplotlib import plot_cxctime vals = self.raw_vals if self.state_codes else self.vals plot_cxctime(self.times, vals, *args, state_codes=self.state_codes, **kwargs) plt.margins(0.02, 0.05)
def __len__(self): return len(self.times)
[docs]class MSIDset(collections.OrderedDict): """Fetch a set of MSIDs from the engineering telemetry archive. Each input ``msid`` is case-insensitive and can include linux file "glob" patterns, for instance ``orb*1*_?`` (ORBITEPHEM1_X, Y and Z) or ``aoattqt[1234]`` (AOATTQT1, 2, 3, and 4). For derived parameters the initial ``DP_`` is optional, for instance ``dpa_pow*`` (DP_DPA_POWER). :param msids: list of MSID names (case-insensitive) :param start: start date of telemetry (Chandra.Time compatible) :param stop: stop date of telemetry (current time if not supplied) :param filter_bad: automatically filter out bad values :param stat: return 5-minute or daily statistics ('5min' or 'daily') :returns: Dict-like object containing MSID instances keyed by MSID name """ MSID = MSID def __init__(self, msids, start=LAUNCH_DATE, stop=None, filter_bad=False, stat=None): super(MSIDset, self).__init__() intervals = _get_table_intervals_as_list(start, check_overlaps=True) if intervals is not None: start, stop = intervals[0][0], intervals[-1][1] self.tstart = DateTime(start).secs self.tstop = (DateTime(stop).secs if stop else DateTime().secs) self.datestart = DateTime(self.tstart).date self.datestop = DateTime(self.tstop).date # Input ``msids`` may contain globs, so expand each and add to new list new_msids = [] for msid in msids: new_msids.extend(msid_glob(msid)[0]) for msid in new_msids: if intervals is None: self[msid] = self.MSID(msid, self.tstart, self.tstop, filter_bad=False, stat=stat) else: self[msid] = self.MSID(msid, intervals, filter_bad=False, stat=stat) if filter_bad: self.filter_bad() def __deepcopy__(self, memo=None): out = self.__class__([], None) for attr in ('tstart', 'tstop', 'datestart', 'datestop'): setattr(out, attr, getattr(self, attr)) for msid in self: out[msid] = self[msid].copy() return out def copy(self): return self.__deepcopy__()
[docs] def filter_bad(self, copy=False): """Filter bad values for the MSID set. This function applies the union (logical-OR) of bad value masks for all MSIDs in the set with the same content type. The result is that the filtered MSID samples are valid for *all* MSIDs within the content type and the arrays all match up. For example:: msids = fetch.MSIDset(['aorate1', 'aorate2', 'aogyrct1', 'aogyrct2'], '2009:001', '2009:002') msids.filter_bad() Since ``aorate1`` and ``aorate2`` both have content type of ``pcad3eng`` they will be filtered as a group and will remain with the same sampling. This will allow something like:: plot(msids['aorate1'].vals, msids['aorate2'].vals) Likewise the two gyro count MSIDs would be filtered as a group. If this group-filtering is not the desired behavior one can always call the individual MSID.filter_bad() function for each MSID in the set:: for msid in msids.values(): msid.filter_bad() :param copy: return a copy of MSID object with intervals selected """ obj = self.copy() if copy else self for content in set(x.content for x in obj.values()): bads = None msids = [x for x in obj.values() if x.content == content and x.bads is not None] for msid in msids: if bads is None: bads = msid.bads.copy() else: bads |= msid.bads for msid in msids: msid.filter_bad(bads) if copy: return obj
[docs] def filter_bad_times(self, start=None, stop=None, table=None, copy=False): """Filter out intervals of bad data in the MSIDset object. There are three usage options: - Supply no arguments. This will use the global list of bad times read in with fetch.read_bad_times(). - Supply both ``start`` and ``stop`` values where each is a single value in a valid DateTime format. - Supply an ``table`` parameter in the form of a 2-column table of start and stop dates (space-delimited) or the name of a file with data in the same format. The ``table`` parameter must be supplied as a table or the name of a table file, for example:: msidset.filter_bad_times() bad_times = ['2008:292:00:00:00 2008:297:00:00:00', '2008:305:00:12:00 2008:305:00:12:03', '2010:101:00:01:12 2010:101:00:01:25'] msidset.filter_bad_times(table=bad_times) msidset.filter_bad_times(table='msid_bad_times.dat') :param start: Start of time interval to exclude (any DateTime format) :param stop: End of time interval to exclude (any DateTime format) :param table: Two-column table (start, stop) of bad time intervals :param copy: return a copy of MSID object with intervals selected """ obj = self.copy() if copy else self for msid in obj.values(): msid.filter_bad_times(start, stop, table) if copy: return obj
[docs] def interpolate(self, dt=None, start=None, stop=None, filter_bad=True, times=None, bad_union=False, copy=False): """ Perform nearest-neighbor interpolation of all MSID values in the set to a common time sequence. The values are updated in-place. **Times** The time sequence steps uniformly by ``dt`` seconds starting at the ``start`` time and ending at the ``stop`` time. If not provided the times default to the ``start`` and ``stop`` times for the MSID set. If ``times`` is provided then this gets used instead of the default linear progression from ``start`` and ``dt``. For each MSID in the set the ``times`` attribute is set to the common time sequence. In addition a new attribute ``times0`` is defined that stores the nearest neighbor interpolated time, providing the *original* timestamps of each new interpolated value for that MSID. **Filtering and bad values** If ``filter_bad`` is True (default) then bad values are filtered from the interpolated MSID set. There are two strategies for doing this: 1) ``bad_union = False`` Remove the bad values in each MSID prior to interpolating the set to a common time series. This essentially says to use all the available data individually. Each MSID has bad data filtered individually *before* interpolation so that the nearest neighbor interpolation only finds good data. This strategy is done when ``filter_union = False``, which is the default setting. 2) ``bad_union = True`` Mark every MSID in the set as bad at the interpolated time if *any* of them are bad at that time. This stricter version is required when it is important that the MSIDs be truly correlated in time. For instance this is needed for attitude quaternions since all four values must be from the exact same telemetry sample. If you are not sure, this is the safer option. :param dt: time step (sec, default=328.0) :param start: start of interpolation period (DateTime format) :param stop: end of interpolation period (DateTime format) :param filter_bad: filter bad values :param times: array of times for interpolation (default=None) :param bad_union: filter union of bad values after interpolating :param copy: return a new copy instead of in-place update (default=False) """ import Ska.Numpy obj = self.copy() if copy else self msids = obj.values() # MSID objects in the MSIDset # Ensure that tstart / tstop is entirely within the range of available # data fetched from the archive. max_fetch_tstart = max(msid.times[0] for msid in msids) min_fetch_tstop = min(msid.times[-1] for msid in msids) if times is not None: if any(kwarg is not None for kwarg in (dt, start, stop)): raise ValueError('If "times" keyword is set then "dt", "start", ' 'and "stop" cannot be set') # Use user-supplied times that are within the range of telemetry. ok = (times >= max_fetch_tstart) & (times <= min_fetch_tstop) obj.times = times[ok] else: # Get the nominal tstart / tstop range dt = 328.0 if dt is None else dt tstart = DateTime(start).secs if start else obj.tstart tstop = DateTime(stop).secs if stop else obj.tstop tstart = max(tstart, max_fetch_tstart) tstop = min(tstop, min_fetch_tstop) obj.times = np.arange((tstop - tstart) // dt + 1) * dt + tstart for msid in msids: if filter_bad and not bad_union: msid.filter_bad() logger.info('Interpolating index for %s', msid.msid) indexes = Ska.Numpy.interpolate(np.arange(len(msid.times)), msid.times, obj.times, method='nearest', sorted=True) logger.info('Slicing on indexes') for colname in msid.colnames: colvals = getattr(msid, colname) if colvals is not None: setattr(msid, colname, colvals[indexes]) # Make a new attribute times0 that stores the nearest neighbor # interpolated times. Then set the MSID times to be the common # interpolation times. msid.times0 = msid.times msid.times = obj.times if bad_union: common_bads = np.zeros(len(obj.times), dtype=bool) for msid in msids: if msid.stat is None and msid.bads is None: warnings.warn('WARNING: {!r} MSID has bad values already filtered.\n' 'This prevents `filter_bad_union` from working as expected.\n' 'Use MSIDset (not Msidset) with filter_bad=False.\n') if msid.bads is not None: # 5min and daily stats have no bad values common_bads |= msid.bads # Apply the common bads array and optional filter out these bad values for msid in msids: msid.bads = common_bads if filter_bad: msid.filter_bad() # Filter MSIDset-level times attr to match invididual MSIDs if filter_bad is True if filter_bad: obj.times = obj.times[~common_bads] if copy: return obj
[docs] def write_zip(self, filename): """Write MSIDset to a zip file named ``filename`` Within the zip archive the data for each MSID in the set will be stored in csv format with the name <msid_name>.csv. :param filename: output zipfile name """ append = False for msid in self.values(): msid.write_zip(filename, append=append) append = True
[docs]class Msid(MSID): """ Fetch data from the engineering telemetry archive into an MSID object. Same as MSID class but with filter_bad=True by default. :param msid: name of MSID (case-insensitive) :param start: start date of telemetry (Chandra.Time compatible) :param stop: stop date of telemetry (current time if not supplied) :param filter_bad: automatically filter out bad values :param stat: return 5-minute or daily statistics ('5min' or 'daily') :param unit_system: Unit system (cxc|eng|sci, default=current units) :returns: MSID instance """ units = UNITS def __init__(self, msid, start=LAUNCH_DATE, stop=None, filter_bad=True, stat=None): super(Msid, self).__init__(msid, start=start, stop=stop, filter_bad=filter_bad, stat=stat)
[docs]class Msidset(MSIDset): """Fetch a set of MSIDs from the engineering telemetry archive. Same as MSIDset class but with filter_bad=True by default. :param msids: list of MSID names (case-insensitive) :param start: start date of telemetry (Chandra.Time compatible) :param stop: stop date of telemetry (current time if not supplied) :param filter_bad: automatically filter out bad values :param stat: return 5-minute or daily statistics ('5min' or 'daily') :param unit_system: Unit system (cxc|eng|sci, default=current units) :returns: Dict-like object containing MSID instances keyed by MSID name """ MSID = MSID def __init__(self, msids, start=LAUNCH_DATE, stop=None, filter_bad=True, stat=None): super(Msidset, self).__init__(msids, start=start, stop=stop, filter_bad=filter_bad, stat=stat)
class HrcSsMsid(Msid): """ Fetch data from the engineering telemetry archive into an MSID object. Same as MSID class but with filter_bad=True by default. :param msid: name of MSID (case-insensitive) :param start: start date of telemetry (Chandra.Time compatible) :param stop: stop date of telemetry (current time if not supplied) :param filter_bad: automatically filter out bad values :param stat: return 5-minute or daily statistics ('5min' or 'daily') :param unit_system: Unit system (cxc|eng|sci, default=current units) :returns: MSID instance """ units = UNITS def __new__(self, msid, start=LAUNCH_DATE, stop=None, stat=None): ss_msids = '2TLEV1RT 2VLEV1RT 2SHEV1RT 2TLEV2RT 2VLEV2RT 2SHEV2RT' if msid.upper() not in ss_msids.split(): raise ValueError('MSID {} is not in HRC secondary science ({})' .format(msid, ss_msids)) # If this is not full-resolution then add boolean bads mask to individual MSIDs msids = [msid, 'HRC_SS_HK_BAD'] out = MSIDset(msids, start=start, stop=stop, stat=stat) if stat is not None: for m in msids: out[m].bads = np.zeros(len(out[m].vals), dtype=np.bool) # Set bad mask i_bads = np.flatnonzero(out['HRC_SS_HK_BAD'].vals > 0) out['HRC_SS_HK_BAD'].bads[i_bads] = True # For full-resolution smear the bad mask out by +/- 5 samples if stat is None: for i_bad in i_bads: i0 = max(0, i_bad - 5) i1 = i_bad + 5 out['HRC_SS_HK_BAD'].bads[i0:i1] = True # Finally interpolate and filter out bad values out.interpolate(times=out[msid].times, bad_union=True, filter_bad=True) return out[msid] class memoized(object): """Decorator that caches a function's return value each time it is called. If called later with the same arguments, the cached value is returned, and not re-evaluated. """ def __init__(self, func): self.func = func self.cache = {} def __call__(self, *args): try: return self.cache[args] except KeyError: self.cache[args] = value = self.func(*args) return value except TypeError: # uncachable -- for instance, passing a list as an argument. # Better to not cache than to blow up entirely. return self.func(*args) def __repr__(self): """Return the function's docstring.""" return self.func.__doc__
[docs]def get_time_range(msid, format=None): """ Get the time range for the given ``msid``. :param msid: MSID name :param format: Output format (DateTime format, e.g. 'secs', 'date', 'greta') :returns: (tstart, tstop) in CXC seconds """ MSID = msid.upper() with _cache_ft(): ft['content'] = content[MSID] ft['msid'] = 'time' filename = msid_files['msid'].abs logger.info('Reading %s', filename) @local_or_remote_function("Getting time range from Ska eng archive server...") def get_time_data_from_server(filename): import tables h5 = tables.openFile(os.path.join(*filename)) tstart = h5.root.data[0] tstop = h5.root.data[-1] h5.close() return tstart, tstop if filename in CONTENT_TIME_RANGES: tstart, tstop = CONTENT_TIME_RANGES[filename] else: tstart, tstop = get_time_data_from_server(_split_path(filename)) CONTENT_TIME_RANGES[filename] = (tstart, tstop) if format is not None: tstart = getattr(DateTime(tstart), format) tstop = getattr(DateTime(tstop), format) return tstart, tstop
[docs]def get_telem(msids, start=None, stop=None, sampling='full', unit_system='eng', interpolate_dt=None, remove_events=None, select_events=None, time_format=None, outfile=None, quiet=False, max_fetch_Mb=1000, max_output_Mb=100): """ High-level routine to get telemetry for one or more MSIDs and perform common processing functions: - Fetch a set of MSIDs over a time range, specifying the sampling as either full-resolution, 5-minute, or daily data. - Filter out bad or missing data. - Interpolate (resample) all MSID values to a common uniformly-spaced time sequence. - Remove or select time intervals corresponding to specified Kadi event types. - Change the time format from CXC seconds (seconds since 1998.0) to something more convenient like GRETA time. - Write the MSID telemetry data to a zipfile. :param msids: MSID(s) to fetch (string or list of strings)') :param start: Start time for data fetch (default=<stop> - 30 days) :param stop: Stop time for data fetch (default=NOW) :param sampling: Data sampling (full | 5min | daily) (default=full) :param unit_system: Unit system for data (eng | sci | cxc) (default=eng) :param interpolate_dt: Interpolate to uniform time steps (secs, default=None) :param remove_events: Remove kadi events expression (default=None) :param select_events: Select kadi events expression (default=None) :param time_format: Output time format (secs|date|greta|jd|..., default=secs) :param outfile: Output file name (default=None) :param quiet: Suppress run-time logging output (default=False) :param max_fetch_Mb: Max allowed memory (Mb) for fetching (default=1000) :param max_output_Mb: Max allowed memory (Mb) for file output (default=100) :returns: MSIDset object """ from .get_telem import get_telem return get_telem(msids, start, stop, sampling, unit_system, interpolate_dt, remove_events, select_events, time_format, outfile, quiet, max_fetch_Mb, max_output_Mb)
@memoized def get_interval(content, tstart, tstop): """ Get the approximate row intervals that enclose the specified ``tstart`` and ``tstop`` times for the ``content`` type. :param content: content type (e.g. 'pcad3eng', 'thm1eng') :param tstart: start time (CXC seconds) :param tstop: stop time (CXC seconds) :returns: rowslice """ ft['content'] = content @local_or_remote_function("Getting interval data from " + "DB on Ska eng archive server...") def get_interval_from_db(tstart, tstop, server): import Ska.DBI db = Ska.DBI.DBI(dbi='sqlite', server=os.path.join(*server)) query_row = db.fetchone('SELECT tstart, rowstart FROM archfiles ' 'WHERE filetime < ? order by filetime desc', (tstart,)) if not query_row: query_row = db.fetchone('SELECT tstart, rowstart FROM archfiles ' 'order by filetime asc') rowstart = query_row['rowstart'] query_row = db.fetchone('SELECT tstop, rowstop FROM archfiles ' 'WHERE filetime > ? order by filetime asc', (tstop,)) if not query_row: query_row = db.fetchone('SELECT tstop, rowstop FROM archfiles ' 'order by filetime desc') rowstop = query_row['rowstop'] return slice(rowstart, rowstop) return get_interval_from_db(tstart, tstop, _split_path(msid_files['archfiles'].abs)) @contextlib.contextmanager def _cache_ft(): """ Cache the global filetype ``ft`` context variable so that fetch operations do not corrupt user values of ``ft``. """ ft_cache_pickle = pickle.dumps(ft) try: yield finally: ft_cache = pickle.loads(ft_cache_pickle) ft.update(ft_cache) delkeys = [x for x in ft if x not in ft_cache] for key in delkeys: del ft[key] @contextlib.contextmanager def _set_msid_files_basedir(datestart): """ If datestart is before 2000:001:00:00:00 then use the 1999 archive files. """ try: cache_basedir = msid_files.basedir if datestart < DATE2000_LO: # Note: don't use os.path.join because ENG_ARCHIVE and basedir must # use linux '/' convention but this might be running on Windows. msid_files.basedir = msid_files.basedir + '/1999' yield finally: msid_files.basedir = cache_basedir def add_logging_handler(level=logging.INFO, formatter=None, handler=None): """Configure logging for fetch module. :param level: logging level (logging.DEBUG, logging.INFO, etc) :param formatter: logging.Formatter (default: Formatter('%(funcName)s: %(message)s')) :param handler: logging.Handler (default: StreamHandler()) """ if formatter is None: formatter = logging.Formatter('%(funcName)s: %(message)s') if handler is None: handler = logging.StreamHandler() handler.setFormatter(formatter) logger.setLevel(level) logger.addHandler(handler) def _plural(x): """Return English plural of ``x``. Super-simple and only valid for the known small set of cases within fetch where it will get applied. """ return x + 'es' if (x.endswith('x') or x.endswith('s')) else x + 's'