"""
.. module:: s3mdbreader.slstrreader
Specific functions for felyx S3 SLSTR match-up database handling
:copyright: Copyright 2016 Ifremer / Cersat.
:license: Released under GPL v3 license, see :ref:`license`.
.. sectionauthor:: Jeff Piolle <jfpiolle@ifremer.fr>
.. codeauthor:: Jeff Piolle <jfpiolle@ifremer.fr>
"""
import logging
import datetime
import os
from collections import OrderedDict
import numpy
import netCDF4 as netcdf
from pyfelyx.mdb import MDB
import s3analysis.slstr.cloud
import s3analysis.slstr.quality_level
# recommended set of cloud mask bits (flag meanings of the SLSTR cloud
# flag variables)
DEFAULT_CLOUDMASK = [
'visible',
'1.37_threshold',
'1.6_large_histogram',
'2.25_large_histogram',
'gross_cloud',
'thin_cirrus',
'medium_high',
'11_12_view_difference',
'3.7_11_view_difference',
'fog_low_stratus',
'11_spatial_coherence',
]
# labels for the different algorithm selection criteria
ALGO_LABEL = [
'N 2-ch day', 'N 2-ch night', 'N 3-ch night',
'D 2-ch day', 'D 2-ch night', 'D 3-ch night'
]
ALGO_CODE = ['N2d', 'N2n', 'N3n', 'D2d', 'D2n', 'D3n']
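# WST 'sst_algorithm_type' codes for these algorithms, as used in
# SLSTRMDB.get_algo_type_masks: 1 (N2), 3 (N3), 4 (D2), 5 (D3)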
class SLSTRMDB(MDB):
"""
    An abstraction class to read the SLSTR MDB records. It can be
    initialized in three mutually exclusive ways.
Args:
identifier (str, optional): configuration identifier. If provided, a
configuration file named <identifier>.cfg must exist in your
$HOME/.s3analysis/mdb directory.
cfgfile (str, optional): explicit path to configuration file (exclusive
with identifier)
mdbroot (str, optional): the path to the root directory where the
match-up files are stored (with YYYY/DDD subfolders)
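
    Example (illustrative; the identifier and path below are placeholders):

        >>> mdb = SLSTRMDB(identifier='cmems_drifter')

    or, with an explicit configuration file:

        >>> mdb = SLSTRMDB(cfgfile='/path/to/cmems_drifter.cfg')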
"""
    def load_config(self, identifier):
        """Load the base configuration, plus the SLSTR specific settings
        (cloud mask bits and dataset identifier prefixes)."""
        config = super(SLSTRMDB, self).load_config(identifier)
        # default cloud mask
        try:
            self.cloud_mask = config.options('cloud_mask')
        except Exception:
            # no cloud_mask section in the configuration file: fall back
            # to the recommended default
            self.cloud_mask = DEFAULT_CLOUDMASK
# default prefixes for each dataset contained in the match-up files
        self.datasets = dict(config.items('full_identifier'))
return config
    def __str__(self, *args, **kwargs):
return "\n".join([
super(SLSTRMDB, self).__str__(*args, **kwargs),
"cloud mask : {}".format(self.cloud_mask)
])
    def quality_level(self, matchups):
        """Return an experimental quality level, recomputed from the
        match-up cloud mask, algorithm type, theoretical uncertainty and
        deviation to analysis."""
for field in ['sst_algorithm_type',
'l2p_flags',
'sst_theoretical_uncertainty',
'dt_analysis']:
if field not in matchups['WST']:
raise ValueError(
"missing field {} in matchups"
.format(field)
)
        for field in ['cloud_in', 'cloud_io']:
if field not in matchups['WCT']:
raise ValueError(
"missing field {} in matchups"
.format(field)
)
# compute WST cloud mask
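        # algorithm type codes below 4 are nadir-only retrievals ('N*'),
        # codes 4 and above are dual-view retrievals ('D*'): pick the
        # nadir cloud mask (cloud_in) or the dual-view one (cloud_io)
        # accordingly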
algo = matchups['WST']['sst_algorithm_type']
composite_mask = numpy.ma.masked_all(
matchups['WCT']["cloud_in"].shape,
matchups['WCT']["cloud_in"].dtype)
composite_mask[algo < 4] = matchups['WCT']["cloud_in"][algo < 4]
composite_mask[algo >= 4] = matchups['WCT']["cloud_io"][algo >= 4]
new_cloud_mask = s3analysis.slstr.cloud.cloud_mask(
composite_mask,
bits=s3analysis.slstr.cloud.WST_MASK
)
new_quality_level = s3analysis.slstr.quality_level.quality_level(
matchups['WST']['l2p_flags'],
new_cloud_mask,
algo,
matchups['WST']['sst_theoretical_uncertainty'],
matchups['WST']['dt_analysis'],
matchups['WST']['sea_surface_temperature'],
)
return new_quality_level
    def read_cloud_mask(self, source, day, matchup_root,
                        end_day=None, box=False,
                        max_distance=None,
                        maskvar='S3A_SL_2_WCT__cloud_in',
                        maskbits=None,
                        multiple_mask=False):
"""Return a cloud mask from 'S3A_SL_2_WCT__cloud_in' flag
Args:
maskvar (str): the bitmask field name (ex: 'S3A_SL_2_WCT__cloud_in')
maskbits (list, optional): list of bits to test for cloud mask
extraction, if you don't want to use default values (preset for
'S3A_SL_2_WCT__cloud_in' variable), as a list of flag meanings.
multiple_mask (boolean, optional): if False (default), return a single
mask array. If True, return a mask array for each mask bit within
a dictionary.
Return:
numpy.ndarray or dict of numpy.ndarray: an array of boolean (True is
set when cloudy) if ``multiple_mask`` keyword is not set, or a
dictionary of boolean arrays (one for each mask bit testes).
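
        Example (illustrative; source, date, path and mask bits are
        placeholders):

            >>> mask = mdb.read_cloud_mask(
            ...     'cmems_drifter', datetime.datetime(2016, 7, 1),
            ...     '/path/to/mdb/root',
            ...     maskbits=['gross_cloud', 'thin_cirrus'])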
"""
        cloud_flag = self.read_satellite_data(source, day, matchup_root,
                                              [maskvar],
                                              end_day=end_day, box=box,
                                              max_distance=max_distance)
if len(cloud_flag[maskvar]) == 0:
return numpy.ma.masked_all(())
        # get cloud flag metadata (assumed identical in all the match-up
        # files, so read from the first one only)
        mdbfiles = self.get_matchup_files([day], matchup_root, source)
        mdbf = netcdf.Dataset(mdbfiles[0])
        cloudvar = mdbf.variables[maskvar]
        allmeanings = cloudvar.getncattr('flag_meanings')
        allmasks = cloudvar.getncattr('flag_masks')
        mdbf.close()
        if not multiple_mask:
            return s3analysis.slstr.cloud.cloud_mask(
                cloud_flag[maskvar], allmeanings, allmasks, maskbits)
        else:
            bits = maskbits
            if bits is None:
                bits = DEFAULT_CLOUDMASK
            cloud_mask_dict = OrderedDict([])
            for bit in bits:
                cloud_mask_dict[bit] = s3analysis.slstr.cloud.cloud_mask(
                    cloud_flag[maskvar],
                    allmeanings,
                    allmasks,
                    [bit])
            return cloud_mask_dict
    def check_fields(self, matchups, fields):
        """Check that all the required fields are in the match-up set.

        Raises:
            ValueError: if any of the required fields is missing.
        """
for dataset in fields:
for fieldname in fields[dataset]:
if fieldname not in matchups[dataset]:
raise ValueError(
"field {} is missing in the match-up set for {}"
.format(fieldname, dataset)
)
    def add_wind_speed(self, matchups):
        """Calculate the model wind speed at the closest forecast time and
        add it to the match-ups as field ``wind_tx`` of the WCT dataset.
        """
self.check_fields(matchups,
{"WCT": ["t_series",
"u_wind_tx",
"v_wind_tx"]})
# select wind at closest forecast time
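        # t_series holds the time offsets of the model forecast steps for
        # each match-up: the argmin of their absolute values gives the
        # step closest to the acquisition time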
i = numpy.ma.abs(matchups['WCT']['t_series']).argmin(axis=1)
wind_speed = numpy.ma.sqrt(
matchups['WCT']['u_wind_tx'][numpy.arange(len(i)), i]**2 +
matchups['WCT']['v_wind_tx'][numpy.arange(len(i)), i]**2
)
matchups['WCT']['wind_tx'] = wind_speed
return matchups
    def get_wct_selection(self,
                          matchups,
                          across_track_range=(0, 1500),
                          min_wind_speed=6.0,
                          max_sat_zen=55.,
                          dt_thresh=5.
                          ):
"""return a selection of the match-ups in the proper range of
validity.
Default settings as provided by G.Corlett
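
        Example (illustrative, assuming ``mdb`` is an initialized
        :class:`SLSTRMDB` and ``matchups`` was read beforehand):

            >>> selection = mdb.get_wct_selection(matchups)
            >>> sst = matchups['WCT']['N2_sea_surface_temperature'][selection]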
"""
        self.check_fields(matchups,
                          {"WCT": ["sat_zenith_tn",
                                   "dynamic_target_center_index"]})
        # compute the model wind speed at the closest forecast time, if
        # not already done
if 'wind_tx' not in matchups['WCT']:
matchups = self.add_wind_speed(matchups)
        # replace invalid (NaN) values with masked values to avoid
        # warnings in the comparisons below
        matchups['WCT']["sat_zenith_tn"] = numpy.ma.fix_invalid(
            matchups['WCT']["sat_zenith_tn"])
return (
(matchups['WCT']["dynamic_target_center_index"][:, 1] >= across_track_range[0]) &
(matchups['WCT']["dynamic_target_center_index"][:, 1] <= across_track_range[1]) &
(matchups['WCT']["wind_tx"] > min_wind_speed) &
(matchups['WCT']["sat_zenith_tn"] <= max_sat_zen)
)
    def get_wst_selection(self,
matchups,
insitu,
dt_thresh=5.,
min_quality_level=2
):
"""return a selection of the match-ups in the proper range of
validity.
Default settings as provided by G.Corlett
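
        Example (illustrative, assuming ``matchups`` and ``insitu`` were
        read beforehand):

            >>> selection = mdb.get_wst_selection(matchups, insitu)
            >>> bias = (matchups['WST']['sea_surface_temperature'] -
            ...         insitu['water_temperature'])[selection]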
"""
self.check_fields(matchups,
{'WST': [
'quality_level',
'sst_theoretical_uncertainty',
'dt_analysis'
]
})
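        # require a valid retrieval uncertainty, a sufficient quality
        # level, a deviation to the analysis within the threshold and a
        # valid in situ measurement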
return (
(~numpy.ma.getmaskarray(matchups['WST']["sst_theoretical_uncertainty"])) &
(matchups['WST']["quality_level"] >= min_quality_level) &
(numpy.ma.fabs(matchups['WST']["dt_analysis"]) <= dt_thresh) &
(~numpy.ma.getmaskarray(insitu["water_temperature"]))
)
    def get_algo_type_masks(self, matchups, use_wst_filter=True):
"""return a selection mask for each algorithm type
Default settings as provided by G.Corlett
Args:
dt_threshold (float): maximum deviation to analysis
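
        Example (illustrative, assuming ``matchups`` was read beforehand):

            >>> n2d, n2n, n3n, d2d, d2n, d3n = mdb.get_algo_type_masks(
            ...     matchups)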
"""
self.check_fields(matchups,
{"WCT": [
"solar_zenith_tn",
"N2_sea_surface_temperature",
"N3_sea_surface_temperature",
"D2_sea_surface_temperature",
"D3_sea_surface_temperature"
],
'WST': [
'sst_algorithm_type',
'sst_theoretical_uncertainty',
]
})
        # replace invalid (NaN) values with masked values to avoid
        # warnings in the comparisons below
matchups['WCT']["solar_zenith_tn"] = numpy.ma.fix_invalid(
matchups['WCT']["solar_zenith_tn"]
)
        sza = matchups['WCT']["solar_zenith_tn"]
        # selection criteria per algorithm: WCT SST field that must be
        # valid, day/night condition on the nadir solar zenith angle, and
        # WST algorithm type code (same order as ALGO_LABEL / ALGO_CODE)
        criteria = [
            ('N2_sea_surface_temperature', sza < 90, 1),   # N 2-ch day
            ('N2_sea_surface_temperature', sza > 90, 1),   # N 2-ch night
            ('N3_sea_surface_temperature', sza > 90, 3),   # N 3-ch night
            ('D2_sea_surface_temperature', sza < 90, 4),   # D 2-ch day
            ('D2_sea_surface_temperature', sza > 90, 4),   # D 2-ch night
            ('D3_sea_surface_temperature', sza > 90, 5),   # D 3-ch night
        ]
        masks = []
        for field, timemask, algo_code in criteria:
            mask = (timemask &
                    ~numpy.ma.getmaskarray(matchups['WCT'][field]))
            if use_wst_filter:
                mask = mask & (
                    matchups['WST']["sst_algorithm_type"] == algo_code)
            masks.append(mask)
        # n2day, n2night, n3night, d2day, d2night, d3night
        return tuple(masks)
    def read_aux_data(self, source, day, fields, end_day=None, filters=None,
                      min_quality=None, closest_to_surface=True):
"""
Read the auxiliary files generated by Gary Corlett
Args:
source (str): the site collections (cmems_drifter, cmems_argo, etc...)
day (datetime): the date (or start date if end_date is also set) for
which to retrieve the matchup in situ values
matchup_root (str): the path to the root repository where the matchup
files are stored.
fields (list): a list of fields to read (remove the level suffix:
for instance, use cmems_water_temperature instead of
cmems_water_temperature_0 and the function will detect and
reconstruct the two dimensional fields for you).
Returns:
a dict where keys are field names, and values the retrieved data
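
        Example (illustrative; source, dates and field names are
        placeholders):

            >>> data = mdb.read_aux_data(
            ...     'cmems_drifter', datetime.datetime(2016, 7, 1),
            ...     ['cmems_water_temperature'],
            ...     end_day=datetime.datetime(2016, 7, 31))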
"""
if filters is None:
filters = self.default_filters
dates = [day]
if end_day is not None:
date = day + datetime.timedelta(days=1)
while date <= end_day:
dates.append(date)
date += datetime.timedelta(days=1)
result = OrderedDict([])
# get matchup files
mdbfiles = self.get_matchup_files(dates, self.root, source)
# get auxiliary matchup files
prefix = "%s_AUX" % source
auxfiles = self.get_matchup_files(dates, self.root, prefix)
# if no files found, return empty arrays
if len(mdbfiles) == 0:
for field in fields:
result[field] = numpy.ma.array(())
return result
# concatenate all files for the same source and dataset
        for ifile, mdbfname in enumerate(mdbfiles):
            logging.info("File: %s", mdbfname)
            auxfname = auxfiles[ifile]
            # check that the mdb and aux files cover the same time period
            if (os.path.basename(mdbfname).split('_')[-1][0:16] !=
                    os.path.basename(auxfname).split('_')[-1][0:16]):
                raise IOError("mdb and aux file don't match: {} <> {}"
                              .format(os.path.basename(mdbfname),
                                      os.path.basename(auxfname)))
            mdbf = netcdf.Dataset(mdbfname)
            # read the center pixel coordinates
            lat = mdbf.variables['lat'][:]
            lon = mdbf.variables['lon'][:]
            auxf = netcdf.Dataset(auxfname)
            # get the indices of the closest valid pixel to the in situ
            # measurements (they do not depend on the field, so compute
            # them once per file)
            indices, distances = self.get_closest_valid_pixel_indices(
                mdbf,
                self.reference,
                lat=lat,
                lon=lon,
                filters=filters
            )
            # read in situ data
            for var in fields:
                data = self.read_insitu_field(
                    mdbf,
                    source,
                    var,
                    min_quality=min_quality,
                    closest_to_surface=closest_to_surface
                )
                if var in result:
                    result[var] = numpy.ma.concatenate((result[var], data))
                else:
                    result[var] = numpy.ma.array(data)
            auxf.close()
            mdbf.close()
return result