Source code for pyfelyx.mdb

"""
.. module:: pyfelyx.mdb

Generic access class to a felyx MDB

:copyright: Copyright 2017 Ifremer / Cersat.
:license: Released under GPL v3 license, see :ref:`license`.

.. sectionauthor:: Jeff Piolle <jfpiolle@ifremer.fr>
.. codeauthor:: Jeff Piolle <jfpiolle@ifremer.fr>
"""
from __future__ import print_function

import os
import ast
import re
import ConfigParser
import logging
import datetime
import glob
from collections import OrderedDict

import numpy
import netCDF4 as netcdf


class MDB(object):
    """
    An abstraction class to read the MDB records.

    It can be initialized in three different, exclusive ways.

    An MDB configuration must define at least the following keys:

    * ``mdb_output_root``: the path to the root directory where the
      match-up files are stored (with YYYY/DDD subfolders)
    * ``full_identifier``: dictionary of the datasets provided in the
      match-ups. The key is a free field (used in the other configuration
      parameters) and the value is the corresponding prefix used in the
      match-up files (= full dataset identifier as configured in felyx).

    Args:
        identifier (str, optional): configuration identifier. If provided,
            a configuration file named <identifier>.cfg must exist in your
            $HOME/.s3analysis/mdb directory.
        cfgfile (str, optional): explicit path to a configuration file.
        config (dict, optional): a configuration dictionary to decode the
            MDB.
    """

    def __init__(self, identifier=None, cfgfile=None, config=None):
        if sum(_ is not None for _ in (identifier, cfgfile, config)) > 1:
            raise ValueError(
                "identifier, cfgfile and config arguments are exclusive")

        # defaults
        self.datasets = {}
        self.prefixes = None
        self.default_max_distance = 1000
        self.default_full_box_extraction = False
        self.default_dataset_fields = None
        self.default_felyx_fields = None
        self.default_filters = {}
        self.reference = None

        if identifier is not None:
            configfile = os.path.join(
                self.get_config_root(), '%s.cfg' % identifier
            )
            self.load_config(configfile)

        elif cfgfile is not None:
            self.load_config(cfgfile)

        elif config is not None:
            if 'mdb_output_root' not in config:
                raise ValueError('missing mdb_output_root in config')
            self.root = config['mdb_output_root']

            # validity filters are taken as-is from the configuration
            # dictionary, in the same form as returned by get_filters()
            if 'validity' in config:
                self.default_filters = config['validity']

            if 'reference' in config:
                self.reference = config['reference']
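
    # Illustrative usage sketch (not part of the class): the three
    # exclusive ways of initializing an MDB. Identifiers and paths below
    # are hypothetical.
    #
    #     mdb = MDB(identifier='s3a_wst')        # $HOME/.s3analysis/mdb/s3a_wst.cfg
    #     mdb = MDB(cfgfile='/path/to/mdb.cfg')  # explicit configuration file
    #     mdb = MDB(config={
    #         'mdb_output_root': '/data/mdb',
    #         'reference': 'WST',
    #     })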

    def __str__(self, *args, **kwargs):
        return "\n".join([
            "root: {}".format(self.root)
        ])

    @classmethod
    def get_config_root(cls):
        return os.path.join(os.environ['HOME'], '.s3analysis', 'mdb')

    def get_filters(self, config):
        """Read the per-dataset validity filters from the ``validity``
        section of the configuration, provided as ``dataset = field=min,max``
        items.
        """
        self.default_filters = {}
        if config.has_section('validity'):
            for dataset, v in config.items('validity'):
                field, vrange = re.sub(r'\s', '', v).split('=')
                if dataset not in self.default_filters:
                    self.default_filters[dataset] = {}
                minv, maxv = vrange.split(',')
                minv = None if minv == '' else ast.literal_eval(minv)
                maxv = None if maxv == '' else ast.literal_eval(maxv)
                self.default_filters[dataset][field] = [minv, maxv]
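
    # Illustrative sketch of a 'validity' section entry and its decoded
    # form (dataset and field names are hypothetical):
    #
    #     [validity]
    #     WST = quality_level=2,5
    #
    # is decoded by get_filters() into:
    #
    #     {'WST': {'quality_level': [2, 5]}}
    #
    # An empty bound (e.g. 'quality_level=2,') is decoded as None.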

    def load_config(self, configfile):
        logging.info('Loading configuration: {}'.format(configfile))
        if not os.path.exists(configfile):
            raise IOError(
                "MDB configuration file not found: {}".format(configfile))

        # load config
        config = ConfigParser.RawConfigParser()
        config.optionxform = str
        config.read(configfile)

        # mdb root directory
        self.root = config.get("general", "mdb_output_root")

        # reference dataset for this MDB
        self.reference = config.get("general", "reference")

        # list of datasets provided in the match-ups, with their
        # corresponding prefixes in match-up files (= full identifier as
        # configured in felyx)
        self.datasets = {}
        self.prefixes = {}
        for dataset, prefix in config.items("full_identifier"):
            self.prefixes[dataset] = prefix
            self.datasets[prefix] = dataset

        # default data extraction arguments
        try:
            self.default_felyx_fields = config.get(
                'defaults', 'felyx_fields').split(',')
        except ConfigParser.Error:
            self.default_felyx_fields = None
        try:
            self.default_dataset_fields = {
                k: v.split(',')
                for k, v in config.items('default_dataset_fields')
            }
        except ConfigParser.Error:
            self.default_dataset_fields = None
        try:
            self.default_max_distance = config.getint(
                'defaults', 'max_distance')
        except ConfigParser.Error:
            self.default_max_distance = 1000
        try:
            self.default_full_box_extraction = config.getboolean(
                'defaults', 'full_box_extraction')
        except ConfigParser.Error:
            self.default_full_box_extraction = False

        # validity filters
        self.get_filters(config)

        return config
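
    # Illustrative sketch of a complete configuration file, as parsed by
    # load_config() (all identifiers, fields and paths are hypothetical):
    #
    #     [general]
    #     mdb_output_root = /data/mdb
    #     reference = WST
    #
    #     [full_identifier]
    #     WST = s3a_sl_2_wst
    #
    #     [defaults]
    #     felyx_fields = lat,lon,time
    #     max_distance = 1000
    #     full_box_extraction = false
    #
    #     [default_dataset_fields]
    #     WST = sea_surface_temperature,quality_level
    #
    #     [validity]
    #     WST = quality_level=2,5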

    def get_matchup_files(self, dates, source):
        """Return the MDB files for a source and a list of dates.

        Assumes the MDB files are stored locally, following a
        year/day-in-year folder organization.

        Args:
            dates (list of datetime): list of dates for which mdb files
                will be collected.
            source (str): the source of dynamic sites (usually a type of
                in situ data) for which to collect the MDB files.

        Returns:
            list: list of full paths to the collected mdb files
        """
        mdbfiles = []
        for date in dates:
            # file pattern for the current date:
            # root / year / day in year / <source>_<start time>_<end time>.nc
            mdbpattern = os.path.join(
                self.root,
                date.strftime("%Y/%j"),
                "*{}_{}*".format(source, date.strftime("%Y%m%d"))
            )
            # collect corresponding files
            files_for_day = glob.glob(mdbpattern)
            if len(files_for_day) == 0:
                logging.warning(
                    "No MDB files found for {} with pattern {}"
                    .format(source, mdbpattern))
            else:
                mdbfiles.extend(files_for_day)

        return mdbfiles
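
    # Illustrative sketch: for a hypothetical root '/data/mdb' and source
    # 'cmems_drifter', a file for 2017-06-15 (day 166) would be matched as:
    #
    #     /data/mdb/2017/166/cmems_drifter_20170615T000000_20170615T235959.nc
    #
    # (pattern: root / %Y / %j / *<source>_<%Y%m%d>*)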

    def get_closest_pixel_indices(self, sat_lat, sat_lon, ins_lat, ins_lon):
        """Return the indices and distances of the closest pixel to the in
        situ data in each satellite box.

        The match-ups are provided as boxes of neighbour pixels centered
        on the dynamic site location (usually an in situ measurement). If
        the pixel latitudes and longitudes are masked, this function will
        return the location of the closest pixel with valid lat/lon for
        each match-up.

        Args:
            sat_lat (numpy.ma.array): array of latitudes of all match-up
                box pixels
            sat_lon (numpy.ma.array): array of longitudes of all match-up
                box pixels
            ins_lat (numpy.ma.array): latitudes of the dynamic site
                locations
            ins_lon (numpy.ma.array): longitudes of the dynamic site
                locations

        Returns:
            tuple: the indices of the closest pixel in each match-up and
            its respective distance. The indices are a tuple themselves,
            with row and cell indices as numpy arrays.
        """
        # compute, for each match-up, the distance of each box pixel to
        # the in situ location (crude cartesian distance in degrees)
        nmatchups, x, y = sat_lon.shape
        distance = ((sat_lat.flatten() - ins_lat.repeat(x * y)) ** 2 +
                    (sat_lon.flatten() - ins_lon.repeat(x * y)) ** 2)
        distance = numpy.ma.sqrt(distance).reshape((nmatchups, x * y))

        indices = numpy.ma.argmin(distance, axis=1)
        row, cell = numpy.unravel_index(indices, (x, y))

        # mask the match-up boxes with no valid values at all
        mask = numpy.all(numpy.ma.getmaskarray(distance), axis=1)
        row = numpy.ma.masked_where(mask, row)
        cell = numpy.ma.masked_where(mask, cell)

        # distances are converted from degrees to approximate metres
        # (~1e5 m per degree)
        return ((row, cell),
                distance[numpy.arange(len(indices)), indices] * 100000)
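
    # Illustrative sketch: with one match-up box of 3x3 pixels,
    # get_closest_pixel_indices returns the (row, cell) index of the pixel
    # minimizing the approximate distance to the site location (values
    # below are arbitrary):
    #
    #     sat_lat = numpy.ma.masked_invalid(
    #         numpy.linspace(10., 10.2, 9).reshape((1, 3, 3)))
    #     sat_lon = numpy.ma.masked_invalid(
    #         numpy.linspace(20., 20.2, 9).reshape((1, 3, 3)))
    #     ins_lat = numpy.ma.array([10.1])
    #     ins_lon = numpy.ma.array([20.1])
    #     (row, cell), dist = mdb.get_closest_pixel_indices(
    #         sat_lat, sat_lon, ins_lat, ins_lon)
    #     # => row[0] == 1, cell[0] == 1 (the box center),
    #     #    dist in approximate metres (degrees * 1e5)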

    def get_closest_valid_pixel_indices(
            self, mdbf, dataset, lat=None, lon=None, filters=None,
            prefixes=None):
        """Return the index of the closest valid pixel to the in situ
        measurement location, in each match-up satellite box.

        The pixel validity is defined by the provided validity filters
        (for instance a minimum expected quality level).

        Args:
            mdbf (Dataset): a handler on a NetCDF match-up file
            dataset (str): the dataset used as reference for the lat/lon
            filters (dict): the fields and threshold ranges used to define
                the validity, per dataset
                (ex: {'WST': {'quality_level': [2, 5]}})
        """
        # read in situ measurement locations for each match-up
        if lat is None and lon is None:
            lat = numpy.ma.fix_invalid(mdbf.variables['lat'][:])
            lon = numpy.ma.fix_invalid(mdbf.variables['lon'][:])

        if prefixes is None:
            prefix = self.prefixes[dataset]
        else:
            prefix = prefixes[dataset]
        latvarname = '{}__lat'.format(prefix)
        lonvarname = '{}__lon'.format(prefix)

        # case where the dataset is not in the match-up file (no match-up
        # at all for this dataset): return fully masked results
        if latvarname not in mdbf.variables:
            row = numpy.ma.masked_all((len(lat),), dtype='int32')
            cell = numpy.ma.masked_all((len(lat),), dtype='int32')
            dist = numpy.ma.masked_all((len(lat),), dtype='float64')
            return (row, cell), dist

        sat_lat = numpy.ma.fix_invalid(mdbf.variables[latvarname][:, :, :])
        sat_lon = numpy.ma.fix_invalid(mdbf.variables[lonvarname][:, :, :])

        # mask invalid pixels
        if filters is not None:
            for fdataset in filters:
                if prefixes is None:
                    fprefix = self.prefixes[fdataset]
                else:
                    fprefix = prefixes[fdataset]
                for field, valid_range in filters[fdataset].items():
                    fieldname = '{}__{}'.format(fprefix, field)
                    if fieldname in mdbf.variables:
                        values = numpy.ma.fix_invalid(
                            mdbf.variables[fieldname][:, :, :])
                        if valid_range[0] is None:
                            mask = (values > valid_range[1])
                        elif valid_range[1] is None:
                            mask = (values < valid_range[0])
                        else:
                            mask = ((values < valid_range[0]) |
                                    (values > valid_range[1]))
                    else:
                        # filtering field not available: mask everything
                        mask = numpy.ones(sat_lon.shape, dtype=bool)
                    sat_lat = numpy.ma.masked_where(mask, sat_lat,
                                                    copy=False)
                    sat_lon = numpy.ma.masked_where(mask, sat_lon,
                                                    copy=False)

        return self.get_closest_pixel_indices(sat_lat, sat_lon, lat, lon)

    def extract_value(self, fieldname, data, row, cell):
        """Extract the pixel pointed to by the row and cell indices.

        Override in subclasses for specific cases.
        """
        n = len(row)
        return data[numpy.arange(n), row, cell, ...]

    def read_satellite_data(
            self, source, start, end=None, felyx_fields=None,
            dataset_fields=None, full_box=False, filters=None,
            max_distance=None, subbox=None, reference=None):
        """Read the satellite data in the match-up files.

        Args:
            source (str): the site collection (cmems_drifter, cmems_argo,
                etc...)
            start (datetime): the date (or start date if ``end`` is also
                set) for which to retrieve the match-up values
            end (datetime): the end date of the requested match-up series.
                If None (default), only the start day is returned.
            felyx_fields (list): a list of felyx internal fields
            dataset_fields (dict): a list of fields to read, per dataset
            full_box (bool): if False (default), return only the closest
                valid value to the in situ measurement instead of the full
                box of neighbours. This closest valid value is evaluated
                with respect to the requested validity filters and
                ``max_distance``; pixels not meeting these criteria are
                discarded.
            filters (dict): validity range for a selection of fields, per
                dataset. This defines which pixels can be considered as
                valid when searching for the closest pixel to the in situ
                measurement in a match-up. Only applicable if ``full_box``
                is set to False.
            max_distance: distance beyond which match-ups are not
                selected. Only applicable if ``full_box`` is set to False.
            subbox (int): size of the box to extract (when ``full_box`` is
                set). This box size must be lower than the match-up box
                size. The box centered on the closest pixel to the in situ
                measurement is extracted.
            reference (str): the reference product in the match-ups. This
                is the product whose lat/lon are used to compute the pixel
                distances to the in situ measurements.

        Returns:
            a dict where keys are field names, and values the retrieved
            data
        """
        # set default arguments
        if felyx_fields is None:
            if self.default_felyx_fields is not None:
                felyx_fields = self.default_felyx_fields
            else:
                felyx_fields = []
        if dataset_fields is None:
            if self.default_dataset_fields is not None:
                dataset_fields = self.default_dataset_fields
            else:
                dataset_fields = {}
        if len(felyx_fields) + len(dataset_fields) == 0:
            raise ValueError(
                "'felyx_fields' or 'dataset_fields' must have at least one "
                "element"
            )
        if reference is None:
            if self.reference is None:
                raise ValueError("'reference' must be provided")
            reference = self.reference
        if filters is None:
            filters = self.default_filters
        if max_distance is None:
            max_distance = self.default_max_distance
        if full_box is None:
            full_box = self.default_full_box_extraction

        # remove duplicates
        for dataset in dataset_fields:
            dataset_fields[dataset] = list(set(dataset_fields[dataset]))
        felyx_fields = list(set(felyx_fields))

        # use the keys of dataset_fields as netcdf variable prefixes if no
        # full_identifier dataset was provided
        if self.prefixes is None:
            prefixes = {_: _ for _ in dataset_fields}
        else:
            prefixes = self.prefixes

        # get the list of days in the interval
        dates = [start]
        if end is not None:
            date = start + datetime.timedelta(days=1)
            while date <= end:
                dates.append(date)
                date += datetime.timedelta(days=1)

        # get the match-up files in the interval
        mdbfiles = self.get_matchup_files(dates, source)

        # if no files are found, return empty arrays for the interval
        if len(mdbfiles) == 0:
            return OrderedDict([
                (_, OrderedDict([
                    (field, numpy.ma.array(()))
                    for field in dataset_fields[_]
                ]))
                for _ in dataset_fields
            ])

        result = OrderedDict([(_, OrderedDict([])) for _ in dataset_fields])

        # concatenate all files for the same source and dataset
        for kf, mdbfname in enumerate(mdbfiles):
            logging.info("File: {}".format(mdbfname))
            print("Reading file {} of {}".format(kf + 1, len(mdbfiles)),
                  end='\r')
            mdbf = netcdf.Dataset(mdbfname)

            # read the center pixel coordinates
            lat = numpy.ma.fix_invalid(mdbf.variables['lat'][:])
            lon = numpy.ma.fix_invalid(mdbf.variables['lon'][:])

            if not full_box or subbox:
                # get the index of the closest pixel to in situ
                indices, distances = self.get_closest_valid_pixel_indices(
                    mdbf, reference, lat=lat, lon=lon, filters=filters,
                    prefixes=prefixes
                )
                i, j = indices

                # extract subbox if required
                if subbox:
                    # shift the indices on the box edges so that the
                    # subbox remains within the match-up box
                    halfbox = subbox / 2 + 1
                    imin = 0 > (i - halfbox)
                    if imin.any():
                        i[imin] += halfbox
                    imax = len(mdbf.dimensions['row']) <= (i + halfbox)
                    if imax.any():
                        i[imax] -= halfbox
                    jmin = 0 > (j - halfbox)
                    if jmin.any():
                        j[jmin] += halfbox
                    jmax = len(mdbf.dimensions['cell']) <= (j + halfbox)
                    if jmax.any():
                        j[jmax] -= halfbox

                    # create the indices for the subbox extractions,
                    # centered on the closest pixel to in situ
                    offsets = numpy.arange(subbox) - subbox / 2
                    boxcenters = (numpy.arange(len(lat)), i, j)
                    subboxes = (
                        boxcenters[0].reshape(-1, 1, 1),
                        boxcenters[1][:, numpy.newaxis, numpy.newaxis] +
                        offsets.reshape((1, subbox, 1)),
                        boxcenters[2][:, numpy.newaxis, numpy.newaxis] +
                        offsets.reshape((1, 1, subbox))
                    )

            # extract the requested fields for each dataset in the MDB
            for dataset in dataset_fields:
                # loop on the variables for this dataset
                for varname in dataset_fields[dataset]:
                    # add prefix
                    var = "%s__%s" % (prefixes[dataset], varname)
                    if var not in mdbf.variables:
                        if not full_box:
                            # no match-up for this dataset in this file
                            data = numpy.ma.masked_all((len(i),))
                        else:
                            # the box dimensions of the missing variable
                            # can not be guessed
                            logging.error("Unknown variable %s in %s"
                                          % (var, mdbfname))
                            raise NotImplementedError
                    else:
                        # a value is searched within the box only if
                        # requested and if the field is a box field
                        condition = (
                            not full_box and
                            any('row' in _ for _ in
                                mdbf.variables[var].dimensions) and
                            any('cell' in _ for _ in
                                mdbf.variables[var].dimensions)
                        )
                        if mdbf.variables[var].dtype is str:
                            data = mdbf.variables[var][...]
                        else:
                            data = numpy.ma.fix_invalid(
                                mdbf.variables[var][...])
                        if condition:
                            # select the closest value to in situ only
                            data = self.extract_value(varname, data, i, j)

                            # apply distance filter
                            if (max_distance is not None and
                                    data.shape == distances.shape):
                                data = numpy.ma.masked_where(
                                    distances > max_distance, data,
                                    copy=False)
                            else:
                                logging.warning(
                                    "distance masking skipped for {}"
                                    .format(varname)
                                )
                        if full_box and subbox:
                            data = data[subboxes]

                    if varname in result[dataset]:
                        result[dataset][varname] = numpy.ma.concatenate(
                            (result[dataset][varname], data))
                    else:
                        result[dataset][varname] = numpy.ma.array(data)

                # non dataset specific (felyx internal) variables, stored
                # within each dataset's results
                for var in felyx_fields:
                    data = numpy.ma.fix_invalid(mdbf.variables[var][...])
                    if var in result[dataset]:
                        result[dataset][var] = numpy.ma.concatenate(
                            (result[dataset][var], data))
                    else:
                        result[dataset][var] = numpy.ma.array(data)

            mdbf.close()

        print()
        return result
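
    # Illustrative usage sketch (dataset names, fields and dates are
    # hypothetical and depend on the MDB configuration):
    #
    #     data = mdb.read_satellite_data(
    #         'cmems_drifter',
    #         start=datetime.datetime(2017, 6, 15),
    #         end=datetime.datetime(2017, 6, 17),
    #         dataset_fields={'WST': ['sea_surface_temperature',
    #                                 'quality_level']},
    #         filters={'WST': {'quality_level': [3, 5]}},
    #         max_distance=1000.
    #     )
    #     sst = data['WST']['sea_surface_temperature']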

    def read_insitu_field(self, matchup_file, source, field,
                          min_quality=None, closest_to_surface=True):
        """Read the colocated in situ data in match-up files.

        Select only the closest measurement in time to the satellite
        pixel. The rest of the buoy or platform history is ignored.

        Args:
            matchup_file (Dataset): a handler on a NetCDF match-up file
            source (str): the site collection (cmems_drifter, cmems_argo,
                etc...)
            field (str): the name of the field to be read (remove the
                level suffix: for instance, use cmems_water_temperature
                instead of cmems_water_temperature_0 and the function will
                detect and reconstruct the two-dimensional field for you).
            min_quality (int): minimum quality level for a measurement to
                be considered as valid (and returned in the result)
            closest_to_surface (bool): take only the closest measurement
                to the surface.
        """
        # get the center in situ observation index in each in situ data
        # history (the observation closest in time to the satellite pixel)
        vinsitu_center = '{}__dynamic_target_center_index'.format(source)
        center_indices = matchup_file.variables[vinsitu_center][:]

        # fix negative values with missing fill_value in this netcdf
        # variable (bug)
        center_indices = numpy.ma.masked_where(
            center_indices < 0, center_indices)

        # indices of the valid measurements only
        valid = numpy.ma.array(
            numpy.arange(len(center_indices)),
            mask=center_indices.mask
        )
        nbmatchups = len(valid)
        valid = valid.compressed()

        if '{}_0'.format(field) in matchup_file.variables:
            # reconstruct a 2D field from flat variables split by level
            # (ex: water_temperature_0, water_temperature_1, etc...)
            values = numpy.ma.masked_all((nbmatchups, 5))
            for level in xrange(5):
                # select only the closest in situ data to satellite time
                varname = "{}_{}".format(field, level)
                tmp = matchup_file.variables[varname][:]
                values[valid, level] = tmp[
                    valid, numpy.array(center_indices[valid])
                ]
                # use quality level to invalidate bad data
                if min_quality is not None:
                    varname = "{}__{}_{}".format(
                        source, 'quality_level', level)
                    tmp = matchup_file.variables[varname][:]
                    qual = numpy.ma.masked_all((nbmatchups,), tmp.dtype)
                    qual[valid] = tmp[
                        valid, numpy.array(center_indices[valid])]
                    values[:, level] = numpy.ma.masked_where(
                        qual < min_quality, values[:, level]
                    )

            if closest_to_surface:
                # flatten this 2D field by taking the closest valid
                # measurement to the surface
                indices = numpy.ma.notmasked_edges(values, axis=1)[0]
                tmp = numpy.ma.masked_all((nbmatchups,), values.dtype)
                tmp[indices[0]] = values[indices]
                values = tmp
            # @TODO add depth test
        else:
            if len(matchup_file.variables[field].dimensions) == 1:
                # single dimension field
                values = matchup_file.variables[field][:]
            else:
                # no depth dimension but the history dimension is there:
                # select the central measurement
                tmp = matchup_file.variables[field][:]
                values = numpy.ma.masked_all((nbmatchups,), tmp.dtype)
                try:
                    values[valid] = tmp[valid, center_indices[valid]]
                except Exception:
                    print(valid)
                    print(center_indices)
                    raise

        return numpy.ma.fix_invalid(values)
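
    # Illustrative sketch of the level reconstruction: for a field named
    # 'cmems_drifter__water_temperature' (hypothetical), the function
    # looks for the flat per-level variables in the match-up file and
    # stacks them:
    #
    #     cmems_drifter__water_temperature_0  ->  values[:, 0]
    #     ...
    #     cmems_drifter__water_temperature_4  ->  values[:, 4]
    #
    # With closest_to_surface=True, the first unmasked level of each row
    # is returned.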

    def read_insitu_data(self, source, day, fields, min_quality=None,
                         closest_to_surface=True, end=None):
        """Read the colocated in situ data over a time interval.

        Args:
            source (str): the site collection (cmems_drifter, cmems_argo,
                etc...)
            day (datetime): the date (or start date if ``end`` is also
                set) for which to retrieve the match-up in situ values
            fields (list): a list of fields to read (remove the level
                suffix: for instance, use cmems_water_temperature instead
                of cmems_water_temperature_0 and the function will detect
                and reconstruct the two-dimensional fields for you).
            min_quality (int): minimum quality level for a measurement to
                be considered as valid
            closest_to_surface (bool): take only the closest measurement
                to the surface
            end (datetime): the end date of the requested match-up series

        Returns:
            a dict where keys are field names, and values the retrieved
            data
        """
        if len(fields) == 0:
            return OrderedDict([])

        # get the list of days in the interval
        dates = [day]
        if end is not None:
            date = day + datetime.timedelta(days=1)
            while date <= end:
                dates.append(date)
                date += datetime.timedelta(days=1)

        result = OrderedDict([])

        # get match-up files
        mdbfiles = self.get_matchup_files(dates, source)

        # if no files are found, return empty arrays
        if len(mdbfiles) == 0:
            for field in fields:
                result[field] = numpy.ma.array(())
            return result

        # concatenate all files for the same source and dataset
        for mdbfname in mdbfiles:
            msg = "File: {}".format(mdbfname)
            logging.info(msg)
            print(msg)
            mdbf = netcdf.Dataset(mdbfname)

            # read in situ data
            for var in fields:
                varname = "{}__{}".format(source, var)
                data = self.read_insitu_field(
                    mdbf, source, varname,
                    min_quality=min_quality,
                    closest_to_surface=closest_to_surface
                )
                if var in result:
                    result[var] = numpy.ma.concatenate((result[var], data))
                else:
                    result[var] = numpy.ma.array(data)

            mdbf.close()

        return result
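
    # Illustrative usage sketch (field names are hypothetical; the level
    # suffix _0, _1, ... is resolved by read_insitu_field):
    #
    #     insitu = mdb.read_insitu_data(
    #         'cmems_drifter', datetime.datetime(2017, 6, 15),
    #         fields=['water_temperature'], min_quality=2
    #     )
    #     temp = insitu['water_temperature']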

    @classmethod
    def reduce(cls, dynamic_data, matched_data, keep):
        """Reduce the data arrays with respect to a selection filter.

        Args:
            dynamic_data (dict): the dictionary of data fields from the
                dynamic site source (in situ data)
            matched_data (dict): the dictionary of the fields of the
                matched datasets
            keep (numpy.ndarray): the selection of match-ups to keep.
                One-dimensional boolean array where True indicates the
                match-ups to keep, False the match-ups to remove.
        """
        if (dynamic_data == {} or
                dynamic_data[next(iter(dynamic_data))].size == keep.size):
            logging.info(
                "Reducing the data fields. Removing masked match-ups."
            )
            for field in dynamic_data:
                dynamic_data[field] = dynamic_data[field][keep]
            for dataset in matched_data:
                for field in matched_data[dataset]:
                    matched_data[dataset][field] = \
                        matched_data[dataset][field][keep, ...]
        else:
            raise ValueError(
                "The selection filter has a wrong number of elements"
            )
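
# Illustrative sketch (reusing the hypothetical variables from the
# read_satellite_data and read_insitu_data examples above): remove the
# match-ups where the closest satellite value is masked, keeping the in
# situ and satellite arrays aligned.
#
#     keep = ~numpy.ma.getmaskarray(data['WST']['sea_surface_temperature'])
#     MDB.reduce(insitu, data, keep)
#
# Both dictionaries are reduced in place to the True entries of keep.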