"""
.. module::
Generic access class to a felyx MDB
:copyright: Copyright 2017 Ifremer / Cersat.
:license: Released under GPL v3 license, see :ref:`license`.
.. sectionauthor:: Jeff Piolle <jfpiolle@ifremer.fr>
.. codeauthor:: Jeff Piolle <jfpiolle@ifremer.fr>
"""
from __future__ import print_function
import os
import ast
import re
import ConfigParser
import logging
import datetime
import glob
from collections import OrderedDict
import numpy
import netCDF4 as netcdf
class MDB(object):
    """
    An abstraction class to read the MDB records. It can be initialized in
    three different, mutually exclusive ways.

    An MDB configuration must define at least the following keys:

    * ``mdb_output_root``: the path to the root directory where the match-up
      files are stored (in YYYY/DDD subfolders)
    * ``full_identifier``: dictionary of the datasets provided in the
      match-ups; the key is a free label (used in the other configuration
      parameters) and the value is the corresponding prefix used in the
      match-up files (= full dataset identifier as configured in felyx)

Args:
identifier (str, optional): configuration identifier. If provided, a
configuration file named <identifier>.cfg must exist in your
$HOME/.s3analysis/mdb directory.
cfgfile (str, optional): explicit path to a configuration file
config (dict, optional): a configuration dictionary to decode the MDB
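
    Example:
        A minimal usage sketch; the identifier, paths and values below are
        illustrative, not shipped defaults::

            # from a named configuration in $HOME/.s3analysis/mdb/<identifier>.cfg
            mdb = MDB(identifier='my_mdb')

            # from an explicit configuration file
            mdb = MDB(cfgfile='/path/to/my_mdb.cfg')

            # from an in-memory configuration dictionary
            mdb = MDB(config={
                'mdb_output_root': '/path/to/mdb',
                'reference': 'WST'
            })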
"""
    def __init__(self, identifier=None, cfgfile=None, config=None):
        if sum(arg is not None
               for arg in (identifier, cfgfile, config)) > 1:
            raise ValueError(
                "the identifier, cfgfile and config arguments are exclusive")
# defaults
self.datasets = {}
self.prefixes = None
self.default_max_distance = 1000
self.default_full_box_extraction = False
self.default_dataset_fields = None
self.default_felyx_fields = None
self.reference = None
if identifier is not None:
configfile = os.path.join(
self.get_config_root(),
'%s.cfg' % identifier
)
self.load_config(configfile)
elif cfgfile is not None:
self.load_config(cfgfile)
elif config is not None:
            if 'mdb_output_root' not in config:
                raise ValueError('missing mdb_output_root in config')
            self.root = config['mdb_output_root']
            if 'validity' not in config:
                self.default_filters = {}
            else:
                # the filters are assumed to be provided directly as a dict,
                # e.g. {'WST': {'quality_level': [2, 5]}}
                self.default_filters = config['validity']
if 'reference' in config:
self.reference = config['reference']
    def __str__(self):
        return "\n".join([
            "root: {}".format(self.root)
        ])
    @classmethod
    def get_config_root(cls):
        """Return the default directory of the MDB configuration files."""
return os.path.join(
os.environ['HOME'],
'.s3analysis',
'mdb'
)
    def get_filters(self, config):
        """Read the validity filters from the ``validity`` section of the
        configuration, as a ``{dataset: {field: [min, max]}}`` dictionary."""
self.default_filters = {}
if config.has_section('validity'):
for dataset, v in config.items('validity'):
field, vrange = re.sub(r'\s', '', v).split('=')
if dataset not in self.default_filters:
self.default_filters[dataset] = {}
minv, maxv = vrange.split(',')
if minv == '':
minv = None
else:
minv = ast.literal_eval(minv)
if maxv == '':
maxv = None
else:
maxv = ast.literal_eval(maxv)
self.default_filters[dataset][field] = [minv, maxv]
    def load_config(self, configfile):
        """Load the MDB configuration from a file.

        Args:
            configfile (str): the path to the configuration file

        Returns:
            ConfigParser.RawConfigParser: the parsed configuration
        """
logging.info('Loading configuration: {}'.format(configfile))
if not os.path.exists(configfile):
raise IOError(
"MDB configuration file not found: {}".format(configfile))
# load config
config = ConfigParser.RawConfigParser()
config.optionxform = str
config.read(configfile)
# mdb name and identifiers
self.root = config.get("general", "mdb_output_root")
# reference dataset for this MDB
self.reference = config.get("general", "reference")
# list of datasets provided in the match-ups, with their corresponding
# prefixes in match-up files (= full identifier as configured in felyx)
self.datasets = {}
self.prefixes = {}
for dataset, prefix in config.items("full_identifier"):
self.prefixes[dataset] = prefix
self.datasets[prefix] = dataset
# default data extraction arguments
        try:
            self.default_felyx_fields = config.get(
                'defaults', 'felyx_fields').split(',')
        except ConfigParser.Error:
            self.default_felyx_fields = None
        try:
            self.default_dataset_fields = {
                k: v.split(',')
                for k, v in config.items('default_dataset_fields')
            }
        except ConfigParser.Error:
            self.default_dataset_fields = None
        try:
            self.default_max_distance = config.getint(
                'defaults', 'max_distance')
        except (ConfigParser.Error, ValueError):
            self.default_max_distance = 1000
        try:
            # note: RawConfigParser provides getboolean(), not getbool()
            self.default_full_box_extraction = config.getboolean(
                'defaults', 'full_box_extraction')
        except (ConfigParser.Error, ValueError):
            self.default_full_box_extraction = False
# validity filters
self.get_filters(config)
return config
    def get_matchup_files(self, dates, source):
"""Return the MDB files for a source and list of dates.
Assumes the MDB files are stored locally, following a year/day in year
folder organization.

        Args:
            dates (list of datetime): list of dates for which MDB files will
                be collected.
            source (str): the source of dynamic sites (usually a type of in
                situ data) for which to collect the MDB files.

        Returns:
            list: list of full paths to the collected MDB files
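
        Example:
            A sketch; the source name is illustrative::

                files = mdb.get_matchup_files(
                    [datetime.datetime(2017, 3, 1)], 'cmems_drifter')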
"""
mdbfiles = []
for date in dates:
# file pattern for the current date
# root / year / day in year / <source>_<start time>_<end time>.nc
mdbpattern = os.path.join(
self.root,
date.strftime("%Y/%j"),
"*{}_{}*".format(source, date.strftime("%Y%m%d"))
)
# collect corresponding files
files_for_day = glob.glob(mdbpattern)
if len(files_for_day) == 0:
msg = ("No MDB files found for {} with pattern {}"
.format(source, mdbpattern))
logging.warning(msg)
else:
mdbfiles.extend(files_for_day)
return mdbfiles
    def get_closest_pixel_indices(self, sat_lat, sat_lon, ins_lat, ins_lon):
        """Return the indices and distances of the closest pixel to the in
        situ data in each satellite match-up box.

        The match-ups are provided as boxes of neighbour pixels centered on
        the dynamic site location (usually an in situ measurement). If the
        pixel latitudes and longitudes are masked, this function will return
        the location of the closest pixel with valid lat/lon for each
        match-up.

        Args:
            sat_lat (numpy.ma.MaskedArray): latitudes of all match-up box
                pixels
            sat_lon (numpy.ma.MaskedArray): longitudes of all match-up box
                pixels
            ins_lat (numpy.ma.MaskedArray): latitudes of the dynamic site
                locations, one per match-up
            ins_lon (numpy.ma.MaskedArray): longitudes of the dynamic site
                locations, one per match-up

        Returns:
            tuple: the indices of the closest pixel in each match-up and its
                respective distance. The indices are themselves a tuple, with
                row and cell indices as numpy arrays.
        """
        # compute the distance (in degrees) of each box pixel to the in situ
        # location, broadcasting the in situ lat/lon to the box shape
        nbmatchups, x, y = sat_lon.shape
        distance = ((sat_lat.flatten() - ins_lat.repeat(x * y)) ** 2 +
                    (sat_lon.flatten() - ins_lon.repeat(x * y)) ** 2)
        distance = numpy.ma.sqrt(distance).reshape((nbmatchups, x * y))
        indices = numpy.ma.argmin(distance, axis=1)
        row, cell = numpy.unravel_index(indices, (x, y))
        # mask match-ups where there is no valid value at all in the box
        mask = numpy.ma.getmaskarray(distance).all(axis=1)
        row = numpy.ma.masked_where(mask, row)
        cell = numpy.ma.masked_where(mask, cell)
        # the factor roughly converts a distance in degrees to metres
        return ((row, cell),
                distance[numpy.arange(len(indices)), indices] * 100000)
    def get_closest_valid_pixel_indices(
self,
mdbf,
dataset,
lat=None,
lon=None,
filters=None,
prefixes=None):
"""Return the indice of the closest valid pixel to the in situ
measurement location, in each match-up satellite box.
The pixel validity is defined by the minimum expected quality level.
Args:
dataset (str): the dataset used as reference for the lat/lon
filters (dict): the list of fields and threshold range used
to define the validity, per dataset
(ex: {'WST': {'quality_level': [2,5]}})
"""
        # read the in situ measurement location for each match-up
if lat is None and lon is None:
lat = numpy.ma.fix_invalid(mdbf.variables['lat'][:])
lon = numpy.ma.fix_invalid(mdbf.variables['lon'][:])
        if prefixes is None:
            prefix = self.prefixes[dataset]
        else:
            prefix = prefixes[dataset]
        latvarname = '{}__lat'.format(prefix)
        lonvarname = '{}__lon'.format(prefix)
        # case where the dataset is not in the match-up file (no match-up at
        # all for this dataset): return fully masked indices and distances
        if latvarname not in mdbf.variables:
            row = numpy.ma.masked_all((len(lat),), dtype='int32')
            cell = numpy.ma.masked_all((len(lat),), dtype='int32')
            dist = numpy.ma.masked_all((len(lat),), dtype='float64')
            return (row, cell), dist
sat_lat = numpy.ma.fix_invalid(mdbf.variables[latvarname][:, :, :])
sat_lon = numpy.ma.fix_invalid(mdbf.variables[lonvarname][:, :, :])
        # mask pixels not meeting the validity filters
        if filters is not None:
            for ds in filters:
                if prefixes is None:
                    ds_prefix = self.prefixes[ds]
                else:
                    ds_prefix = prefixes[ds]
                for field, valid_range in filters[ds].items():
                    fieldname = '{}__{}'.format(ds_prefix, field)
                    if fieldname in mdbf.variables:
                        values = numpy.ma.fix_invalid(
                            mdbf.variables[fieldname][:, :, :])
                        if valid_range[0] is None:
                            mask = (values > valid_range[1])
                        elif valid_range[1] is None:
                            mask = (values < valid_range[0])
                        else:
                            mask = ((values < valid_range[0]) |
                                    (values > valid_range[1]))
                    else:
                        # the filter field is missing: mask all the pixels
                        mask = numpy.ones(sat_lon.shape, dtype=bool)
                    sat_lat = numpy.ma.masked_where(mask, sat_lat, copy=False)
                    sat_lon = numpy.ma.masked_where(mask, sat_lon, copy=False)
return self.get_closest_pixel_indices(sat_lat, sat_lon, lat, lon)
    def read_satellite_data(
self,
source,
start,
end=None,
felyx_fields=None,
dataset_fields=None,
            full_box=None,
filters=None,
max_distance=None,
subbox=None,
reference=None
):
"""
Args:
source (str): the site collections (cmems_drifter, cmems_argo, etc...)
start (datetime): the date (or start date if end_date is also set) for
which to retrieve the matchup in situ values
end (datetime): the end date of the request match-up series. If None
(default), only the start day is returned
felyx_fields (list): a list of felyx internal fields
dataset_fields (dict): a list of fields to read per dataset
full_box (boolean): if False (default) return only the closest valid
value to in situ measurement instead of the full box of
neighbours. This closest valid value is evaluated wrt to the
min_quality level requested and max_distance. Pixels not
meeting these criteria are discarded.
filters (dict): validity range for a selection of fields, per
dataset. This defines which pixels can be considered as valid
when searching for the closest pixel to the in situ measurement
in a match-up. Only applicable if box is set to False.
max_distance: distance beyond which match-ups are not selected. Only
applicable if ``full_box`` is set to False
subbox (int): size of the box to extract (when ``box`` is set). This
box size must be lower than the match-up box size. The box centered
on the in situ pixel is extracted.
reference (str): the reference product in the match-ups. This is
product which lat/lon are used for calculation of the pixel
distances to in situ/
Returns:
a dict where keys are field names, and values the retrieved data
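
        Example:
            A minimal sketch; the collection, dataset and field names are
            illustrative and must match the MDB configuration::

                data = mdb.read_satellite_data(
                    'cmems_drifter',
                    datetime.datetime(2017, 3, 1),
                    dataset_fields={'WST': ['sea_surface_temperature']},
                    felyx_fields=['lat', 'lon']
                )
                sst = data['WST']['sea_surface_temperature']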
"""
# set default arguments
if felyx_fields is None:
if self.default_felyx_fields is not None:
felyx_fields = self.default_felyx_fields
else:
felyx_fields = []
if dataset_fields is None:
if self.default_dataset_fields is not None:
dataset_fields = self.default_dataset_fields
else:
dataset_fields = {}
        if len(felyx_fields) + len(dataset_fields) == 0:
            raise ValueError(
                "'felyx_fields' or 'dataset_fields' must have at least one "
                "element"
            )
if reference is None:
if self.reference is None:
raise ValueError("'reference' must be provided")
reference = self.reference
if filters is None:
filters = self.default_filters
if max_distance is None:
max_distance = self.default_max_distance
        if full_box is None:
            full_box = self.default_full_box_extraction
# remove duplicates
for dataset in dataset_fields:
dataset_fields[dataset] = list(set(dataset_fields[dataset]))
felyx_fields = list(set(felyx_fields))
        # use the keys of dataset_fields as netcdf variable prefixes if no
        # full_identifier mapping was provided
if self.prefixes is None:
prefixes = {_: _ for _ in dataset_fields.keys()}
else:
prefixes = self.prefixes
# get list of days in interval
dates = [start]
if end is not None:
date = start + datetime.timedelta(days=1)
while date <= end:
dates.append(date)
date += datetime.timedelta(days=1)
# get matchup files in the interval
mdbfiles = self.get_matchup_files(dates, source)
result = OrderedDict([(_, OrderedDict([])) for _ in dataset_fields])
# if no files are found, return empty arrays for the interval
if len(mdbfiles) == 0:
result = OrderedDict([
(_, OrderedDict([(field, numpy.ma.array(()))
for field in dataset_fields[_]
]))
for _ in dataset_fields
])
return result
# concatenate all files for the same source and dataset
for kf, mdbfname in enumerate(mdbfiles):
msg = "File: {}".format(mdbfname)
logging.info(msg)
print("Reading file {} over {}".format(kf + 1, len(mdbfiles)), end='\r')
mdbf = netcdf.Dataset(mdbfname)
# read the center pixel coordinates
lat = numpy.ma.fix_invalid(mdbf.variables['lat'][:])
lon = numpy.ma.fix_invalid(mdbf.variables['lon'][:])
if not full_box or subbox:
# get indice of closest pixel to in situ, for each dataset
indices, distances = self.get_closest_valid_pixel_indices(
mdbf,
reference,
lat=lat,
lon=lon,
filters=filters,
prefixes=prefixes
)
i, j = indices
#print "INDICES ", i.min(), i.max()
# extract subbox if required
if subbox:
                    # shift the indices of the closest pixel when the subbox
                    # would cross the match-up box edges
                    halfbox = subbox / 2 + 1
                    imin = 0 > (i - halfbox)
                    if imin.any():
                        i[imin] += halfbox
                    imax = len(mdbf.dimensions['row']) <= (i + halfbox)
                    if imax.any():
                        i[imax] -= halfbox
                    jmin = 0 > (j - halfbox)
                    if jmin.any():
                        j[jmin] += halfbox
                    jmax = len(mdbf.dimensions['cell']) <= (j + halfbox)
                    if jmax.any():
                        j[jmax] -= halfbox
                    # create indices for the box extractions; the offsets
                    # span the subbox around the closest pixel
                    offsets = numpy.arange(subbox) - subbox / 2
                    boxcenters = (numpy.arange(len(lat)), i, j)
                    subboxes = (
                        boxcenters[0].reshape(-1, 1, 1),
                        boxcenters[1][:, numpy.newaxis, numpy.newaxis] +
                        offsets.reshape((1, subbox, 1)),
                        boxcenters[2][:, numpy.newaxis, numpy.newaxis] +
                        offsets.reshape((1, 1, subbox))
                    )
# extract requested fields for each dataset contained in the MDB set
for dataset in dataset_fields.keys():
# loop on variables for this dataset
for varname in dataset_fields[dataset]:
# add prefix
var = "%s__%s" % (prefixes[dataset], varname)
                    if var not in mdbf.variables:
                        if not full_box:
                            # no match-up for this dataset in this file
                            data = numpy.ma.masked_all((len(i),))
                        else:
                            # the box dimensions of this dataset are unknown
                            # here, so a fully masked box can not be built
                            logging.error("Unknown variable %s in %s" %
                                          (var, mdbfname))
                            raise NotImplementedError
else:
                        # search for the closest value in the box only if
                        # requested and if the field is a box field
                        condition = (
                            not full_box and
                            any('row' in _ for _ in
                                mdbf.variables[var].dimensions) and
                            any('cell' in _ for _ in
                                mdbf.variables[var].dimensions)
                        )
if mdbf.variables[var].dtype is str:
data = mdbf.variables[var][...]
else:
data = numpy.ma.fix_invalid(mdbf.variables[var][...])
if condition:
# select closest value to in situ only
data = self.extract_value(varname, data, i, j)
# apply distance filter
if (max_distance is not None
and data.shape == distances.shape):
data = numpy.ma.masked_where(distances > max_distance,
data,
copy=False)
else:
logging.warning(
"distance masking skipped for {}"
.format(varname)
)
if full_box and subbox:
data = data[subboxes]
if varname in result[dataset]:
result[dataset][varname] = numpy.ma.concatenate((result[dataset][varname], data))
else:
result[dataset][varname] = numpy.ma.array(data)
# loop on non dataset specific variables
for var in felyx_fields:
data = numpy.ma.fix_invalid(mdbf.variables[var][...])
if var in result[dataset]:
result[dataset][var] = numpy.ma.concatenate((result[dataset][var], data))
else:
result[dataset][var] = numpy.ma.array(data)
mdbf.close()
print()
return result
    def read_insitu_field(self, matchup_file, source, field,
                          min_quality=None,
                          closest_to_surface=True):
        """Read a colocated in situ field from a match-up file. Only the
        closest measurement in time to the satellite pixel is selected;
        the rest of the buoy or platform history is ignored.

        Args:
            matchup_file (Dataset): a handler on a NetCDF match-up file
            source (str): the site collection (cmems_drifter, cmems_argo,
                etc...)
            field (str): the name of the field to be read (remove the level
                suffix: for instance, use cmems_water_temperature instead of
                cmems_water_temperature_0 and the function will detect and
                reconstruct the two dimensional field for you).
            min_quality (int): minimum quality level for a measurement to be
                considered as valid (and returned in the result)
            closest_to_surface (boolean): take only the closest measurement
                to the surface.
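
        Example:
            A sketch with illustrative source and field names; note the field
            name includes the source prefix::

                values = mdb.read_insitu_field(
                    mdbf, 'cmems_drifter',
                    'cmems_drifter__cmems_water_temperature',
                    min_quality=2)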
"""
# get the center in situ observation index in each in situ data
# history (the observation closest in time to satellite pixel)
vinsitu_center = '{}__dynamic_target_center_index'.format(source)
center_indices = matchup_file.variables[vinsitu_center][:]
        # mask negative values caused by a missing fill value in this netcdf
        # variable (bug in the match-up files)
        center_indices = numpy.ma.masked_where(
            center_indices < 0, center_indices)
# mask for valid measurements only
valid = numpy.ma.array(
numpy.arange(len(center_indices)),
mask=center_indices.mask
)
nbmatchups = len(valid)
valid = valid.compressed()
        if '{}_0'.format(field) in matchup_file.variables:
            # reconstruct a 2D field from flat variables split by level
            # (ex: water_temperature_0, water_temperature_1, etc...); five
            # levels are assumed in the match-up files. Start from a fully
            # masked array so that match-ups without a valid center index
            # remain masked.
            values = numpy.ma.masked_all((nbmatchups, 5))
            for level in xrange(5):
                # select only the closest in situ data to satellite time
                varname = "{}_{}".format(field, level)
                tmp = matchup_file.variables[varname][:]
                values[valid, level] = tmp[
                    valid,
                    numpy.array(center_indices[valid])
                ]
                # use quality level to invalidate bad data
                if min_quality is not None:
                    varname = "{}__{}_{}".format(
                        source, 'quality_level', level)
                    tmp = matchup_file.variables[varname][:]
                    qual = numpy.ma.masked_all((nbmatchups,), tmp.dtype)
                    qual[valid] = tmp[valid,
                                      numpy.array(center_indices[valid])]
                    values[:, level] = numpy.ma.masked_where(
                        qual < min_quality,
                        values[:, level]
                    )
            if closest_to_surface:
                # flatten this 2D field by taking the closest valid
                # measurement to the surface
                indices = numpy.ma.notmasked_edges(values, axis=1)[0]
                tmp = numpy.ma.masked_all((nbmatchups,), values.dtype)
                tmp[indices[0]] = values[indices]
                values = tmp
                # @TODO add depth test
        else:
            # single dimension field
            if len(matchup_file.variables[field].dimensions) == 1:
                values = matchup_file.variables[field][:]
            else:
                # no depth dimension but the history dimension is there:
                # select the central (closest in time) measurement
                tmp = matchup_file.variables[field][:]
                values = numpy.ma.masked_all((nbmatchups,), tmp.dtype)
                try:
                    values[valid] = tmp[
                        valid,
                        center_indices[valid]
                    ]
                except Exception:
                    logging.error("invalid center indices: %s %s",
                                  valid, center_indices)
                    raise
return numpy.ma.fix_invalid(values)
    def read_insitu_data(self, source, day, fields, min_quality=None,
                         closest_to_surface=True, end=None):
        """Read the colocated in situ data over a period of time.

        Args:
            source (str): the site collection (cmems_drifter, cmems_argo,
                etc...)
            day (datetime): the date (or start date if ``end`` is also set)
                for which to retrieve the match-up in situ values
            fields (list): a list of fields to read (remove the level suffix:
                for instance, use cmems_water_temperature instead of
                cmems_water_temperature_0 and the function will detect and
                reconstruct the two dimensional fields for you).
            min_quality (int): minimum quality level for a measurement to be
                considered as valid
            closest_to_surface (boolean): take only the closest measurement
                to the surface
            end (datetime): the end date of the requested series. If None
                (default), only ``day`` is read.

        Returns:
            a dict where keys are field names, and values the retrieved data
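
        Example:
            A sketch with an illustrative collection and field name::

                data = mdb.read_insitu_data(
                    'cmems_drifter',
                    datetime.datetime(2017, 3, 1),
                    ['cmems_water_temperature'],
                    min_quality=2
                )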
"""
if len(fields) == 0:
return OrderedDict([])
dates = [day]
if end is not None:
date = day + datetime.timedelta(days=1)
while date <= end:
dates.append(date)
date += datetime.timedelta(days=1)
result = OrderedDict([])
# get matchup files
mdbfiles = self.get_matchup_files(dates, source)
# if no files found, return empty arrays
if len(mdbfiles) == 0:
for field in fields:
result[field] = numpy.ma.array(())
return result
# concatenate all files for the same source and dataset
for mdbfname in mdbfiles:
msg = "File: {}".format(mdbfname)
logging.info(msg)
print(msg)
mdbf = netcdf.Dataset(mdbfname)
# read in situ data
for var in fields:
varname = "{}__{}".format(source, var)
data = self.read_insitu_field(
mdbf,
source,
varname,
min_quality=min_quality,
closest_to_surface=closest_to_surface
)
if var in result:
result[var] = numpy.ma.concatenate((result[var], data))
else:
result[var] = numpy.ma.array(data)
mdbf.close()
return result
    @classmethod
    def reduce(cls, dynamic_data, matched_data, keep):
        """Reduce the data arrays with respect to a selection filter.

        Args:
            dynamic_data (dict): the dictionary of data fields from the
                dynamic site source (in situ data)
            matched_data (dict): the dictionary of the fields of the matched
                datasets
            keep (numpy.ndarray): the selection of match-ups to keep. One
                dimensional boolean array where True indicates the match-ups
                to keep, False the match-ups to remove.
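
        Example:
            A sketch; ``insitu`` and ``sat`` are dictionaries previously
            returned by :meth:`read_insitu_data` and
            :meth:`read_satellite_data`, and the field name is
            illustrative::

                keep = ~numpy.ma.getmaskarray(
                    insitu['cmems_water_temperature'])
                MDB.reduce(insitu, sat, keep)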
"""
        if (dynamic_data == {} or
                dynamic_data[dynamic_data.keys()[0]].size == keep.size):
logging.info(
"Reducing the data fields. Removing masked match-ups."
)
for field in dynamic_data:
dynamic_data[field] = dynamic_data[field][keep]
for dataset in matched_data:
for field in matched_data[dataset]:
matched_data[dataset][field] = \
matched_data[dataset][field][keep,...]
else:
raise ValueError(
"The selection filter has a wrong number of elements"
)