#!/usr/bin/env python
# Copyright (c) 2011-2023, wradlib developers.
# Distributed under the MIT License. See LICENSE.txt for more info.
"""
Xarray based Data I/O (deprecated)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Reads data from netcdf-based CfRadial1, CfRadial2 and hdf5-based ODIM_H5 and
other hdf5-flavours (GAMIC).
This reader implementation uses
* `xarray <https://xarray.pydata.org/>`_,
* `netcdf4 <https://unidata.github.io/netcdf4-python/>`_,
* `h5py <https://www.h5py.org/>`_ and
* `h5netcdf <https://github.com/h5netcdf/h5netcdf>`_.
The following two approaches are not recommended and will be removed from the codebase
in wradlib v 2.0.0.
In the first approach the data is claimed using netcdf4-Dataset in a diskless
non-persistent mode as follows::
vol = io.xarray.OdimH5(ncfile)
# or
vol = io.xarray.CfRadial(ncfile)
The vol data structure holds one or many ['sweep_X'] xarray datasets, containing the
sweep data. The root group xarray dataset which corresponds to the
CfRadial2 root-group is available via the `.root`-object.
The writer implementation uses xarray for CfRadial2 output and relies on h5py
for the ODIM_H5 output.
The second approach reads ODIM files (metadata) into a *simple* accessible
structure::
vol = wradlib.io.open_odim(paths, loader='netcdf4', **kwargs)
All datafiles are accessed via the given loader ('netcdf4', 'h5py',
'h5netcdf'). Only absolutely necessary data is actually read in this process,
eg. acquisition time and elevation, to fill the structure accordingly. All
subsequent metadata retrievals are cached to further improve performance.
Actual data access is realised via xarray using engine 'netcdf4' or 'h5netcdf',
depending on the loader.
Since for data handling xarray is utilized all xarray features can be
exploited, like lazy-loading, pandas-like indexing on N-dimensional data and
vectorized mathematical operations across multiple dimensions.
Examples
--------
See :ref:`/notebooks/fileio/wradlib_odim_multi_file_dataset.ipynb`.
Warning
-------
This implementation is considered experimental. Changes in the API should
be expected.
.. autosummary::
:nosignatures:
:toctree: generated/
{}
"""
# Public API of this module. The list is also interpolated into the
# module docstring's autosummary placeholder via ``__doc__.format`` below.
__all__ = [
    "XRadVol",
    "CfRadial",
    "OdimH5",
    "open_odim",
    "XRadSweep",
    "XRadMoment",
    "XRadTimeSeries",
    "XRadVolume",
    "create_xarray_dataarray",
]
# fill the ``{}`` placeholder in the module docstring with the public names
__doc__ = __doc__.format("\n ".join(__all__))
import collections
import datetime as dt
import glob
import os
import warnings
import dateutil
import deprecation
import h5netcdf
import h5py
import netCDF4
import numpy as np
import xarray as xr
from packaging.version import Version
from xarray.backends.api import combine_by_coords
from xradar.io.backends.gamic import gamic_mapping
from xradar.model import (
get_azimuth_attrs,
get_elevation_attrs,
get_range_attrs,
get_time_attrs,
moment_attrs,
)
from xradar.model import sweep_vars_mapping as moments_mapping
from wradlib import version
from wradlib.georef import xarray as geo_xarray
from wradlib.io.xarray import (
XRadBase,
to_netcdf,
)
from wradlib.util import has_import
# Progress-bar support is optional: fall back to a pass-through shim that
# prints a hint when ``tqdm`` is not installed.
try:
    from tqdm.auto import tqdm
except ImportError:

    def tqdm(val, **kwargs):
        """Pass-through replacement for :func:`tqdm.auto.tqdm`.

        Returns ``val`` unchanged; all keyword arguments are ignored.
        """
        message = (
            "wradlib: Please wait for completion of time consuming task! \n"
            "wradlib: Please install 'tqdm' for showing a progress bar "
            "instead."
        )
        print(message)
        return val
# Variable-name sets used to distribute CfRadial2 content between the
# root group and the sweep groups (cf. the CfRadial2 specification).
root_vars = {
    "volume_number",
    "platform_type",
    "instrument_type",
    "primary_axis",
    "time_coverage_start",
    "time_coverage_end",
    "latitude",
    "longitude",
    "altitude",
    "fixed_angle",
    "status_xml",
}
# scalar per-sweep metadata variables
sweep_vars1 = {
    "sweep_number",
    "sweep_mode",
    "polarization_mode",
    "prt_mode",
    "follow_mode",
    "fixed_angle",
    "target_scan_rate",
    "sweep_start_ray_index",
    "sweep_end_ray_index",
}
# per-ray sweep variables
sweep_vars2 = {
    "azimuth",
    "elevation",
    "pulse_width",
    "prt",
    "nyquist_velocity",
    "unambiguous_range",
    "antenna_transition",
    "n_samples",
    "r_calib_index",
    "scan_rate",
}
# moment and coordinate variables
sweep_vars3 = {
    "DBZH",
    "DBZV",
    "VELH",
    "VELV",
    "DBZ",
    "VR",
    "time",
    "range",
    "reflectivity_horizontal",
}
# translation of CfRadial variable names to ODIM attribute names
cf_full_vars = {"prt": "prf", "n_samples": "pulse"}
# (attribute, description) pairs for the CfRadial root group;
# descriptions are taken from the CfRadial documentation
global_attrs = [
    ("Conventions", "Cf/Radial"),
    ("version", "Cf/Radial version number"),
    ("title", "short description of file contents"),
    ("institution", "where the original data were produced"),
    (
        "references",
        ("references that describe the data or the methods used " "to produce it"),
    ),
    ("source", "method of production of the original data"),
    ("history", "list of modifications to the original data"),
    ("comment", "miscellaneous information"),
    # fixed garbled description text ("nameThe of radar or lidar")
    ("instrument_name", "name of radar or lidar"),
    ("site_name", "name of site where data were gathered"),
    ("scan_name", "name of scan strategy used, if applicable"),
    ("scan_id", "scan strategy id, if applicable. assumed 0 if missing"),
    ("platform_is_mobile", '"true" or "false", assumed "false" if missing'),
    (
        "ray_times_increase",
        (
            '"true" or "false", assumed "true" if missing. '
            "This is set to true if ray times increase monotonically "
            # fixed typo: "thoughout" -> "throughout"
            "throughout all of the sweeps in the volume"
        ),
    ),
    ("field_names", "array of strings of field names present in this file."),
    ("time_coverage_start", "copy of time_coverage_start global variable"),
    ("time_coverage_end", "copy of time_coverage_end global variable"),
    (
        "simulated data",
        (
            '"true" or "false", assumed "false" if missing. '
            "data in this file are simulated"
        ),
    ),
]
# default values for the mandatory CfRadial2 root-group variables
global_variables = dict(
    [
        ("volume_number", np.int_),
        ("platform_type", "fixed"),
        ("instrument_type", "radar"),
        ("primary_axis", "axis_z"),
        ("time_coverage_start", "1970-01-01T00:00:00Z"),
        ("time_coverage_end", "1970-01-01T00:00:00Z"),
        ("latitude", np.nan),
        ("longitude", np.nan),
        ("altitude", np.nan),
        ("altitude_agl", np.nan),
        ("sweep_group_name", (["sweep"], [np.nan])),
        ("sweep_fixed_angle", (["sweep"], [np.nan])),
        ("frequency", np.nan),
        ("status_xml", "None"),
    ]
)
# reverse lookup: ODIM short names (e.g. "DBZH") -> CF/xradar moment keys
ODIM_NAMES = {value["short_name"]: key for (key, value) in moments_mapping.items()}
def _maybe_decode(attr):
try:
return attr.item().decode()
except AttributeError:
return attr
def _remove_duplicate_rays(ds, store=None):
    """Drop rays whose angle coordinate duplicates an earlier ray.

    Keeps the first occurrence of every angle along the sweep's first
    dimension. When rays were removed and the store flags a bogus
    ray-time axis, the ray times are recomputed from the store.
    """
    dimname = list(ds.dims)[0]
    # indices of the first occurrence of each unique angle value
    _, first_idx = np.unique(ds[dimname], return_index=True)
    if len(first_idx) == len(ds[dimname]):
        # nothing duplicated, return unchanged
        return ds
    ds = ds.isel({dimname: first_idx})
    # ray_time may have been erroneously created from the wrong (pre-dedup)
    # dimensions; in that case recalculate it from the store
    if store and store._need_time_recalc:
        ray_times = store._get_ray_times(nrays=len(first_idx))
        # decode only when the dataset itself is CF-decoded
        if "units" in ds.rtime.encoding:
            ray_times = xr.decode_cf(xr.Dataset({"rtime": ray_times})).rtime
        ds = ds.assign({"rtime": ray_times})
    return ds
def _fix_angle(da):
# fix elevation outliers
if len(set(da.values)) > 1:
med = da.median(skipna=True)
da = da.where(da == med).fillna(med)
return da
# NOTE: the original line carried a stray "[docs]" Sphinx-HTML scraping
# artifact in front of the decorator, which is invalid Python; removed.
@deprecation.deprecated(
    deprecated_in="1.5",
    removed_in="2.0",
    current_version=version.version,
    details="Use `wradlib.georef.create_xarray_dataarray` " "instead.",
)
def create_xarray_dataarray(*args, **kwargs):
    """Deprecated alias for :func:`wradlib.georef.xarray.create_xarray_dataarray`.

    All positional and keyword arguments are forwarded unchanged.
    """
    return geo_xarray.create_xarray_dataarray(*args, **kwargs)
def _reindex_angle(ds, store=None, force=False, tol=None):
    """Reindex the rays of a sweep onto a regular angle grid.

    Sorts the sweep by its first dimension (azimuth or elevation),
    detects non-uniform angle spacing, duplicate/missing rays and
    incomplete scans, and - if necessary or if ``force`` is set -
    reindexes the dataset onto evenly spaced ray-center angles using
    nearest-neighbour matching within ``tol`` degrees.

    Parameters
    ----------
    ds : xarray.Dataset
        sweep dataset, first dimension is the angle dimension
    store : object, optional
        sweep store, consulted for ``angle_resolution`` and ray-time fixes
    force : bool
        if True, always reindex
    tol : float, True or None
        angle tolerance for nearest-neighbour matching; ``True``/``None``
        select the default of 0.4 degree

    Returns
    -------
    ds : xarray.Dataset
    """
    # Todo: The current code assumes to have PPI's of 360deg and RHI's of 90deg,
    # make this work also for sectorized (!!!) measurements
    # this needs refactoring, it's too complex
    if tol is True or tol is None:
        tol = 0.4
    # disentangle different functionality
    full_range = {"azimuth": 360, "elevation": 90}
    dimname = list(ds.dims)[0]
    # sort in any case, to prevent unsorted errors
    ds = ds.sortby(dimname)
    # fix angle range for rhi
    if hasattr(ds, "elevation_upper_limit"):
        ul = np.rint(ds.elevation_upper_limit)
        full_range["elevation"] = ul
    secname = {"azimuth": "elevation", "elevation": "azimuth"}.get(dimname)
    dim = ds[dimname]
    diff = dim.diff(dimname)
    # this captures different angle spacing
    # catches also missing rays and double rays
    # and other erroneous ray alignments which result in different diff values
    diffset = set(diff.values)
    non_uniform_angle_spacing = len(diffset) > 1
    # this captures missing and additional rays in case the angle differences
    # are equal
    non_full_circle = False
    if not non_uniform_angle_spacing:
        res = list(diffset)[0]
        non_full_circle = ((res * ds.dims[dimname]) % full_range[dimname]) != 0
    # fix issues with ray alignment
    if force or non_uniform_angle_spacing or non_full_circle:
        # create new array and reindex; resolution preference:
        # store metadata > angle attribute > median of observed diffs
        if store and hasattr(store, "angle_resolution"):
            res = store.angle_resolution
        elif hasattr(ds[dimname], "angle_res"):
            res = ds[dimname].angle_res
        else:
            res = diff.median(dimname).values
        new_rays = int(np.round(full_range[dimname] / res, decimals=0))
        # find exact duplicates and remove
        ds = _remove_duplicate_rays(ds, store=store)
        # do we have all needed rays?
        # BUGFIX: the original expression
        #   `non_uniform_angle_spacing | len(ds[dimname]) != new_rays`
        # was parsed as `(non_uniform_angle_spacing | len(...)) != new_rays`
        # because `|` binds tighter than `!=`; spell out the intended
        # boolean condition explicitly
        if non_uniform_angle_spacing or (len(ds[dimname]) != new_rays):
            # todo: check if assumption that beam center points to
            # multiples of res/2. is correct in any case
            # it might fail for cfradial1 data which already points to beam centers
            azr = np.arange(res / 2.0, new_rays * res, res, dtype=diff.dtype)
            fill_value = {
                k: np.asarray(v._FillValue).astype(v.dtype)
                for k, v in ds.items()
                if hasattr(v, "_FillValue")
            }
            ds = ds.reindex(
                {dimname: azr},
                method="nearest",
                tolerance=tol,
                fill_value=fill_value,
            )
        # check other coordinates
        # check secondary angle coordinate (no nan)
        # set nan values to reasonable median
        if hasattr(ds, secname) and np.count_nonzero(np.isnan(ds[secname])):
            ds[secname] = ds[secname].fillna(ds[secname].median(skipna=True))
        # todo: rtime is also affected, might need to be treated accordingly
    return ds
@xr.register_dataset_accessor("gamic")
class GamicAccessor:
    """Dataset Accessor for handling GAMIC HDF5 data files.

    Registered on every ``xarray.Dataset`` as ``ds.gamic``. Each property
    derives a coordinate array from GAMIC-specific attributes/variables of
    the wrapped dataset and caches the result for repeated access.
    """

    def __init__(self, xarray_obj):
        # the wrapped dataset
        self._obj = xarray_obj
        # lazily computed, cached coordinate arrays (see properties below)
        self._radial_range = None
        self._azimuth_range = None
        self._elevation_range = None
        self._time_range = None
        self._sitecoords = None
        self._polcoords = None
        self._projection = None
        self._time = None

    @property
    def radial_range(self):
        """Return the radial range of this dataset."""
        if self._radial_range is None:
            ngates = self._obj.attrs["bin_count"]
            # effective bin length is range_step * range_samples
            # (several samples may be averaged into one bin)
            range_samples = self._obj.attrs["range_samples"]
            range_step = self._obj.attrs["range_step"]
            bin_range = range_step * range_samples
            # bin centers, starting half a bin from the instrument
            range_data = np.arange(
                bin_range / 2.0, bin_range * ngates, bin_range, dtype="float32"
            )
            range_attrs = get_range_attrs(range_data)
            da = xr.DataArray(range_data, dims=["dim_1"], attrs=range_attrs)
            self._radial_range = da
        return self._radial_range

    @property
    def azimuth_range(self):
        """Return the azimuth range of this dataset."""
        if self._azimuth_range is None:
            azstart = self._obj["azimuth_start"]
            azstop = self._obj["azimuth_stop"]
            # rays crossing north have stop < start; unwrap by adding 360
            zero_index = np.where(azstop < azstart)
            # NOTE(review): this adds 360 in place and thereby mutates the
            # underlying dataset variable; the caching prevents repeated
            # application - confirm this is intended
            azstop[zero_index[0]] += 360
            # ray center angle = mean of start and stop angle
            azimuth = (azstart + azstop) / 2.0
            azimuth = azimuth.assign_attrs(get_azimuth_attrs())
            self._azimuth_range = azimuth
        return self._azimuth_range

    @property
    def elevation_range(self):
        """Return the elevation range of this dataset."""
        if self._elevation_range is None:
            elstart = self._obj["elevation_start"]
            elstop = self._obj["elevation_stop"]
            # ray center angle = mean of start and stop angle
            elevation = (elstart + elstop) / 2.0
            elevation = elevation.assign_attrs(get_elevation_attrs())
            self._elevation_range = elevation
        return self._elevation_range

    @property
    def time_range(self):
        """Return the time range of this dataset."""
        if self._time_range is None:
            # convert microsecond timestamps to seconds (per `units` below)
            times = self._obj["timestamp"] / 1e6
            attrs = {
                "units": "seconds since 1970-01-01T00:00:00Z",
                "standard_name": "time",
            }
            da = xr.DataArray(times, attrs=attrs)
            self._time_range = da
        return self._time_range
@xr.register_dataset_accessor("odim")
class OdimAccessor:
    """Dataset Accessor for handling ODIM_H5 data files.

    Registered on every ``xarray.Dataset`` as ``ds.odim``. The properties
    derive coordinate arrays from ODIM metadata attributes of the wrapped
    dataset and cache the results. The ``*_range2`` variants reconstruct
    coordinates from mandatory scalar attributes when the optional per-ray
    arrays (``startazA`` etc.) are unavailable.
    """

    def __init__(self, xarray_obj):
        # the wrapped dataset
        self._obj = xarray_obj
        # lazily computed, cached coordinate arrays (see properties below)
        self._radial_range = None
        self._azimuth_range = None
        self._elevation_range = None
        self._time_range = None
        self._time_range2 = None
        self._prt = None
        self._n_samples = None

    @property
    def radial_range(self):
        """Return the radial range of this dataset."""
        if self._radial_range is None:
            ngates = self._obj.attrs["nbins"]
            # `rstart` is given in km, convert to m
            range_start = self._obj.attrs["rstart"] * 1000.0
            bin_range = self._obj.attrs["rscale"]
            # center of the first range bin
            cent_first = range_start + bin_range / 2.0
            range_data = np.arange(
                cent_first, range_start + bin_range * ngates, bin_range, dtype="float32"
            )
            range_attrs = get_range_attrs(range_data)
            da = xr.DataArray(range_data, dims=["dim_1"], attrs=range_attrs)
            self._radial_range = da
        return self._radial_range

    @property
    def azimuth_range2(self):
        """Return the azimuth range of this dataset.

        Fallback reconstruction from ``nrays`` assuming a uniform full
        360 degree scan with rays pointing to bin centers.
        """
        if self._azimuth_range is None:
            nrays = self._obj.attrs["nrays"]
            res = 360.0 / nrays
            azimuth_data = np.arange(res / 2.0, 360.0, res, dtype="float32")
            da = xr.DataArray(azimuth_data, dims=["dim_0"], attrs=get_azimuth_attrs())
            self._azimuth_range = da
        return self._azimuth_range

    @property
    def azimuth_range(self):
        """Return the azimuth range of this dataset."""
        if self._azimuth_range is None:
            startaz = self._obj.attrs["startazA"]
            stopaz = self._obj.attrs["stopazA"]
            # rays crossing north have stop < start; unwrap by adding 360
            zero_index = np.where(stopaz < startaz)
            # NOTE(review): adds 360 in place, mutating the attribute array;
            # the caching prevents repeated application - confirm intended
            stopaz[zero_index[0]] += 360
            # ray center angle = mean of start and stop angle
            azimuth_data = (startaz + stopaz) / 2.0
            da = xr.DataArray(azimuth_data, attrs=get_azimuth_attrs())
            self._azimuth_range = da
        return self._azimuth_range

    @property
    def elevation_range2(self):
        """Return the elevation range of this dataset.

        Fallback reconstruction from the scalar ``elangle`` attribute,
        replicated over ``nrays`` rays.
        """
        if self._elevation_range is None:
            nrays = self._obj.attrs["nrays"]
            elangle = self._obj.attrs["elangle"]
            elevation_data = np.ones(nrays, dtype="float32") * elangle
            da = xr.DataArray(
                elevation_data, dims=["dim_0"], attrs=get_elevation_attrs()
            )
            self._elevation_range = da
        return self._elevation_range

    @property
    def elevation_range(self):
        """Return the elevation range of this dataset."""
        if self._elevation_range is None:
            startel = self._obj.attrs["startelA"]
            stopel = self._obj.attrs["stopelA"]
            # ray center angle = mean of start and stop angle
            elevation_data = (startel + stopel) / 2.0
            da = xr.DataArray(
                elevation_data, dims=["dim_0"], attrs=get_elevation_attrs()
            )
            self._elevation_range = da
        return self._elevation_range

    @property
    def time_range(self):
        """Return the time range of this dataset.

        Ray center times as a plain array, the mean of the per-ray start
        and stop times (``startazT``/``stopazT``, epoch seconds per ODIM).
        """
        if self._time_range is None:
            startT = self._obj.attrs["startazT"]
            stopT = self._obj.attrs["stopazT"]
            times = (startT + stopT) / 2.0
            self._time_range = times
        return self._time_range

    @property
    def time_range2(self):
        """Return the time range of this dataset.

        Sweep (start, end) epoch seconds parsed from the ODIM
        date/time string attributes, interpreted as UTC.
        """
        if self._time_range2 is None:
            startdate = self._obj.attrs["startdate"]
            starttime = self._obj.attrs["starttime"]
            enddate = self._obj.attrs["enddate"]
            endtime = self._obj.attrs["endtime"]
            start = dt.datetime.strptime(startdate + starttime, "%Y%m%d%H%M%S")
            end = dt.datetime.strptime(enddate + endtime, "%Y%m%d%H%M%S")
            # make timezone-aware before converting to epoch seconds
            start = start.replace(tzinfo=dt.timezone.utc)
            end = end.replace(tzinfo=dt.timezone.utc)
            self._time_range2 = (start.timestamp(), end.timestamp())
        return self._time_range2

    @property
    def prt(self):
        """Return pulse repetition time (1/prf), or None if `prf` is missing."""
        if self._prt is None:
            try:
                prt = 1.0 / self._obj.attrs["prf"]
                da = xr.DataArray(prt, dims=["dim_0"])
                self._prt = da
            except KeyError:
                # optional attribute; leave as None
                pass
        return self._prt

    @property
    def n_samples(self):
        """Return number of samples (`pulse`), or None if missing."""
        if self._n_samples is None:
            try:
                da = xr.DataArray(self._obj.attrs["pulse"], dims=["dim_0"])
                self._n_samples = da
            except KeyError:
                # optional attribute; leave as None
                pass
        return self._n_samples
def to_cfradial2(volume, filename, timestep=None, engine=None):
    """Save RadarVolume/XRadVol/XRadVolume to CfRadial2.0 compliant file.

    Parameters
    ----------
    volume : :class:`wradlib.io.xarray.RadarVolume`, :class:`wradlib.io.xarray.XRadVol` or :class:`wradlib.io.xarray.XRadVolume`
    filename : str
        output filename
    timestep : int
        timestep of wanted volume
    engine : str
        netcdf engine, one of 'netcdf4' or 'h5netcdf';
        defaults to the first importable of the two
    """
    if engine is None:
        # BUGFIX: the original used `engine == "netcdf4"` / `engine == "h5netcdf"`
        # (comparison instead of assignment), so the selected engine was
        # silently discarded and `engine` remained None
        if has_import(netCDF4):
            engine = "netcdf4"
        elif has_import(h5netcdf):
            engine = "h5netcdf"
        else:
            raise ImportError(
                "wradlib: ``netCDF4`` or ``h5netcdf`` needed to perform this operation."
            )
    volume.root.load()
    root = volume.root.copy(deep=True)
    root.attrs["Conventions"] = "Cf/Radial"
    root.attrs["version"] = "2.0"
    # write root group first, then append one group per sweep
    root.to_netcdf(filename, mode="w", group="/", engine=engine)
    for idx, key in enumerate(root.sweep_group_name.values):
        if isinstance(volume, (OdimH5, CfRadial)):
            swp = volume[key]
        elif isinstance(volume, XRadVolume):
            swp = volume[idx][timestep].data
        else:
            ds = volume[idx]
            if "time" not in ds.dims:
                ds = ds.expand_dims("time")
            swp = ds.isel(time=timestep)
        swp.load()
        # first dimension of the sweep (azimuth, elevation or time)
        # NOTE(review): if more than one of these is a dimension, set order
        # makes the pick arbitrary - confirm sweeps have exactly one
        dim0 = list(set(swp.dims) & {"azimuth", "elevation", "time"})[0]
        try:
            swp = swp.swap_dims({dim0: "time"})
        except ValueError:
            # "time" exists but cannot serve as dimension coordinate;
            # promote the per-ray times (rtime) instead
            swp = swp.drop_vars("time").rename({"rtime": "time"})
            swp = swp.swap_dims({dim0: "time"})
        # drop derived georeferencing coordinates before writing
        swp = swp.drop_vars(["x", "y", "z", "gr", "rays", "bins"], errors="ignore")
        swp = swp.sortby("time")
        swp.to_netcdf(filename, mode="a", group=key, engine=engine)
def to_odim(volume, filename, timestep=0):
    """Save RadarVolume/XRadVol/XRadVolume to ODIM_H5/V2_2 compliant file.

    Writes the mandatory root/what/where/how groups and one
    ``datasetN`` group per sweep (page references in the comments
    follow the ODIM_H5 v2.2 specification document).

    Parameters
    ----------
    volume : :class:`wradlib.io.xarray.RadarVolume`, :class:`wradlib.io.xarray.XRadVol` or :class:`wradlib.io.xarray.XRadVolume`
    filename : str
        output filename
    timestep : int
        timestep of wanted volume
    """
    root = volume.root
    h5 = h5py.File(filename, "w")
    # root group, only Conventions for ODIM_H5
    _write_odim({"Conventions": "ODIM_H5/V2_2"}, h5)
    # how group
    how = {}
    how.update({"_modification_program": "wradlib"})
    h5_how = h5.create_group("how")
    _write_odim(how, h5_how)
    sweepnames = root.sweep_group_name.values
    # what group, object, version, date, time, source, mandatory
    # p. 10 f
    what = {}
    # multiple sweeps -> polar volume, single sweep -> scan
    if len(sweepnames) > 1:
        what["object"] = "PVOL"
    else:
        what["object"] = "SCAN"
    what["version"] = "H5rad 2.2"
    # slice date (YYYY-MM-DD) and time (HH:MM:SS) out of the ISO timestamps
    what["date"] = str(root.time_coverage_start.values)[:10].replace("-", "")
    what["time"] = str(root.time_coverage_end.values)[11:19].replace(":", "")
    what["source"] = root.attrs["instrument_name"]
    h5_what = h5.create_group("what")
    _write_odim(what, h5_what)
    # where group, lon, lat, height, mandatory
    where = {
        "lon": root.longitude.values,
        "lat": root.latitude.values,
        "height": root.altitude.values,
    }
    h5_where = h5.create_group("where")
    _write_odim(where, h5_where)
    # datasets
    ds_list = [f"dataset{i + 1}" for i in range(len(sweepnames))]
    # iterate in lexical group-name order (dataset1, dataset10, dataset2, ...)
    ds_idx = np.argsort(ds_list)
    for idx in ds_idx:
        # extract the sweep dataset depending on the volume flavour
        if isinstance(volume, (OdimH5, CfRadial)):
            ds = volume[f"sweep_{idx + 1}"]
        elif isinstance(volume, XRadVolume):
            ds = volume[idx][timestep].data
            ds = ds.drop_vars("time", errors="ignore").rename({"rtime": "time"})
        else:
            ds = volume[idx]
            if "time" not in ds.dims:
                ds = ds.expand_dims("time")
            ds = ds.isel(time=timestep, drop=True)
            ds = ds.drop_vars("time", errors="ignore").rename({"rtime": "time"})
        h5_dataset = h5.create_group(ds_list[idx])
        # what group p. 21 ff.
        h5_ds_what = h5_dataset.create_group("what")
        ds_what = {}
        # skip NaT values
        valid_times = ~np.isnat(ds.time.values)
        t = sorted(ds.time.values[valid_times])
        # nanosecond datetimes -> epoch seconds, rounded
        # NOTE(review): utcfromtimestamp is deprecated since Python 3.12
        start = dt.datetime.utcfromtimestamp(np.rint(t[0].astype("O") / 1e9))
        end = dt.datetime.utcfromtimestamp(np.rint(t[-1].astype("O") / 1e9))
        ds_what["product"] = "SCAN"
        ds_what["startdate"] = start.strftime("%Y%m%d")
        ds_what["starttime"] = start.strftime("%H%M%S")
        ds_what["enddate"] = end.strftime("%Y%m%d")
        ds_what["endtime"] = end.strftime("%H%M%S")
        _write_odim(ds_what, h5_ds_what)
        # where group, p. 11 ff. mandatory
        h5_ds_where = h5_dataset.create_group("where")
        # bin spacing from the first two range-bin centers
        rscale = ds.range.values[1] / 1.0 - ds.range.values[0]
        # range start (km), from bin center back to bin edge
        rstart = (ds.range.values[0] - rscale / 2.0) / 1000.0
        # todo: make this work for RHI's
        # a1gate: index (in azimuth-sorted order) of the first ray in time
        a1gate = np.argsort(ds.sortby("azimuth").time.values)[0]
        try:
            fixed_angle = ds.fixed_angle
        except AttributeError:
            # fall back to the median measured elevation
            fixed_angle = ds.elevation.round(decimals=1).median().values
        ds_where = {
            "elangle": fixed_angle,
            "nbins": ds.range.shape[0],
            "rstart": rstart,
            "rscale": rscale,
            "nrays": ds.azimuth.shape[0],
            "a1gate": a1gate,
        }
        _write_odim(ds_where, h5_ds_where)
        # how group, p. 14 ff.
        h5_ds_how = h5_dataset.create_group("how")
        # per-ray center times in epoch seconds, azimuth-sorted
        tout = [tx.astype("O") / 1e9 for tx in ds.sortby("azimuth").time.values]
        tout_sorted = sorted(tout)
        # handle non-uniform times (eg. only second-resolution): spread the
        # rays linearly between first and last time, rolled to a1gate
        if np.count_nonzero(np.diff(tout_sorted)) < (len(tout_sorted) - 1):
            tout = np.roll(
                np.linspace(tout_sorted[0], tout_sorted[-1], len(tout)), a1gate
            )
            tout_sorted = sorted(tout)
        # reconstruct ray start/stop times and angles from centers via
        # half-differences (first element replicated)
        difft = np.diff(tout_sorted) / 2.0
        difft = np.insert(difft, 0, difft[0])
        azout = ds.sortby("azimuth").azimuth
        diffa = np.diff(azout) / 2.0
        diffa = np.insert(diffa, 0, diffa[0])
        elout = ds.sortby("azimuth").elevation
        diffe = np.diff(elout) / 2.0
        diffe = np.insert(diffe, 0, diffe[0])
        try:
            sweep_number = ds.sweep_number + 1
        except AttributeError:
            sweep_number = timestep
        ds_how = {
            "scan_index": sweep_number,
            "scan_count": len(sweepnames),
            "startazT": tout - difft,
            "stopazT": tout + difft,
            "startazA": azout - diffa,
            "stopazA": azout + diffa,
            "startelA": elout - diffe,
            "stopelA": elout + diffe,
        }
        _write_odim(ds_how, h5_ds_how)
        # write moments
        _write_odim_dataspace(ds, h5_dataset)
    h5.close()
def _preprocess_moment(ds, mom, non_uniform_shape):
    """Prepare a raw ODIM/GAMIC moment dataset for concatenation.

    Translates the ODIM ``what``-group scaling attributes to CF
    conventions, renames the phony HDF5 dimensions and the ``data``
    variable to their proper names, and - for azimuthal sweeps -
    sorts and reindexes the rays onto a regular angle grid.
    """
    parent = mom.parent
    quantity = mom.quantity
    cf_attrs = mom._decode(ds.data.attrs)
    # extract and translate attributes to cf
    what = mom.what
    cf_attrs.update(
        {
            "scale_factor": what["gain"],
            "add_offset": what["offset"],
            "_FillValue": what["nodata"],
            "_Undetect": what["undetect"],
        }
    )
    if parent.decode_coords:
        cf_attrs["coordinates"] = "elevation azimuth range"
    # known moments get the standard CF metadata; non-standard names
    # simply keep their decoded attributes
    mapping = moments_mapping.get(quantity)
    if mapping is not None:
        cf_attrs.update({key: mapping[key] for key in moment_attrs})
    ds["data"] = ds["data"].assign_attrs(cf_attrs)
    # fix dimensions: phony_dim_N sorted by N -> (ray dim, range dim)
    phony = sorted(ds.dims, key=lambda name: int(name[len("phony_dim_") :]))
    ds = ds.rename(
        {"data": quantity, phony[0]: parent._dim0[0], phony[1]: parent._dim1}
    )
    # apply coordinates to dataset if source moments have different shapes
    # and correct for it
    if parent.decode_coords & non_uniform_shape:
        ds = ds.assign_coords(parent._get_coords().coords)
    if parent._dim0[0] == "azimuth":
        ds = ds.sortby(parent._dim0[0])
        ds = ds.pipe(_reindex_angle, parent)
    return ds
def _open_mfmoments(
    moments,
    chunks=None,
    compat="no_conflicts",
    preprocess=None,
    engine=None,
    lock=None,
    data_vars="all",
    coords="minimal",
    parallel=False,
    **kwargs,
):
    """Open multiple OdimH5 moments as a single dataset.

    This is derived from xarray.open_mfdataset [1]

    Parameters
    ----------
    moments : sequence
        List of XRadSweep objects.
    chunks : int or dict, optional
        Chunk size
    preprocess : callable, optional
        If provided, call this function on each dataset prior to concatenation.
    engine : {'netcdf4', 'h5netcdf'}, optional
        Engine to use when reading files. Defaults to 'netcdf4'.
    lock : False or duck threading.Lock, optional
        Resource lock to use when reading data from disk. Only relevant when
        using dask or another form of parallelism. By default, appropriate
        locks are chosen to safely read and write files with the currently
        active dask scheduler.
    data_vars : {'minimal', 'different', 'all' or list of str}, optional
        These data variables will be concatenated together:
          * 'minimal': Only data variables in which the dimension already
            appears are included.
          * 'different': Data variables which are not equal (ignoring
            attributes) across all datasets are also concatenated (as well as
            all for which dimension already appears). Beware: this option may
            load the data payload of data variables into memory if they are not
            already loaded.
          * 'all': All data variables will be concatenated.
          * list of str: The listed data variables will be concatenated, in
            addition to the 'minimal' data variables.
    parallel : bool, optional
        If True, the open and preprocess steps of this function will be
        performed in parallel using ``dask.delayed``. Default is False.
    **kwargs : dict, optional
        Additional arguments passed on to :py:func:`xarray:xarray.open_dataset`.

    Returns
    -------
    xarray.Dataset

    References
    ----------
    .. [1] https://xarray.pydata.org/en/stable/generated/xarray.open_mfdataset.html
    """  # noqa
    # NOTE(review): `compat`, `engine`, `lock`, `data_vars` and `coords`
    # are currently not forwarded; combine_by_coords below uses fixed
    # values and the engine is taken from the first moment - confirm
    # the parameters are kept for API symmetry only
    open_kwargs = dict(chunks=chunks, **kwargs)
    # if moments are specified in XRadTimeseries only load those
    if moments.parent._moments is not None:
        moments = [p for p in moments if p.quantity in moments.parent._moments]
    # engine of the first moment decides the opener/store backend
    engine = moments[0].engine
    if engine == "netcdf4":
        opener = netCDF4.Dataset
        opener_kwargs = {}
        store = xr.backends.NetCDF4DataStore
    else:
        # h5netcdf backend: guard against too-old h5netcdf/xarray versions
        if Version(h5netcdf.__version__) < Version("0.8.0"):
            warnings.warn(
                f"WRADLIB: 'h5netcdf>=0.8.0' needed to perform this "
                f"operation. 'h5netcdf={h5netcdf.__version__} "
                f"available.",
                UserWarning,
            )
            return None
        if Version(xr.__version__) < Version("0.15.0"):
            warnings.warn(
                f"WRADLIB: 'xarray>=0.15.0' needed to perform this "
                f"operation. 'xarray={xr.__version__} "
                f"available.",
                UserWarning,
            )
            return None
        opener = h5netcdf.File
        opener_kwargs = {"phony_dims": "access"}
        store = xr.backends.H5NetCDFStore
    # do not use parallel if all moments in one file
    if len(set([p.filename for p in moments])) == 1:
        single_file = True
        # reuse one low-level file handle for all moment groups
        if os.path.isfile(moments[0].filename):
            ds0 = opener(moments[0].filename, "r", **opener_kwargs)
        else:
            ds0 = moments[0].ncfile
    else:
        single_file = False
    if parallel:
        import dask

        # wrap the open_dataset, getattr, and preprocess with delayed
        open_ = dask.delayed(xr.open_dataset)
        getattr_ = dask.delayed(getattr)
        if preprocess is not None:
            preprocess = dask.delayed(preprocess)
    else:
        open_ = xr.open_dataset
        getattr_ = getattr
    if single_file:
        # NOTE(review): open_kwargs (chunks) are not forwarded in the
        # single-file branch - confirm whether this is intended
        datasets = [open_(store(ds0, group=p.ncpath)) for p in moments]
    else:
        # one file handle per moment
        fileid = []
        for p in moments:
            if os.path.isfile(p.filename):
                p = opener(p.filename, "r", **opener_kwargs)
            else:
                p = p.ncfile
            fileid.append(p)
        datasets = [
            open_(store(f, group=p.ncpath), **open_kwargs)
            for f, p in zip(fileid, moments)
        ]
    # collect per-dataset close callbacks; the private attribute name
    # changed in xarray after 0.16.2
    if Version(xr.__version__) <= Version("0.16.2"):
        closers = [getattr_(ds, "_file_obj") for ds in datasets]
    else:
        closers = [getattr_(ds, "_close") for ds in datasets]
    # check for differences in shape of moments
    non_uniform_shape = len(set([tuple(ds.sizes.values()) for ds in datasets])) > 1
    if preprocess is not None:
        datasets = [
            preprocess(ds, mom, non_uniform_shape) for ds, mom in zip(datasets, moments)
        ]
    if parallel:
        # calling compute here will return the datasets/file_objs lists,
        # the underlying datasets will still be stored as dask arrays
        datasets, closers = dask.compute(datasets, closers)
    # Combine all datasets, closing them in case of a ValueError
    try:
        combined = combine_by_coords(
            datasets, compat="no_conflicts", data_vars="all", coords="minimal"
        )
    except ValueError:
        for ds in datasets:
            ds.close()
        raise

    def multi_file_closer():
        # close every underlying file handle when the combined dataset closes
        for closer in closers:
            closer()

    combined.set_close(multi_file_closer)
    # keep the attributes of the first moment dataset
    combined.attrs = datasets[0].attrs
    return combined
class OdimH5GroupAttributeMixin:
    """Mixin Class for Odim Group Attribute Retrieval.

    Wraps one group (``ncpath``) of an open netCDF4/h5netcdf file handle
    (``ncfile``) and provides cached access to its attributes and to the
    conventional ODIM ``how``/``what``/``where`` subgroups, abstracting
    over the differences between the two backends.
    """

    # fixed attribute set; subclasses without __slots__ still get a __dict__
    __slots__ = ["_attrs", "_ncfile", "_ncpath", "_parent", "_how", "_what", "_where"]

    def __init__(self, ncfile=None, ncpath=None, parent=None):
        super().__init__()
        self._ncfile = ncfile
        self._ncpath = ncpath
        self._parent = parent
        # caches for attrs and the how/what/where subgroup attributes
        self._attrs = None
        self._how = None
        self._what = None
        self._where = None

    @property
    def is_netcdf(self):
        """Return True if the underlying handle is a netCDF4.Dataset."""
        return has_import(netCDF4) and isinstance(self.ncfile, netCDF4.Dataset)

    @property
    def is_h5netcdf(self):
        """Return True if the underlying handle is an h5netcdf.File."""
        return has_import(h5netcdf) and isinstance(self.ncfile, h5netcdf.File)

    @property
    def ncpath(self):
        """Returns path string inside HDF5 File."""
        return self._ncpath

    @property
    def ncid(self):
        """Returns handle for current path."""
        # root-group can't be subset with netcdf4 and h5netcdf
        if self._ncpath == "/":
            if self.is_netcdf or self.is_h5netcdf:
                return self._ncfile
        return self._ncfile[self.ncpath]

    @property
    def ncfile(self):
        """Returns file handle."""
        return self._ncfile

    @property
    def how(self):
        """Return attributes of `how`-group."""
        if self._how is None:
            self._how = self._get_attributes("how")
        return self._how

    @property
    def what(self):
        """Return attributes of `what`-group."""
        if self._what is None:
            self._what = self._get_attributes("what")
        return self._what

    @property
    def where(self):
        """Return attributes of `where`-group."""
        if self._where is None:
            self._where = self._get_attributes("where")
        return self._where

    @property
    def attrs(self):
        """Return group attributes."""
        if self._attrs is None:
            if self.is_netcdf:
                self._attrs = {k: self.ncid.getncattr(k) for k in self.ncid.ncattrs()}
            else:
                # h5netcdf: copy the attrs mapping and decode byte strings
                self._attrs = self._decode({**self.ncid.attrs})
        return self._attrs

    @property
    def filename(self):
        """Return filename group belongs to."""
        if self.is_netcdf:
            return self.ncfile.filepath()
        else:
            return self.ncfile.filename

    @property
    def groups(self):
        """Return list of available groups."""
        if self.is_netcdf:
            return list(self.ncid.groups)
        else:
            return list(self.ncid.keys())

    @property
    def engine(self):
        """Return engine used for accessing data"""
        if self.is_netcdf:
            return "netcdf4"
        else:
            return "h5netcdf"

    @property
    def parent(self):
        """Return parent object."""
        return self._parent

    def _get_attributes(self, grp, ncid=None):
        """Return dict with attributes extracted from `grp`,
        or None if the subgroup does not exist."""
        if ncid is None:
            ncid = self.ncid
        try:
            if self.is_netcdf:
                attrs = {k: ncid[grp].getncattr(k) for k in ncid[grp].ncattrs()}
                return attrs
            else:
                attrs = {**ncid[grp].attrs}
                attrs = self._decode(attrs)
                return attrs
        except (IndexError, KeyError):
            # missing subgroup
            return None

    def _get_attribute(self, grp, attr=None, ncid=None):
        """Return single attribute extracted from `grp`,
        or None if group/attribute is missing."""
        if ncid is None:
            ncid = self.ncid
        try:
            if self.is_netcdf:
                return ncid[grp].getncattr(attr)
            else:
                v = ncid[grp].attrs[attr]
                # unwrap 0-d arrays, then decode byte strings; each step
                # is best-effort and leaves the value unchanged on failure
                try:
                    v = v.item()
                except (ValueError, AttributeError):
                    pass
                try:
                    v = v.decode()
                except (UnicodeDecodeError, AttributeError):
                    pass
                return v
        except (IndexError, KeyError):
            return None

    def _decode(self, attrs):
        """Decode strings if possible.

        Unwraps 0-d array values and decodes byte strings in place;
        values that do not support either step are left unchanged.
        """
        for k, v in attrs.items():
            try:
                v = v.item()
            except (ValueError, AttributeError):
                pass
            try:
                v = v.decode()
            except (UnicodeDecodeError, AttributeError):
                pass
            attrs[k] = v
        return attrs
class OdimH5SweepMetaDataMixin:
    """Mixin Class for Odim MetaData.

    Provides lazily evaluated, cached sweep metadata. Each property
    computes its value once via the host class' ``_get_*`` helper and
    stores it on the instance for subsequent accesses. Time-like
    properties additionally CF-decode their values when the host's
    ``decode_times`` flag is set.
    """

    def __init__(self):
        super().__init__()
        # caches for the lazily computed properties below
        self._a1gate = None
        self._angle_resolution = None
        self._azimuth = None
        self._elevation = None
        self._fixed_angle = None
        self._nrays = None
        self._nbins = None
        self._time = None
        self._endtime = None
        self._rtime = None
        self._rng = None

    @property
    def a1gate(self):
        """Return and cache a1gate, azimuth of first measured gate"""
        if self._a1gate is not None:
            return self._a1gate
        self._a1gate = self._get_a1gate()
        return self._a1gate

    @property
    def angle_resolution(self):
        """Return and cache angular resolution in degree."""
        if self._angle_resolution is not None:
            return self._angle_resolution
        self._angle_resolution = self._get_angle_resolution()
        return self._angle_resolution

    @property
    def azimuth(self):
        """Return and cache azimuth xr.DataArray."""
        if self._azimuth is not None:
            return self._azimuth
        self._azimuth = self._get_azimuth()
        return self._azimuth

    @property
    def elevation(self):
        """Return and cache elevation xr.DataArray."""
        if self._elevation is not None:
            return self._elevation
        self._elevation = self._get_elevation()
        return self._elevation

    @property
    def fixed_angle(self):
        """Return and cache elevation angle in degree."""
        if self._fixed_angle is not None:
            return self._fixed_angle
        self._fixed_angle = self._get_fixed_angle()
        return self._fixed_angle

    @property
    def nrays(self):
        """Return and cache number of rays."""
        if self._nrays is not None:
            return self._nrays
        self._nrays = self._get_nrays()
        return self._nrays

    @property
    def nbins(self):
        """Return and cache number of bins."""
        if self._nbins is not None:
            return self._nbins
        self._nbins = self._get_nbins()
        return self._nbins

    @property
    def rng(self):
        """Return and cache range xr.DataArray."""
        if self._rng is not None:
            return self._rng
        self._rng = self._get_range()
        return self._rng

    @property
    def ray_times(self):
        """Return and cache ray_times xr.DataArray."""
        if self._rtime is not None:
            return self._rtime
        rtime = self._get_ray_times()
        # CF-decode on request only
        if self.decode_times:
            rtime = self._decode_cf(rtime)
        self._rtime = rtime
        return self._rtime

    @property
    def time(self):
        """Return and cache time xr.DataArray."""
        if self._time is not None:
            return self._time
        time = self._get_time()
        # CF-decode on request only
        if self.decode_times:
            time = self._decode_cf(time)
        self._time = time
        return self._time

    @property
    def starttime(self):
        """Return sweep starttime xr.DataArray.

        Returns the cached value only; stays None until ``time``
        has been accessed at least once.
        """
        return self._time

    @property
    def endtime(self):
        """Return sweep endtime xr.DataArray."""
        if self._endtime is not None:
            return self._endtime
        endtime = self._get_time(point="end")
        # CF-decode on request only
        if self.decode_times:
            endtime = self._decode_cf(endtime)
        self._endtime = endtime
        return self._endtime
# NOTE: the original class and __init__ lines carried stray "[docs]"
# Sphinx-HTML scraping artifacts, which are invalid Python; removed.
class XRadMoment(OdimH5GroupAttributeMixin):
    """Class for holding one radar moment

    Parameters
    ----------
    ncfile : {netCDF4.Dataset, h5py.File or h5netcdf.File object}
        File handle of file containing radar sweep
    ncpath : str
        path to moment group (datasetX)
    parent : XRadSweep
        parent sweep object
    """

    def __init__(self, ncfile, ncpath, parent):
        super().__init__(ncfile, ncpath, parent)
        # moment name, resolved lazily in `quantity`
        self._quantity = None

    def __repr__(self):
        summary = [f"<wradlib.{type(self).__name__}>"]
        dims = "Dimension(s):"
        dims_summary = [f"{self.parent._dim0[0]}: {self.parent.nrays}"]
        dims_summary.append(f"{self.parent._dim1}: {self.parent.nbins}")
        dims_summary = ", ".join(dims_summary)
        summary.append(f"{dims} ({dims_summary})")
        angle = "Elevation(s):"
        angle_summary = f"{self.parent.fixed_angle:.1f}"
        summary.append(f"{angle} ({angle_summary})")
        moms = "Moment:"
        moms_summary = f"{self.quantity}"
        summary.append(f"{moms} ({moms_summary})")
        return "\n".join(summary)

    @property
    def data(self):
        """Return moment xr.DataArray."""
        return self.parent.data[self.quantity]

    @property
    def time(self):
        """Return sweep time."""
        return self.parent.time

    @property
    def quantity(self):
        """Return `quantity` aka moment name"""
        if self._quantity is None:
            if isinstance(self.parent, XRadSweepOdim):
                # ODIM: moment name lives in the `what`-group
                self._quantity = self.what["quantity"]
            else:
                # GAMIC: translate the lower-cased `moment` attribute
                self._quantity = gamic_mapping[self.attrs["moment"].lower()]
        return self._quantity
class XRadSweep(OdimH5GroupAttributeMixin, OdimH5SweepMetaDataMixin, XRadBase):
    """Class for holding one radar sweep

    Parameters
    ----------
    ncfile : {netCDF4.Dataset, h5py.File or h5netcdf.File object}
        File handle of file containing radar sweep
    ncpath : str
        path to sweep group
    """

    def __init__(self, ncfile, ncpath, parent=None, **kwargs):
        super().__init__(ncfile, ncpath, parent)
        # dask/xarray loading related keyword arguments
        self._dask_kwargs = {
            "chunks": kwargs.get("chunks", None),
            "parallel": kwargs.get("parallel", False),
        }
        # CF decoding related keyword arguments
        self._cf_kwargs = {
            "mask_and_scale": kwargs.get("mask_and_scale", True),
            "decode_coords": kwargs.get("decode_coords", True),
            "decode_times": kwargs.get("decode_times", True),
        }
        self._misc_kwargs = {
            "keep_elevation": kwargs.get("keep_elevation", False),
            "keep_azimuth": kwargs.get("keep_azimuth", False),
        }
        # cached combined dataset, see `data`
        self._data = None
        self._need_time_recalc = False
        # attach the contained moments to the underlying sequence
        self._seq.extend(self._get_moments())
        # _dim0[0] is the ray dimension, _dim0[1] the fixed-angle one;
        # subclasses may swap them for RHI sweeps
        self._dim0 = ("azimuth", "elevation")
        self._dim1 = "range"
        # eagerly resolve fixed angle (may swap _dim0 for RHI sweeps)
        self.fixed_angle

    def __repr__(self):
        # summary lines: dimensions, fixed angle and moment names
        summary = [f"<wradlib.{type(self).__name__}>"]
        dims = "Dimension(s):"
        dims_summary = [f"{self._dim0[0]}: {self.nrays}"]
        dims_summary.append(f"{self._dim1}: {self.nbins}")
        dims_summary = ", ".join(dims_summary)
        summary.append(f"{dims} ({dims_summary})")
        angle = f"{self._dim0[1].capitalize()}(s):"
        angle_summary = f"{self.fixed_angle:0.1f}"
        summary.append(f"{angle} ({angle_summary})")
        moms = "Moment(s):"
        moms_summary = self.moments
        moms_summary = ", ".join(moms_summary)
        summary.append(f"{moms} ({moms_summary})")
        return "\n".join(summary)

    def __del__(self):
        # close the cached dataset and drop the file-handle reference
        if self._data is not None:
            self._data.close()
        self._data = None
        self._ncfile = None

    def _decode_cf(self, obj):
        """Apply CF decoding (scale/offset, times, coords) to obj."""
        if isinstance(obj, xr.DataArray):
            # wrap the DataArray in a Dataset since decode_cf wants one
            out = xr.decode_cf(xr.Dataset({"arr": obj}), **self._cf_kwargs).arr
        else:
            out = xr.decode_cf(obj, **self._cf_kwargs)
        return out

    def _get_coords(self):
        """Return xr.Dataset holding all sweep coordinates."""
        ds = xr.Dataset(
            coords={
                "time": self.time,
                "rtime": self.ray_times,
                "azimuth": self.azimuth,
                "elevation": self.elevation,
                "range": self.rng,
            }
        )
        return ds

    def _get_moments(self):
        """Return XRadMoment list, sorted by moment group number."""
        mdesc = self._mdesc
        moments = [k for k in self.groups if mdesc in k]
        # sort numerically by the integer suffix of the group name
        moments_idx = np.argsort([int(s[len(mdesc) :]) for s in moments])
        moments_names = np.array(moments)[moments_idx].tolist()
        moments = [
            XRadMoment(
                ncfile=self.ncfile, ncpath="/".join([self.ncpath, mom]), parent=self
            )
            for mom in moments_names
        ]
        return moments

    def reset_data(self):
        """Reset .data xr.Dataset"""
        self._data = None

    @property
    def _mdesc(self):
        # flavour-dependent moment group prefix, provided by subclass
        return self._get_mdesc()

    @property
    def chunks(self):
        """Return `chunks` setting."""
        return self._dask_kwargs.get("chunks")

    @property
    def parallel(self):
        """Return `parallel` setting."""
        return self._dask_kwargs.get("parallel")

    @property
    def mask_and_scale(self):
        """Return `mask_and_scale` setting."""
        return self._cf_kwargs.get("mask_and_scale")

    @property
    def decode_coords(self):
        """Return `decode_coords` setting."""
        return self._cf_kwargs.get("decode_coords")

    @property
    def decode_times(self):
        """Return `decode_times` setting."""
        return self._cf_kwargs.get("decode_times")

    @property
    def data(self):
        """Return and cache moments as combined xr.Dataset"""
        if self._data is None:
            self._data = self._merge_moments()
            # if metadata declared in XRadTimeseries, load and assign
            if self.parent._meta is not None:
                vars = {}
                for k, v in self.parent._meta.items():
                    attr = self._get_attribute(v, attr=k)
                    # array-valued metadata becomes a ray-dimensioned DataArray
                    if hasattr(attr, "ndim"):
                        attr = xr.DataArray(attr, dims=[self._dim0[0]])
                    vars[k] = attr
                self._data = self._data.assign(vars)
            if self.decode_coords:
                coords = self._get_coords().coords
                self._data = self._data.assign_coords(coords)
                # sort by ray dimension and reindex to a regular angle grid
                self._data = self._data.sortby(self._dim0[0])
                self._data = self._data.pipe(_reindex_angle, self)
                sweep_mode = (
                    "azimuth_surveillance" if self._dim0[0] == "azimuth" else "rhi"
                )
                self._data = self._data.assign_coords({"sweep_mode": sweep_mode})
                self._data = self._data.assign_coords(self.parent.parent.site.coords)
            # bitwise-or of plain bools, equivalent to logical `or` here
            if self.mask_and_scale | self.decode_coords | self.decode_times:
                self._data = self._data.pipe(self._decode_cf)
        return self._data

    @property
    def coords(self):
        """Returns xr.Dataset containing coordinates."""
        # sort coords by azimuth, only necessary for gamic flavour
        # for odim is already sorted
        return self._get_coords().sortby(self._dim0[0])

    @property
    def moments(self):
        """Return list of moments."""
        return [f"{k.quantity}" for k in self]
class XRadSweepOdim(XRadSweep):
    """Class for holding one radar sweep

    Parameters
    ----------
    ncfile : {netCDF4.Dataset, h5py.File or h5netcdf.File object}
        File handle of file containing radar sweep
    ncpath : str
        path to sweep group
    """

    def __init__(self, ncfile, ncpath, parent=None, **kwargs):
        super().__init__(ncfile, ncpath, parent, **kwargs)

    def _get_a1gate(self):
        # index of the first recorded ray (ODIM `a1gate`)
        return self.where["a1gate"]

    def _get_angle_resolution(self):
        # median azimuth increment, rounded to 0.1 deg
        return self.azimuth.diff(self._dim0[0]).median(skipna=True).round(decimals=1)

    def _get_fixed_angle(self):
        """Return fixed angle in degree, swapping dims for RHI sweeps."""
        # try RHI first
        angle_keys = ["az_angle", "azangle"]
        for ak in angle_keys:
            angle = self.where.get(ak, None)
            if angle is not None:
                break
        if angle is not None:
            # RHI sweep: fixed azimuth; ray dimension becomes elevation
            angle = np.round(angle, decimals=1)
            self._dim0 = (self._dim0[1], self._dim0[0])
        else:
            angle = np.round(self.where["elangle"], decimals=1)
        return angle

    def _get_azimuth_how(self):
        """Return per-ray azimuth centers from `how` start/stop angles."""
        how = self.how
        startaz = how["startazA"]
        stopaz = how["stopazA"]
        # handle rays crossing the 360/0 deg discontinuity
        zero_index = np.where(stopaz < startaz)
        stopaz[zero_index[0]] += 360
        azimuth_data = (startaz + stopaz) / 2.0
        return azimuth_data

    def _get_azimuth_where(self):
        """Return equidistant azimuth centers derived from ray count."""
        nrays = self.where["nrays"]
        res = 360.0 / nrays
        azimuth_data = np.arange(res / 2.0, 360.0, res, dtype="float32")
        return azimuth_data

    def _get_elevation_how(self):
        """Return per-ray elevation centers from `how` start/stop angles."""
        how = self.how
        startel = how["startelA"]
        stopel = how["stopelA"]
        elevation_data = (startel + stopel) / 2.0
        return elevation_data

    def _get_elevation_where(self):
        """Return constant elevation angle replicated for each ray."""
        where = self.where
        nrays = where["nrays"]
        elangle = where["elangle"]
        elevation_data = np.ones(nrays, dtype="float32") * elangle
        return elevation_data

    def _get_time_how(self):
        """Return per-ray times as mean of `how` start/stop timestamps."""
        how = self.how
        startT = how["startazT"]
        stopT = how["stopazT"]
        time_data = (startT + stopT) / 2.0
        return time_data

    def _get_time_what(self, nrays=None):
        """Return ray times interpolated from `what` sweep start/end times."""
        what = self.what
        startdate = what["startdate"]
        starttime = what["starttime"]
        enddate = what["enddate"]
        endtime = what["endtime"]
        start = dt.datetime.strptime(startdate + starttime, "%Y%m%d%H%M%S")
        end = dt.datetime.strptime(enddate + endtime, "%Y%m%d%H%M%S")
        start = start.replace(tzinfo=dt.timezone.utc).timestamp()
        end = end.replace(tzinfo=dt.timezone.utc).timestamp()
        if nrays is None:
            nrays = self.where["nrays"]
        if start == end:
            # degenerate metadata, fall back to constant ray times
            warnings.warn(
                "WRADLIB: Equal ODIM `starttime` and `endtime` "
                "values. Can't determine correct sweep start-, "
                "end- and raytimes.",
                UserWarning,
            )
            time_data = np.ones(nrays) * start
        else:
            delta = (end - start) / nrays
            time_data = np.arange(start + delta / 2.0, end, delta)
            # rotate so times align with the first recorded ray (a1gate)
            time_data = np.roll(time_data, shift=+self.a1gate)
        return time_data

    def _get_ray_times(self, nrays=None):
        """Return ray times xr.DataArray (seconds since epoch)."""
        try:
            time_data = self._get_time_how()
            self._need_time_recalc = False
        except (AttributeError, KeyError, TypeError):
            # per-ray timestamps missing -> interpolate and flag for recalc
            time_data = self._get_time_what(nrays=nrays)
            self._need_time_recalc = True
        da = xr.DataArray(
            time_data,
            dims=[self._dim0[0]],
            attrs=get_time_attrs("1970-01-01T00:00:00Z"),
        )
        return da

    def _get_azimuth(self):
        """Return azimuth xr.DataArray, preferring per-ray `how` angles."""
        try:
            azimuth_data = self._get_azimuth_how()
        except (AttributeError, KeyError, TypeError):
            azimuth_data = self._get_azimuth_where()
        da = xr.DataArray(azimuth_data, dims=[self._dim0[0]], attrs=get_azimuth_attrs())
        # for RHI sweeps fold angles into a consistent range unless kept
        if not self._misc_kwargs["keep_azimuth"] and self._dim0[0] == "elevation":
            da = da.pipe(_fix_angle)
        return da

    def _get_elevation(self):
        """Return elevation xr.DataArray, preferring per-ray `how` angles."""
        try:
            elevation_data = self._get_elevation_how()
        except (AttributeError, KeyError, TypeError):
            elevation_data = self._get_elevation_where()
        da = xr.DataArray(
            elevation_data, dims=[self._dim0[0]], attrs=get_elevation_attrs()
        )
        # for PPI sweeps fold angles into a consistent range unless kept
        if not self._misc_kwargs["keep_elevation"] and self._dim0[0] == "azimuth":
            da = da.pipe(_fix_angle)
        return da

    def _get_mdesc(self):
        # ODIM moment groups are named `dataN`
        return "data"

    def _get_range(self):
        """Return range (bin center) xr.DataArray in meters."""
        where = self.where
        ngates = where["nbins"]
        range_start = where["rstart"] * 1000.0
        bin_range = where["rscale"]
        cent_first = range_start + bin_range / 2.0
        range_data = np.arange(
            cent_first, range_start + bin_range * ngates, bin_range, dtype="float32"
        )
        range_attrs = get_range_attrs(range_data)
        da = xr.DataArray(range_data, dims=[self._dim1], attrs=range_attrs)
        return da

    def _merge_moments(self):
        """Return all moment groups lazily combined into one xr.Dataset."""
        ds = _open_mfmoments(
            self,
            chunks=self.chunks,
            preprocess=_preprocess_moment,
            parallel=self.parallel,
            mask_and_scale=self.mask_and_scale,
            decode_times=self.decode_times,
            decode_coords=self.decode_coords,
        )
        return ds

    def _get_nrays(self):
        return self.where["nrays"]

    def _get_nbins(self):
        return self.where["nbins"]

    def _get_time(self, point="start"):
        """Return sweep start/end time as scalar xr.DataArray."""
        what = self.what
        startdate = what[f"{point}date"]
        starttime = what[f"{point}time"]
        start = dt.datetime.strptime(startdate + starttime, "%Y%m%d%H%M%S")
        start = start.replace(tzinfo=dt.timezone.utc).timestamp()
        da = xr.DataArray(start, attrs=get_time_attrs("1970-01-01T00:00:00Z"))
        return da

    def _get_time_fast(self):
        """Return sweep start time (epoch seconds) via raw attribute access.

        Returns None when the `what` attributes are unavailable.
        """
        ncid = self.ncid
        try:
            if self.is_netcdf:
                startdate = ncid["what"].getncattr("startdate")
                starttime = ncid["what"].getncattr("starttime")
            else:
                # hdf5 backends may return bytes, decode if necessary
                startdate = _maybe_decode(ncid["what"].attrs["startdate"])
                starttime = _maybe_decode(ncid["what"].attrs["starttime"])
        except (IndexError, KeyError):
            return None
        start = dt.datetime.strptime(startdate + starttime, "%Y%m%d%H%M%S")
        start = start.replace(tzinfo=dt.timezone.utc).timestamp()
        return start
class XRadSweepGamic(XRadSweep):
    """Class for holding one radar sweep

    Parameters
    ----------
    ncfile : {netCDF4.Dataset, h5py.File or h5netcdf.File object}
        File handle of file containing radar sweep
    ncpath : str
        path to sweep group
    """

    def __init__(self, ncfile, ncpath, parent=None, **kwargs):
        super().__init__(ncfile, ncpath, parent, **kwargs)
        # cached GAMIC per-ray header table, see `ray_header`
        self._ray_header = None

    @property
    def ray_header(self):
        """Return and cache the GAMIC per-ray header table."""
        # todo: caching adds to memory footprint
        if self._ray_header is None:
            self._ray_header = self.ncid["ray_header"][:]
        return self._ray_header

    def _get_a1gate(self):
        # index of the earliest ray (by ray time)
        return np.argsort(self.coords.rtime.values)[0]

    def _get_angle_resolution(self):
        return self.how["angle_step"]

    def _get_azimuth(self):
        """Return azimuth xr.DataArray from per-ray start/stop angles."""
        azstart = self.ray_header["azimuth_start"]
        azstop = self.ray_header["azimuth_stop"]
        if self._dim0[0] == "azimuth":
            # handle rays crossing the 360/0 deg discontinuity
            zero_index = np.where(azstop < azstart)
            azstop[zero_index[0]] += 360
        azimuth = (azstart + azstop) / 2.0
        da = xr.DataArray(azimuth, dims=[self._dim0[0]], attrs=get_azimuth_attrs())
        # for RHI sweeps fold angles into a consistent range unless kept
        if not self._misc_kwargs["keep_azimuth"] and self._dim0[0] == "elevation":
            da = da.pipe(_fix_angle)
        return da

    def _get_elevation(self):
        """Return elevation xr.DataArray from per-ray start/stop angles."""
        elstart = self.ray_header["elevation_start"]
        elstop = self.ray_header["elevation_stop"]
        elevation = (elstart + elstop) / 2.0
        da = xr.DataArray(elevation, dims=[self._dim0[0]], attrs=get_elevation_attrs())
        # for PPI sweeps fold angles into a consistent range unless kept
        if not self._misc_kwargs["keep_elevation"] and self._dim0[0] == "azimuth":
            da = da.pipe(_fix_angle)
        return da

    def _get_mdesc(self):
        # GAMIC moment datasets are named `moment_N`
        return "moment_"

    def _get_range(self):
        """Return range (bin center) xr.DataArray in meters."""
        range_samples = self.how["range_samples"]
        range_step = self.how["range_step"]
        bin_range = range_step * range_samples
        range_data = np.arange(
            bin_range / 2.0, bin_range * self.nbins, bin_range, dtype="float32"
        )
        range_attrs = get_range_attrs(range_data)
        da = xr.DataArray(range_data, dims=[self._dim1], attrs=range_attrs)
        return da

    def _get_ray_times(self):
        """Return ray times xr.DataArray (seconds since epoch)."""
        # GAMIC timestamps are microseconds since epoch
        times = self.ray_header["timestamp"] / 1e6
        attrs = {"units": "seconds since 1970-01-01T00:00:00Z", "standard_name": "time"}
        da = xr.DataArray(times, dims=[self._dim0[0]], attrs=attrs)
        return da

    def _get_fixed_angle(self):
        """Return fixed angle in degree, swapping dims for RHI sweeps."""
        try:
            angle = np.round(self.how[self._dim0[1]], decimals=1)
        except KeyError:
            # key missing -> other scan type, swap dimensions and retry
            self._dim0 = (self._dim0[1], self._dim0[0])
            angle = np.round(self.how[self._dim0[1]], decimals=1)
        return angle

    def _merge_moments(self):
        """Return all moments combined into one CF-attributed xr.Dataset.

        Returns None (with a warning) when required package versions
        are not available for the hdf5-based engines.
        """
        if "h5" in self.engine:
            if Version(h5netcdf.__version__) < Version("0.8.0"):
                warnings.warn(
                    f"WRADLIB: 'h5netcdf>=0.8.0' needed to perform this "
                    f"operation. 'h5netcdf={h5netcdf.__version__} "
                    f"available.",
                    UserWarning,
                )
                return None
            if Version(xr.__version__) < Version("0.15.0"):
                warnings.warn(
                    f"WRADLIB: 'xarray>=0.15.0' needed to perform this "
                    f"operation. 'xarray={xr.__version__} "
                    f"available.",
                    UserWarning,
                )
                return None
            opener = h5netcdf.File
            opener_kwargs = {"phony_dims": "access"}
            store = xr.backends.H5NetCDFStore
        else:
            opener = netCDF4.Dataset
            opener_kwargs = {}
            store = xr.backends.NetCDF4DataStore
        # reopen from disk when possible, otherwise reuse the open handle
        if os.path.isfile(self.filename):
            ds0 = opener(self.filename, "r", **opener_kwargs)
        else:
            ds0 = self.ncfile
        ds = xr.open_dataset(store(ds0, self.ncpath), chunks=self.chunks)
        ds = ds.drop_vars("ray_header", errors="ignore")
        for mom in self:
            mom_name = mom.ncpath.split("/")[-1]
            dmom = ds[mom_name]
            name = dmom.moment.lower()
            try:
                name = gamic_mapping[name]
            except KeyError:
                # unknown moment -> drop it from the dataset
                ds = ds.drop_vars(mom_name)
                continue
            # extract and translate attributes to cf
            attrs = collections.OrderedDict()
            dmax = np.iinfo(dmom.dtype).max
            dmin = np.iinfo(dmom.dtype).min
            minval = dmom.dyn_range_min
            maxval = dmom.dyn_range_max
            dtype = minval.dtype
            dyn_range = maxval - minval
            # derive scale/offset from the stored dynamic range
            if maxval != minval:
                gain = dyn_range / (dmax - 1)
                minval -= gain
            else:
                gain = (dmax - dmin) / dmax
                minval = dmin
            gain = np.array([gain])[0].astype(dtype)
            minval = np.array([minval])[0].astype(dtype)
            undetect = np.array([dmin])[0].astype(dtype)
            attrs["scale_factor"] = gain
            attrs["add_offset"] = minval
            attrs["_FillValue"] = undetect
            attrs["_Undetect"] = undetect
            if self.decode_coords:
                attrs["coordinates"] = "elevation azimuth range"
            mapping = moments_mapping[name]
            attrs.update({key: mapping[key] for key in moment_attrs})
            # assign attributes to moment
            dmom.attrs = collections.OrderedDict()
            dmom.attrs.update(attrs)
            ds = ds.rename({mom_name: name.upper()})
        # fix dimensions: phony dims are ordered numerically
        dims = sorted(list(ds.dims.keys()), key=lambda x: int(x[len("phony_dim_") :]))
        ds = ds.rename({dims[0]: self._dim0[0], dims[1]: self._dim1})
        # todo: this sorts and reindexes the unsorted GAMIC dataset by azimuth
        # only if `decode_coords` is False
        # adding coord -> sort -> reindex -> remove coord
        if not self.decode_coords:  # and (self._dim0[0] == "azimuth"):
            ds = (
                ds.assign_coords({self._dim0[0]: getattr(self, self._dim0[0])})
                .sortby(self._dim0[0])
                .pipe(_reindex_angle, self)
                .drop_vars(self._dim0[0])
            )
        return ds

    def _get_nrays(self):
        return self.how["ray_count"]

    def _get_nbins(self):
        return self.how["bin_count"]

    def _get_time(self):
        """Return sweep start time as scalar xr.DataArray."""
        start = self.how["timestamp"]
        start = dateutil.parser.parse(start)
        start = start.replace(tzinfo=dt.timezone.utc).timestamp()
        da = xr.DataArray(start, attrs=get_time_attrs("1970-01-01T00:00:00Z"))
        return da

    def _get_time_fast(self):
        """Return sweep start time (epoch seconds) via raw attribute access.

        Returns None when the `how` attributes are unavailable.
        """
        ncid = self.ncid
        try:
            if self.is_netcdf:
                start = ncid["how"].getncattr("timestamp")
            else:
                start = ncid["how"].attrs["timestamp"]
                # h5py < 3 returns attribute strings as bytes
                if Version(h5py.__version__) < Version("3.0.0"):
                    start = start.decode()
        except (IndexError, KeyError):
            return None
        start = dateutil.parser.parse(start)
        start = start.replace(tzinfo=dt.timezone.utc).timestamp()
        return start
class XRadTimeSeries(OdimH5GroupAttributeMixin, XRadBase):
    """Class for holding a timeseries of radar sweeps"""

    def __init__(self, **kwargs):
        super().__init__()
        # cached concatenated dataset, see `data`
        self._data = None
        # optional moment subset, see `set_moments`
        self._moments = None
        # optional metadata mapping, see `set_metadata`
        self._meta = None

    # override append and claim file for OdimH5GroupAttributeMixin
    def append(self, value):
        """Append a sweep, adopting file handle/path from the first one."""
        value._parent = self
        # do only for first file in this timeseries
        if not len(self):
            self._ncfile = value.ncfile
            self._ncpath = value.ncpath
        return super().append(value)

    def __repr__(self):
        # summary lines: dimensions and fixed angle of the first sweep
        summary = [f"<wradlib.{type(self).__name__}>"]
        dims = "Dimension(s):"
        dims_summary = [f"time: {len(self)}"]
        dims_summary.append(f"{self._seq[0]._dim0[0]}: {self._seq[0].nrays}")
        dims_summary.append(f"{self._seq[0]._dim1}: {self._seq[0].nbins}")
        dims_summary = ", ".join(dims_summary)
        summary.append(f"{dims} ({dims_summary})")
        angle = f"{self._seq[0]._dim0[1].capitalize()}(s):"
        angle_summary = self[0].fixed_angle
        summary.append(f"{angle} ({angle_summary:.1f})")
        return "\n".join(summary)

    def reset_data(self):
        """Invalidate the cached combined dataset."""
        self._data = None

    @property
    def data(self):
        """Return and cache all sweeps concatenated along `time`.

        Moments which are not available in every timestep cannot be
        concatenated and are dropped with a warning.
        """
        if self._data is None:
            # moments missing from some timesteps must be dropped
            drop = list(self.check_moments().keys())
            if drop:
                warnings.warn(
                    f"wradlib: Moments {*drop,} are not available in all datasets "
                    "and will be dropped from the result.\n"
                    "This will be solved in xarray, see "
                    "https://github.com/pydata/xarray/pull/3545"
                )
            # todo: catch possible error and add precise ErrorMessage
            self._data = xr.concat(
                [
                    f.data.drop_vars(drop, errors="ignore")
                    for f in tqdm(
                        self, desc="Collecting", unit=" Timesteps", leave=None
                    )
                ],
                dim="time",
            )
        return self._data

    def check_rays(self):
        """Return (set of ray counts, indices of non-360-divisible sweeps).

        Warns when the ray count differs between sweeps.
        """
        nrays = [swp.nrays for swp in self]
        snrays = set(nrays)
        idx = []
        for nr in snrays:
            if nr % 360:
                idx.extend(np.argwhere(np.array(nrays) == nr).flatten().tolist())
        if len(snrays) > 1:
            warnings.warn(
                f"wradlib: number of rays differing between sweeps.\n" f"{snrays}"
            )
        return snrays, idx

    def check_moments(self):
        """Return dict mapping moment names to timestep indices where missing."""
        moments = [set([mom.quantity for mom in swp]) for swp in self]
        mi = set.intersection(*moments)
        mu = set.union(*moments)
        # symmetric difference: moments not present in every sweep
        mp = mi ^ mu
        miss = {}
        for mom in mu:
            idx = []
            if mom in mp:
                for i, mset in enumerate(moments):
                    if mom not in mset:
                        idx.append(i)
                miss[mom] = idx
        return miss

    def set_moments(self, moments):
        """Set wanted moments; non-list input is silently ignored."""
        if isinstance(moments, list):
            self._moments = moments

    def set_metadata(self, metadata):
        """Set wanted metadata; non-dict input is silently ignored."""
        if isinstance(metadata, dict):
            self._meta = metadata
class XRadVolume(OdimH5GroupAttributeMixin, XRadBase):
    """Class for holding a volume of radar sweeps"""

    def __init__(self, **kwargs):
        super().__init__()
        self._data = None
        # cached CfRadial2-style root dataset, see `root`
        self._root = None

    def __repr__(self):
        # summary lines: sweep count and per-sweep fixed angles
        summary = [f"<wradlib.{type(self).__name__}>"]
        dims = "Dimension(s):"
        dims_summary = f"sweep: {len(self)}"
        summary.append(f"{dims} ({dims_summary})")
        angle = f"{self[0][0]._dim0[1].capitalize()}(s):"
        angle_summary = [f"{k[0].fixed_angle:.1f}" for k in self]
        angle_summary = ", ".join(angle_summary)
        summary.append(f"{angle} ({angle_summary})")
        return "\n".join(summary)

    @property
    def root(self):
        """Return root object."""
        if self._root is None:
            self.assign_root()
        return self._root

    def assign_root(self):
        """(Re-)Create root object according CfRadial2 standard"""
        # assign root variables
        sweep_group_names = [f"sweep_{i}" for i in range(len(self))]
        # entries may be timeseries (indexable) or plain sweeps
        try:
            sweep_fixed_angles = [ts[0].fixed_angle for ts in self]
        except AttributeError:
            sweep_fixed_angles = [ts.fixed_angle for ts in self]
        # extract time coverage
        times = np.array(
            [[t[0].ray_times.values.min(), t[-1].ray_times.values.max()] for t in self]
        ).flatten()
        time_coverage_start = min(times)
        time_coverage_end = max(times)
        # string form truncated to seconds precision, UTC suffix appended
        time_coverage_start_str = str(time_coverage_start)[:19] + "Z"
        time_coverage_end_str = str(time_coverage_end)[:19] + "Z"
        # create root group from scratch
        root = xr.Dataset()  # data_vars=wrl.io.xarray.global_variables,
        # attrs=wrl.io.xarray.global_attrs)
        # take first dataset/file for retrieval of location
        site = self.site
        # assign root variables
        root = root.assign(
            {
                "volume_number": 0,
                "platform_type": "fixed",
                "instrument_type": "radar",
                "primary_axis": "axis_z",
                "time_coverage_start": time_coverage_start_str,
                "time_coverage_end": time_coverage_end_str,
                "latitude": site["latitude"].values,
                "longitude": site["longitude"].values,
                "altitude": site["altitude"].values,
                "sweep_group_name": (["sweep"], sweep_group_names),
                "sweep_fixed_angle": (["sweep"], sweep_fixed_angles),
            }
        )
        # assign root attributes
        attrs = collections.OrderedDict()
        attrs.update(
            {
                "version": "None",
                "title": "None",
                "institution": "None",
                "references": "None",
                "source": "None",
                "history": "None",
                "comment": "im/exported using wradlib",
                "instrument_name": "None",
            }
        )
        attrs["version"] = self.what["version"]
        root = root.assign_attrs(attrs)
        root = root.assign_attrs(self.attrs)
        self._root = root

    @property
    def site(self):
        """Return coordinates of radar site."""
        ds = xr.Dataset(coords=self.where).rename(
            {"height": "altitude", "lon": "longitude", "lat": "latitude"}
        )
        return ds

    @property
    def Conventions(self):
        """Return Conventions string."""
        try:
            conv = self.ncid.attrs["Conventions"]
        except KeyError:
            conv = None
        return conv

    def to_odim(self, filename, timestep=0):
        """Save volume to ODIM_H5/V2_2 compliant file.

        Parameters
        ----------
        filename : str
            Name of the output file
        timestep : int
            timestep of wanted volume
        """
        if self.root:
            to_odim(self, filename, timestep=timestep)
        else:
            warnings.warn(
                "WRADLIB: No OdimH5-compliant data structure " "available. Not saving.",
                UserWarning,
            )

    def to_cfradial2(self, filename, timestep=0):
        """Save volume to CfRadial2 compliant file.

        Parameters
        ----------
        filename : str
            Name of the output file
        timestep : int
            timestep wanted volume
        """
        if self.root:
            to_cfradial2(self, filename, timestep=timestep)
        else:
            warnings.warn(
                "WRADLIB: No CfRadial2-compliant data structure "
                "available. Not saving.",
                UserWarning,
            )

    def to_netcdf(self, filename, timestep=None, keys=None):
        """Save volume to netcdf compliant file.

        Parameters
        ----------
        filename : str
            Name of the output file
        timestep : int, slice
            timestep/slice of wanted volume
        keys : list
            list of sweep_group_names which should be written to the file
        """
        if self.root:
            to_netcdf(self, filename, keys=keys, timestep=timestep)
        else:
            warnings.warn(
                "WRADLIB: No netcdf-compliant data structure " "available. Not saving.",
                UserWarning,
            )
@deprecation.deprecated(
    deprecated_in="1.10",
    removed_in="2.0",
    current_version=version.version,
    details="Use xarray BackendEntrypoint based functionality instead.",
)
def collect_by_time(obj):
    """Collect XRadSweep objects having same time

    Parameters
    ----------
    obj : list
        list of XRadSweep objects

    Returns
    -------
    out : XRadTimeSeries
        wrapper around list of XRadSweep objects
    """
    out = XRadTimeSeries()
    if isinstance(obj, XRadSweep):
        obj = [obj]
    # fast metadata-only retrieval of sweep start times
    times = [ds._get_time_fast() for ds in obj]
    unique_times = np.array(sorted(list(set(times))))
    if len(unique_times) == len(obj):
        # one file per timestep: simply sort chronologically
        out.extend(obj)
        out.sort(key=lambda x: x._get_time_fast())
    else:
        # runs only if several files for the same timestep are available
        # eg DWD's one sweep one moment files
        for t in unique_times:
            idx = np.argwhere(times == t).flatten()
            # merge all same-time sweeps into the first one
            out1 = obj[idx[0]]
            [out1.extend(obj[i]) for i in idx[1:]]
            out.append(out1)
    return out
@deprecation.deprecated(
    deprecated_in="1.10",
    removed_in="2.0",
    current_version=version.version,
    details="Use xarray BackendEntrypoint based functionality instead.",
)
def collect_by_angle(obj):
    """Collect XRadSweep objects having same angle

    Parameters
    ----------
    obj : list
        list of XRadSweep objects

    Returns
    -------
    out : :class:`wradlib.io.xarray.XRadVolume`
        wrapper around nested list of XRadSweep objects
    """
    out = XRadVolume()
    angles = [ds.fixed_angle for ds in obj]
    unique_angles = list(set(angles))
    if len(unique_angles) == len(obj):
        # all angles distinct: keep the flat list
        out.extend(obj)
    else:
        # group sweeps sharing the same fixed angle into sublists
        for a in unique_angles:
            idx = np.argwhere(angles == a).flatten()
            merge_list = [obj[i] for i in idx]
            out.append(merge_list)
    return out
def _open_odim_sweep(filename, loader, **kwargs):
    """Returns list of XRadSweep objects

    Every sweep will be put into it's own class instance.

    Parameters
    ----------
    filename : str or file-like
        radar volume file (ODIM or GAMIC flavour)
    loader : {'netcdf4', 'h5netcdf', 'h5py'}
        backend used to open the file

    Returns
    -------
    list of XRadSweep
        one instance per sweep group, sorted by group number
    """
    ld_kwargs = kwargs.get("ld_kwargs", {})
    # pick the opener and the handle attribute yielding group names
    if loader == "netcdf4":
        opener = netCDF4.Dataset
        attr = "groups"
    elif loader == "h5netcdf":
        opener = h5netcdf.File
        attr = "keys"
        ld_kwargs["phony_dims"] = "access"
    else:
        opener = h5py.File
        attr = "keys"
    dsdesc = "dataset"
    sweep_cls = XRadSweepOdim
    if "GAMIC" in kwargs.get("flavour", "ODIM"):
        if loader == "netcdf4":
            # fixed unbalanced quote in this error message
            raise ValueError(
                "wradlib: GAMIC files can't be read using netcdf4"
                " loader. Use either 'h5py' or 'h5netcdf'."
            )
        dsdesc = "scan"
        sweep_cls = XRadSweepGamic
    # open file
    if not isinstance(filename, str):
        # file-like object
        if has_import(h5py) and opener == h5py.File:
            raise ValueError(
                "wradlib: file-like objects can't be read using h5py "
                "loader. Use either 'netcdf4' or 'h5netcdf'."
            )
        if has_import(netCDF4) and opener == netCDF4.Dataset:
            # netCDF4 consumes the raw bytes via the `memory` kwarg
            handle = opener(
                f"{str(filename)}", mode="r", memory=filename.read(), **ld_kwargs
            )
        else:
            handle = opener(filename, "r", **ld_kwargs)
    else:
        handle = opener(filename, "r", **ld_kwargs)
    # get group names (attribute may be a mapping or a method)
    fattr = getattr(handle, attr)
    if callable(fattr):
        groups = list(fattr())
    else:
        groups = list(fattr)
    # iterate over single sweeps
    # todo: if sorting does not matter, we can skip this
    sweeps = [k for k in groups if dsdesc in k]
    sweeps_idx = np.argsort([int(s[len(dsdesc) :]) for s in sweeps])
    sweeps = np.array(sweeps)[sweeps_idx].tolist()
    return [sweep_cls(handle, k, **kwargs) for k in sweeps]
@deprecation.deprecated(
    deprecated_in="1.10",
    removed_in="2.0",
    current_version=version.version,
    details="Use the appropriate `wradlib.io.open_{engine}_dataset` or `wradlib.io.open_{engine}_mfdataset` function.",
)
def open_odim(paths, loader="netcdf4", **kwargs):
    """Open multiple ODIM files as a XRadVolume structure.

    Parameters
    ----------
    paths : str or sequence
        Either a filename or string glob in the form `'path/to/my/files/*.h5'`
        or an explicit list of files to open.
    loader : {'netcdf4', 'h5py', 'h5netcdf'}
        Loader used for accessing file metadata, defaults to 'netcdf4'.
    kwargs : dict, optional
        Additional arguments passed on to :class:`wradlib.io.xarray.XRadSweep`.

    Returns
    -------
    angles : XRadVolume
        Volume of timeseries of sweeps, sorted by fixed angle and time.
    """
    if (loader == "h5netcdf") and (
        has_import(h5netcdf) and (Version(h5netcdf.__version__) < Version("0.8.0"))
    ):
        warnings.warn(
            f"WRADLIB: 'h5netcdf>=0.8.0' needed to perform this "
            f"operation. 'h5netcdf={h5netcdf.__version__} "
            f"available.",
            UserWarning,
        )
    if isinstance(paths, str):
        paths = glob.glob(paths)
    else:
        paths = np.array(paths).flatten().tolist()
    if loader not in ["netcdf4", "h5netcdf", "h5py"]:
        raise ValueError(f"wradlib: Unknown loader: {loader}")
    # open every file and collect the contained sweeps
    # (plain loop instead of side-effect list comprehension)
    sweeps = []
    for f in tqdm(paths, desc="Open", unit=" Files", leave=None):
        sweeps.extend(_open_odim_sweep(f, loader, **kwargs))
    # group sweeps by fixed angle, then each group by time
    angles = collect_by_angle(sweeps)
    for i in tqdm(range(len(angles)), desc="Collecting", unit=" Angles", leave=None):
        angles[i] = collect_by_time(angles[i])
    angles.sort(key=lambda x: x[0].time)
    # claim parentship and file handle of the first timeseries
    for f in angles:
        f._parent = angles
    angles._ncfile = angles[0].ncfile
    angles._ncpath = "/"
    return angles
class XRadVolFile:
    """BaseClass for holding netCDF4.Dataset handles"""

    def __init__(self, filename=None, flavour=None, **kwargs):
        # pre-set handle/flavour so __del__ stays safe even if
        # _check_file raises in a subclass
        self._filename = filename
        self._nch = None
        self._flavour = None
        self._nch, self._flavour = self._check_file(filename, flavour)

    def _check_file(self, filename, flavour):
        # subclasses open the file and validate its flavour here
        raise NotImplementedError

    def __del__(self):
        # release the dataset handle, if one was acquired
        handle = self._nch
        if handle is not None:
            handle.close()

    @property
    def filename(self):
        """Name of the source data file."""
        return self._filename

    @property
    def nch(self):
        """Underlying dataset handle."""
        return self._nch

    @property
    def flavour(self):
        """Detected file flavour string."""
        return self._flavour
class OdimH5File(XRadVolFile):
    """Class for holding netCDF4.Dataset handles of OdimH5 files

    Parameters
    ----------
    filename : str
        Source data file name.
    flavour : str
        Name of hdf5 flavour ('ODIM' or 'GAMIC'). Defaults to 'ODIM'.
    """

    def __init__(self, filename=None, flavour=None, **kwargs):
        super().__init__(filename=filename, flavour=flavour, **kwargs)

    def _check_file(self, filename, flavour):
        """Open `filename` disklessly, verify format and determine flavour.

        Returns
        -------
        nch : netCDF4.Dataset
            open dataset handle
        flavour : str
            verified flavour string

        Raises
        ------
        TypeError
            if the file is not HDF5-based
        AttributeError
            if the flavour can neither be given nor derived
        """
        nch = netCDF4.Dataset(filename, diskless=True, persist=False)
        if nch.disk_format != "HDF5":
            raise TypeError(
                f"wradlib: File (unknown) is neither 'NETCDF4' (using HDF5 groups) "
                "nor plain 'HDF5'."
            )
        if flavour is None:
            # derive flavour from the file's Conventions attribute
            try:
                flavour = nch.Conventions
            except AttributeError as e:
                # fixed `/n` typo: use a real newline in the message
                raise AttributeError(
                    f"wradlib: Missing 'Conventions' attribute in (unknown) .\n"
                    "Use the 'flavour' kwarg to specify your source data."
                ) from e
            if "ODIM" not in flavour:
                raise AttributeError(
                    f"wradlib: 'Conventions' attribute '{flavour}' in (unknown) is unknown.\n"
                    "Use the 'flavour' kwarg to specify your source data."
                )
        # set flavour-dependent layout descriptors
        if "ODIM" in flavour:
            self._dsdesc = "dataset"
            self._swmode = "product"
            self._mfmt = "data"
            self._msrc = "groups"
        elif "GAMIC" in flavour:
            self._dsdesc = "scan"
            self._swmode = "scan_type"
            self._mfmt = "moment_"
            self._msrc = "variables"
        else:
            raise AttributeError(
                f"wradlib: Unknown 'flavour' kwarg attribute: {flavour} ."
            )
        return nch, flavour

    @property
    def flavour(self):
        """Return normalized flavour ('ODIM' or 'GAMIC')."""
        flv = ["ODIM", "GAMIC"]
        return [s for s in flv if s in self._flavour][0]
class NetCDF4File(XRadVolFile):
    """Class for holding netCDF4.Dataset handles of Cf/Radial files

    Parameters
    ----------
    filename : str
        Source data file name.
    flavour : str
        Name of flavour ('Cf/Radial' or 'Cf/Radial2').
    """

    def __init__(self, filename=None, flavour=None, **kwargs):
        super().__init__(filename=filename, flavour=flavour, **kwargs)

    def _check_file(self, filename, flavour):
        """Open `filename` disklessly and determine the Cf/Radial flavour.

        Returns
        -------
        nch : netCDF4.Dataset
            open dataset handle
        flavour : str
            verified flavour string ('Cf/Radial' or 'Cf/Radial2')

        Raises
        ------
        AttributeError
            if the flavour can neither be given nor derived
        """
        nch = netCDF4.Dataset(filename, diskless=True, persist=False)
        if flavour is None:
            # derive flavour from Conventions/version attributes
            try:
                Conventions = nch.Conventions
                version = nch.version
            except AttributeError as e:
                # fixed `/n` typo: use a real newline in the message
                raise AttributeError(
                    f"wradlib: Missing 'Conventions' attribute in (unknown) .\n"
                    "Use the 'flavour' kwarg to specify your source data."
                ) from e
            if "cf/radial" in Conventions.lower():
                if version == "2.0":
                    flavour = "Cf/Radial2"
                else:
                    flavour = "Cf/Radial"
        if flavour not in ["Cf/Radial", "Cf/Radial2"]:
            raise AttributeError(
                f"wradlib: Unknown 'flavour' kwarg attribute: {flavour} ."
            )
        return nch, flavour
class XRadVol(collections.abc.MutableMapping):
    """BaseClass for xarray based RadarVolumes

    Implements `collections.MutableMapping` dictionary.
    Sweeps are held in an internal dict; the CfRadial2-style root
    dataset is exposed via the `root` property.
    """

    def __init__(self, init_root=False):
        """
        Parameters
        ----------
        init_root : bool
            If True, create an empty CfRadial2 root dataset.
        """
        self._sweeps = {}
        # claimed handles, released in __del__
        self._nch = []
        self.root = None
        # fallbacks used while no root dataset exists
        self._sweep_angles = []
        self._sweep_names = []
        if init_root:
            self._init_root()
        warnings.warn(
            f"{self.__class__} is deprecated as of 1.10. and will be removed in 2.0. "
            f"Use xarray backends from `xradar`-package.",
            category=FutureWarning,
            stacklevel=3,
        )

    def __getitem__(self, key):
        # legacy access to the root dataset via mapping syntax
        if key == "root":
            warnings.warn(
                "WRADLIB: Use of `obj['root']` is deprecated, "
                "please use obj.root instead.",
                DeprecationWarning,
            )
            return self._root
        return self._sweeps[key]

    def __setitem__(self, key, value):
        # only replacement of existing sweeps is allowed
        if key in self._sweeps:
            self._sweeps[key] = value
        else:
            warnings.warn(
                "WRADLIB: Use class methods to add data. "
                "Direct setting is not allowed.",
                UserWarning,
            )

    def __delitem__(self, key):
        del self._sweeps[key]

    def __iter__(self):
        return iter(self._sweeps)

    def __len__(self):
        return len(self._sweeps)

    def __repr__(self):
        return self._sweeps.__repr__()

    def __del__(self):
        # release references so datasets/handles can be collected;
        # the previous `for k in ...: del k` loops only unbound the
        # loop variable and never dropped the stored objects
        self._root = None
        self._sweeps.clear()
        self._nch.clear()

    def _init_root(self):
        # empty CfRadial2 root skeleton
        self.root = xr.Dataset(data_vars=global_variables, attrs=global_attrs)

    @property
    def root(self):
        """Return `root` dataset."""
        return self._root

    @root.setter
    def root(self, value):
        self._root = value

    @property
    def sweep_angles(self):
        """Return list of sweep fixed angles."""
        if self.root is None:
            return self._sweep_angles
        return list(self.root.sweep_fixed_angle.values)

    @sweep_angles.setter
    def sweep_angles(self, value):
        # appending is only possible before a root dataset exists
        if self.root is None:
            self._sweep_angles.append(value)

    @property
    def sweep_names(self):
        """Return list of sweep group names."""
        if self.root is None:
            return self._sweep_names
        else:
            return list(self.root.sweep_group_name.values)

    @sweep_names.setter
    def sweep_names(self, value):
        # appending is only possible before a root dataset exists
        if self.root is None:
            self._sweep_names.append(value)

    @property
    def sweep(self):
        """Return sweep dimension count."""
        return self.root.dims["sweep"]

    @property
    def sweeps(self):
        """Return zip sweep names, sweep_angles"""
        return zip(self.sweep_names, self.sweep_angles)

    @property
    def location(self):
        """Return location of data source."""
        return (
            self.root.longitude.values.item(),
            self.root.latitude.values.item(),
            self.root.altitude.values.item(),
        )

    @property
    def Conventions(self):
        """Return CF/ODIM `Conventions`."""
        return self.root.Conventions

    @property
    def version(self):
        """Return CF/ODIM version"""
        return self.root.version

    def to_cfradial2(self, filename):
        """Save volume to CfRadial2.0 compliant file.

        Parameters
        ----------
        filename : str
            Name of the output file
        """
        if self.root:
            to_cfradial2(self, filename)
        else:
            warnings.warn(
                "WRADLIB: No CfRadial2-compliant data structure "
                "available. Not saving.",
                UserWarning,
            )

    def to_odim(self, filename):
        """Save volume to ODIM_H5/V2_2 compliant file.

        Parameters
        ----------
        filename : str
            Name of the output file
        """
        if self.root:
            to_odim(self, filename)
        else:
            warnings.warn(
                "WRADLIB: No OdimH5-compliant data structure " "available. Not saving.",
                UserWarning,
            )

    def georeference(self, sweeps=None):
        """Georeference sweeps

        Parameters
        ----------
        sweeps : list
            list with sweep keys to georeference, defaults to all sweeps
        """
        if sweeps is None:
            sweeps = self
        for swp in sweeps:
            self[swp] = self[swp].pipe(geo_xarray.georeference_dataset)
class CfRadial(XRadVol):
    """Class for xarray based retrieval of CfRadial data files"""

    def __init__(self, filename=None, flavour=None, **kwargs):
        """Initialize xarray structure from Cf/Radial data structure.

        Parameters
        ----------
        filename : str or list of str
            Source data file name(s).
        flavour : str
            Name of flavour ('Cf/Radial' or 'Cf/Radial2').

        Keyword Arguments
        -----------------
        decode_times : bool
            If True, decode cf times to np.datetime64. Defaults to True.
        decode_coords : bool
            If True, use the ‘coordinates’ attribute on variable
            to assign coordinates. Defaults to True.
        mask_and_scale : bool
            If True, lazily scale (using scale_factor and add_offset)
            and mask (using _FillValue). Defaults to True.
        chunks : int or dict, optional
            If chunks is provided, it used to load the new dataset into dask
            arrays. chunks={} loads the dataset with dask using a single
            chunk for all arrays.
        georef : bool
            If True, adds 2D AEQD x,y,z-coordinates, ground_range (gr) and
            2D (rays,bins)-coordinates for easy georeferencing (eg. cartopy)
        dim0 : str
            name of the ray-dimension of DataArrays and Dataset:
            * `time` - cfradial2 standard
            * `azimuth` - better for working with xarray
        """
        super().__init__()
        if not isinstance(filename, list):
            filename = [filename]
        # open every file and dispatch on the flavour detected by NetCDF4File
        # (the loop index of the previous implementation was unused)
        for f in filename:
            nch = NetCDF4File(f, flavour=flavour)
            self._nch.append(nch)
            if nch.flavour == "Cf/Radial2":
                self.assign_data_radial2(nch, **kwargs)
            else:
                self.assign_data_radial(nch, **kwargs)

    def assign_data_radial2(self, nch, **kwargs):
        """Assign from CfRadial2 data structure.

        Parameters
        ----------
        nch : object
            NetCDF4 File object

        Keyword Arguments
        -----------------
        decode_times : bool
            If True, decode cf times to np.datetime64. Defaults to True.
        decode_coords : bool
            If True, use the ‘coordinates’ attribute on variable
            to assign coordinates. Defaults to True.
        mask_and_scale : bool
            If True, lazily scale (using scale_factor and add_offset)
            and mask (using _FillValue). Defaults to True.
        chunks : int or dict, optional
            If chunks is provided, it used to load the new dataset into dask
            arrays. chunks={} loads the dataset with dask using a single
            chunk for all arrays.
        georef : bool
            If True, adds 2D AEQD x,y,z-coordinates, ground_range (gr) and
            2D (rays,bins)-coordinates for easy georeferencing (eg. cartopy)
        dim0 : str
            name of the ray-dimension of DataArrays and Dataset:
            * `time` - cfradial2 standard
            * `azimuth` - better for working with xarray
        """
        # keyword argument handling
        georef = kwargs.pop("georef", False)
        dim0 = kwargs.pop("dim0", "time")
        # CfRadial2 files already carry a root group
        self.root = _open_dataset(nch.nch, grp=None, **kwargs)
        sweepnames = self.root.sweep_group_name.values
        for sw in sweepnames:
            ds = _open_dataset(nch.nch, grp=sw, **kwargs)
            ds = ds.swap_dims({"time": dim0})
            coords = {
                "longitude": self.root.longitude,
                "latitude": self.root.latitude,
                "altitude": self.root.altitude,
                "azimuth": ds.azimuth,
                "elevation": ds.elevation,
            }
            ds = ds.assign_coords(**coords)
            # adding xyz aeqd-coordinates
            if georef:
                ds = geo_xarray.georeference_dataset(ds)
            self._sweeps[sw] = ds

    def assign_data_radial(self, nch, **kwargs):
        """Assign from CfRadial1 data structure.

        Parameters
        ----------
        nch : object
            NetCDF4 File object

        Keyword Arguments
        -----------------
        decode_times : bool
            If True, decode cf times to np.datetime64. Defaults to True.
        decode_coords : bool
            If True, use the ‘coordinates’ attribute on variable
            to assign coordinates. Defaults to True.
        mask_and_scale : bool
            If True, lazily scale (using scale_factor and add_offset)
            and mask (using _FillValue). Defaults to True.
        chunks : int or dict, optional
            If chunks is provided, it used to load the new dataset into dask
            arrays. chunks={} loads the dataset with dask using a single
            chunk for all arrays.
        georef : bool
            If True, adds 2D AEQD x,y,z-coordinates, ground_range (gr) and
            2D (rays,bins)-coordinates for easy georeferencing (eg. cartopy)
        dim0 : str
            name of the ray-dimension of DataArrays and Dataset:
            * `time` - cfradial2 standard
            * `azimuth` - better for working with xarray
        """
        # keyword argument handling
        georef = kwargs.pop("georef", False)
        dim0 = kwargs.pop("dim0", "time")
        root = _open_dataset(nch.nch, grp=None, **kwargs)
        # keep only variables belonging to the cfradial2 root group:
        # symmetric difference intersected with the file's variables yields
        # everything that is NOT in root_vars
        var = root.variables.keys()
        remove_root = var ^ root_vars
        remove_root &= var
        root1 = root.drop_vars(remove_root).rename({"fixed_angle": "sweep_fixed_angle"})
        sweep_group_name = []
        for i in range(root1.dims["sweep"]):
            sweep_group_name.append(f"sweep_{i + 1}")
        self.root = root1.assign({"sweep_group_name": (["sweep"], sweep_group_name)})
        # same filtering for the per-sweep variables
        keep_vars = sweep_vars1 | sweep_vars2 | sweep_vars3
        remove_vars = var ^ keep_vars
        remove_vars &= var
        data = root.drop_vars(remove_vars)
        data.attrs = {}
        # CfRadial1 stores all sweeps along a single time axis; slice it
        # into individual sweeps using the start/end ray indices
        start_idx = data.sweep_start_ray_index.values
        end_idx = data.sweep_end_ray_index.values
        data = data.drop_vars({"sweep_start_ray_index", "sweep_end_ray_index"})
        for i, sw in enumerate(sweep_group_name):
            tslice = slice(start_idx[i], end_idx[i])
            ds = data.isel(time=tslice, sweep=slice(i, i + 1)).squeeze("sweep")
            ds = ds.swap_dims({"time": dim0})
            ds.sweep_mode.load()
            coords = {
                "longitude": self.root.longitude,
                "latitude": self.root.latitude,
                "altitude": self.root.altitude,
                "azimuth": ds.azimuth,
                "elevation": ds.elevation,
                "sweep_mode": ds.sweep_mode.item().decode(),
            }
            ds = ds.assign_coords(**coords)
            # adding xyz aeqd-coordinates
            if georef:
                ds = geo_xarray.georeference_dataset(ds)
            self._sweeps[sw] = ds
class OdimH5(XRadVol):
    """Class for xarray based retrieval of ODIM_H5 data files"""

    def __init__(self, filename=None, flavour=None, **kwargs):
        """Initialize xarray structure from hdf5 data structure.

        Parameters
        ----------
        filename : str
            Source data file name.
        flavour : str
            Name of hdf5 flavour ('ODIM' or 'GAMIC'). Defaults to 'ODIM'.

        Keyword Arguments
        -----------------
        decode_times : bool
            If True, decode cf times to np.datetime64. Defaults to True.
        decode_coords : bool
            If True, use the ‘coordinates’ attribute on variable
            to assign coordinates. Defaults to True.
        mask_and_scale : bool
            If True, lazily scale (using scale_factor and add_offset)
            and mask (using _FillValue). Defaults to True.
        chunks : int or dict, optional
            If chunks is provided, it used to load the new dataset into dask
            arrays. chunks={} loads the dataset with dask using a single
            chunk for all arrays.
        georef : bool
            If True, adds 2D AEQD x,y,z-coordinates, ground_range (gr) and
            2D (rays,bins)-coordinates for easy georeferencing (eg. cartopy)
        standard : str
            * `none` - data is read as verbatim as possible, no metadata
            * `odim` - data is read, odim metadata added to datasets
            * `cf-mandatory` - data is read according to cfradial2 standard
              importing mandatory metadata
            * `cf-full` - data is read according to cfradial2 standard
              importing all available cfradial2 metadata (not fully
              implemented)
        dim0 : str
            name of the ray-dimension of DataArrays and Dataset:
            * `time` - cfradial2 standard
            * `azimuth` - better for working with xarray
        """
        super().__init__()
        if not isinstance(filename, list):
            filename = [filename]
        if len(filename) == 0:
            raise ValueError("File list empty")
        for f in filename:
            self.assign_data(f, flavour=flavour, **kwargs)
        # a CfRadial2-style root dataset only makes sense for cf standards
        if "cf" in kwargs.get("standard", "cf-mandatory"):
            self.assign_root()

    def assign_data(self, filename, flavour=None, **kwargs):
        """Assign xarray dataset from hdf5 data structure.

        Parameters
        ----------
        filename : str
            Source data file name.
        flavour : str
            Name of hdf5 flavour ('ODIM' or 'GAMIC'). Defaults to 'ODIM'.

        Keyword Arguments
        -----------------
        decode_times : bool
            If True, decode cf times to np.datetime64. Defaults to True.
        decode_coords : bool
            If True, use the ‘coordinates’ attribute on variable
            to assign coordinates. Defaults to True.
        mask_and_scale : bool
            If True, lazily scale (using scale_factor and add_offset)
            and mask (using _FillValue). Defaults to True.
        chunks : int or dict, optional
            If chunks is provided, it used to load the new dataset into dask
            arrays. chunks={} loads the dataset with dask using a single
            chunk for all arrays.
        georef : bool
            If True, adds 2D AEQD x,y,z-coordinates, ground_range (gr) and
            2D (rays,bins)-coordinates for easy georeferencing (eg. cartopy).
            Defaults to False.
        standard : str
            * `none` - data is read as verbatim as possible, no metadata
            * `odim` - data is read, odim metadata added to datasets
            * `cf-mandatory` - data is read according to cfradial2 standard
              importing mandatory metadata, default value
            * `cf-full` - data is read according to cfradial2 standard
              importing all available cfradial2 metadata (not fully
              implemented)
        dim0 : str
            name of the ray-dimension of DataArrays and Dataset:
            * `time` - cfradial2 standard, default value
            * `azimuth` - better for working with xarray
        """
        nch = OdimH5File(filename, flavour=flavour)
        self._nch.append(nch)
        # keyword argument handling
        decode_times = kwargs.get("decode_times", True)
        decode_coords = kwargs.get("decode_coords", True)
        mask_and_scale = kwargs.get("mask_and_scale", True)
        georef = kwargs.get("georef", False)
        standard = kwargs.get("standard", "cf-mandatory")
        dim0 = kwargs.get("dim0", "time")
        # retrieve and assign global groups /how, /what, /where
        groups = ["how", "what", "where"]
        how, what, where = _get_odim_groups(nch.nch, groups)
        rt_grps = {"how": how, "what": what, "where": where}
        # sweep group handling: native names and their cfradial counterparts
        (src_swp_grp_name, swp_grp_name) = _get_odim_sweep_group_names(
            nch.nch, nch._dsdesc
        )
        # iterate sweeps in file
        for i, sweep in enumerate(src_swp_grp_name):
            # retrieve ds and assign datasetX how/what/where group attributes
            groups = [None, "how", "what", "where"]
            ds, ds_how, ds_what, ds_where = _get_odim_groups(nch.nch[sweep], groups)
            ds_grps = {"how": ds_how, "what": ds_what, "where": ds_where}
            # moments
            ds = _assign_odim_moments(ds, nch, sweep, **kwargs)
            # retrieve and assign gamic ray_header
            if nch.flavour == "GAMIC":
                rh = _get_gamic_ray_header(nch.filename, i)
                ds_grps["what"] = ds_grps["what"].assign(rh)
            # coordinates wrap-up
            vars = collections.OrderedDict()
            coords = collections.OrderedDict()
            if "cf" in standard or georef:
                coords["longitude"] = rt_grps["where"].attrs["lon"]
                coords["latitude"] = rt_grps["where"].attrs["lat"]
                coords["altitude"] = rt_grps["where"].attrs["height"]
            if "cf" in standard or georef:
                sweep_mode = _get_odim_sweep_mode(nch, ds_grps)
                coords["sweep_mode"] = sweep_mode
            if "cf" in standard or decode_coords or georef:
                coords.update(_get_odim_coordinates(nch, ds_grps))
            # georeference needs coordinate variables
            if georef:
                geods = xr.Dataset(vars, coords)
                geods = geo_xarray.georeference_dataset(geods)
                coords.update(geods.coords)
            # time coordinate
            if "cf" in standard or decode_times:
                timevals = _get_odim_timevalues(nch, ds_grps)
                if decode_times:
                    coords["time"] = (
                        ["dim_0"],
                        timevals,
                        get_time_attrs("1970-01-01T00:00:00Z"),
                    )
                else:
                    coords["time"] = (["dim_0"], timevals)
            # assign global sweep attributes
            fixed_angle = _get_odim_fixed_angle(nch, ds_grps)
            if "cf" in standard:
                vars.update(
                    {
                        "sweep_number": i,
                        "sweep_mode": sweep_mode,
                        "follow_mode": "none",
                        "prt_mode": "fixed",
                        "fixed_angle": fixed_angle,
                    }
                )
            if "cf-full" in standard:
                full_vars = _get_odim_full_vars(nch, ds_grps)
                vars.update(full_vars)
            # assign variables and coordinates
            ds = ds.assign(vars)
            ds = ds.assign_coords(**coords)
            with warnings.catch_warnings():
                # silence xarray's rename-coordinate index warning
                warnings.filterwarnings(
                    "ignore", ".*does not create an index anymore.*"
                )
                ds = ds.rename({"dim_0": dim0, "dim_1": "range"})
            # decode dataset if requested
            if decode_times or decode_coords or mask_and_scale:
                ds = xr.decode_cf(
                    ds,
                    decode_times=decode_times,
                    decode_coords=decode_coords,
                    mask_and_scale=mask_and_scale,
                )
            # determine if same sweep
            try:
                index = self.sweep_angles.index(fixed_angle)
            except ValueError:
                # unseen fixed angle -> start a new sweep group
                nidx = len(self._sweeps) + 1
                swp_grp_name = f"sweep_{nidx}"
                self._sweeps[swp_grp_name] = ds
                self.sweep_names.append(swp_grp_name)
                self.sweep_angles.append(fixed_angle)
            else:
                # fixed angle seen before -> merge into the existing sweep
                dictkey = self.sweep_names[index]
                self._sweeps[dictkey] = xr.merge([self._sweeps[dictkey], ds])

    def assign_root(self):
        """Build and assign the CfRadial2 root dataset from loaded sweeps."""
        # retrieve and assign global groups /how, /what, /where
        # of the first loaded file
        first = self._nch[0]
        groups = ["how", "what", "where"]
        how, what, where = _get_odim_groups(first.nch, groups)
        rt_grps = {"how": how, "what": what, "where": where}
        # assign root variables
        # extract time coverage over all sweeps
        tmin = [ds.time.values.min() for ds in self._sweeps.values()]
        time_coverage_start = min(tmin)
        tmax = [ds.time.values.max() for ds in self._sweeps.values()]
        time_coverage_end = max(tmax)
        time_coverage_start_str = str(time_coverage_start)[:19] + "Z"
        time_coverage_end_str = str(time_coverage_end)[:19] + "Z"
        # create root group from scratch
        root = xr.Dataset(data_vars=global_variables, attrs=global_attrs)
        # assign root variables
        root = root.assign(
            {
                "volume_number": 0,
                "platform_type": "fixed",
                "instrument_type": "radar",
                "primary_axis": "axis_z",
                "time_coverage_start": time_coverage_start_str,
                "time_coverage_end": time_coverage_end_str,
                "latitude": rt_grps["where"].attrs["lat"],
                "longitude": rt_grps["where"].attrs["lon"],
                "altitude": rt_grps["where"].attrs["height"],
                "sweep_group_name": (["sweep"], self.sweep_names),
                "sweep_fixed_angle": (["sweep"], self.sweep_angles),
            }
        )
        # assign root attributes
        attrs = _get_odim_root_attributes(first, rt_grps)
        root = root.assign_attrs(attrs)
        self.root = root
def _open_dataset(nch, grp=None, **kwargs):
    """Open netcdf4/hdf5 group as xarray dataset.

    Parameters
    ----------
    nch : handle
        netcdf4-file handle
    grp : str
        group to access

    Returns
    -------
    nch : handle
        xarray Dataset handle, or the falsy lookup result if the
        requested group does not exist
    """
    if grp is not None:
        nch = nch.groups.get(grp, False)
    if not nch:
        # missing group -> propagate the falsy value to the caller
        return nch
    return xr.open_dataset(xr.backends.NetCDF4DataStore(nch), **kwargs)
def _get_gamic_ray_header(filename, scan):
    """Returns GAMIC ray header dictionary.

    Parameters
    ----------
    filename : str
        filename of GAMIC file
    scan : int
        Number of scan in file

    Returns
    -------
    vars : dict
        OrderedDict of ray header items
    """
    # ToDo: move rayheader into own dataset
    # context manager guarantees the file handle is closed even if the
    # read raises (the previous implementation leaked it on error)
    with h5py.File(filename, mode="r") as h5:
        ray_header = h5[f"scan{scan}/ray_header"][:]
    vars = collections.OrderedDict()
    for name in ray_header.dtype.names:
        # one 1-dim variable per ray header record field, no attributes
        rh = ray_header[name]
        attrs = None
        vars.update({name: (["dim_0"], rh, attrs)})
    return vars
def _get_odim_sweep_group_names(nch, name):
    """Return sweep group names.

    Yields both the native dataset names and their cfradial counterparts.

    Parameters
    ----------
    nch : handle
        netCDF4 Dataset handle
    name : str
        Common part of source dataset names.

    Returns
    -------
    src : list
        list of source dataset names
    swg_grp_name : list
        list of corresponding cfradial sweep_group_name
    """
    prefix = len(name)
    # sort 'dataset1', 'dataset2', ..., 'dataset10' by numeric suffix,
    # not lexically
    src = sorted(
        (key for key in nch.groups.keys() if name in key),
        key=lambda grp: int(grp[prefix:]),
    )
    swp_grp_name = [f"sweep_{idx}" for idx in range(1, len(src) + 1)]
    return src, swp_grp_name
def _get_odim_variables_moments(ds, moments=None, **kwargs):
    """Retrieve radar moments from dataset variables (GAMIC flavour).

    Parameters
    ----------
    ds : xarray dataset
        source dataset
    moments : list
        list of moment strings

    Returns
    -------
    ds : xarray Dataset
        altered dataset
    """
    standard = kwargs.get("standard", "cf-mandatory")
    mask_and_scale = kwargs.get("mask_and_scale", True)
    decode_coords = kwargs.get("decode_coords", True)
    # fix dimensions: h5netcdf names unlabeled dims 'phony_dim_N';
    # sort them numerically and map to dim_0 (rays) / dim_1 (bins)
    dims = sorted(list(ds.dims.keys()), key=lambda x: int(x[len("phony_dim_") :]))
    ds = ds.rename({dims[0]: "dim_0", dims[1]: "dim_1"})
    for mom in moments:
        # open dataX dataset
        dmom = ds[mom]
        name = dmom.moment.lower()
        if "cf" in standard and name not in gamic_mapping.keys():
            # moment has no cf mapping -> drop it from the dataset
            ds = ds.drop_vars(mom)
            continue
        # extract attributes
        dmax = np.iinfo(dmom.dtype).max
        dmin = np.iinfo(dmom.dtype).min
        minval = dmom.dyn_range_min
        maxval = dmom.dyn_range_max
        dtype = minval.dtype
        # derive linear scaling from the GAMIC dynamic range
        # NOTE(review): offset is shifted by one gain step below minval —
        # presumably to reserve the lowest raw value; confirm against the
        # GAMIC format description
        gain = (maxval - minval) / (dmax - 1)
        offset = minval - gain
        gain = gain.astype(dtype)
        offset = offset.astype(dtype)
        undetect = np.array([dmin])[0].astype(dtype)
        # create attribute dict
        attrs = collections.OrderedDict()
        # clean moment attributes
        if standard != "none":
            dmom.attrs = collections.OrderedDict()
        if standard in ["odim"]:
            attrs["gain"] = gain
            attrs["offset"] = offset
            attrs["nodata"] = undetect
            attrs["undetect"] = undetect
        # add cfradial moment attributes
        if "cf" in standard or mask_and_scale:
            attrs["scale_factor"] = gain
            attrs["add_offset"] = minval
            attrs["_FillValue"] = float(dmax)
        if "cf" in standard or decode_coords:
            attrs["coordinates"] = "elevation azimuth range"
        if "full" in standard:
            attrs["_Undetect"] = undetect
        if "cf" in standard:
            # map GAMIC moment name to cfradial naming and metadata
            cfname = gamic_mapping[name]
            for k, v in moments_mapping[cfname].items():
                attrs[k] = v
            name = attrs.pop("short_name")
        # assign attributes to moment
        dmom.attrs.update(attrs)
        # keep original dataset name
        if standard != "none":
            ds = ds.rename({mom: name.upper()})
    return ds
def _get_odim_group_moments(nch, sweep, moments=None, **kwargs):
    """Retrieve radar moments from hdf groups (ODIM flavour).

    Parameters
    ----------
    nch : netCDF Dataset handle
    sweep : str
        sweep key
    moments : list
        list of moment strings

    Returns
    -------
    ds : dictionary
        moment datasets keyed by moment name
    """
    standard = kwargs.get("standard", "cf-mandatory")
    mask_and_scale = kwargs.get("mask_and_scale", True)
    decode_coords = kwargs.get("decode_coords", True)
    chunks = kwargs.get("chunks", None)
    datas = {}
    for mom in moments:
        # ODIM stores the moment name in the 'quantity' attribute of /what
        dmom_what = _open_dataset(nch[sweep][mom], "what", chunks=chunks)
        name = dmom_what.attrs.pop("quantity")
        if "cf" in standard and name not in moments_mapping.keys():
            # moment unknown to the cf mapping -> skip it
            continue
        dsmom = _open_dataset(nch[sweep], mom, chunks=chunks)
        # create attribute dict
        attrs = collections.OrderedDict()
        if standard in ["odim"]:
            attrs.update(dmom_what.attrs)
        # add cfradial moment attributes
        if "cf" in standard or mask_and_scale:
            attrs["scale_factor"] = dmom_what.attrs.get("gain")
            attrs["add_offset"] = dmom_what.attrs.get("offset")
            attrs["_FillValue"] = dmom_what.attrs.get("nodata")
        if "cf" in standard or decode_coords:
            attrs["coordinates"] = "elevation azimuth range"
        if "cf" in standard:
            for k, v in moments_mapping[name].items():
                attrs[k] = v
            # drop short_name
            attrs.pop("short_name")
        if "full" in standard:
            attrs["_Undetect"] = dmom_what.attrs.get("undetect")
        # assign attributes
        dmom = dsmom.data.assign_attrs(attrs)
        # keep original dataset name
        if standard == "none":
            name = mom
        # fix dimensions: map unlabeled dims to dim_0 (rays) / dim_1 (bins)
        dims = dmom.dims
        datas.update({name: dmom.rename({dims[0]: "dim_0", dims[1]: "dim_1"})})
    return datas
def _get_odim_groups(ncf, groups, **kwargs):
    """Open several hdf groups as xarray datasets.

    Parameters
    ----------
    ncf : netCDf4 Dataset handle
    groups : list
        list of groups-keys

    Returns
    -------
    out : tuple
        tuple of xarray datasets, one per requested group
    """
    return tuple(_open_dataset(ncf, grp, **kwargs) for grp in groups)
def _get_odim_moment_names(sweep, fmt=None, src=None):
    """Return sorted moment names of one sweep group.

    Parameters
    ----------
    sweep : netCDf4 Group handle
    fmt : str
        dataset descriptor format
    src : str
        dataset location

    Returns
    -------
    out : :class:`numpy:numpy.ndarray`
        array of moment names
    """
    prefix = len(fmt)
    # order 'data1', 'data2', ..., 'data10' by numeric suffix, not lexically
    names = sorted(
        (mom for mom in getattr(sweep, src).keys() if fmt in mom),
        key=lambda mom: int(mom[prefix:]),
    )
    return np.array(names)
def _assign_odim_moments(ds, nch, sweep, **kwargs):
    """Attach the radar moments of one sweep to a dataset.

    Parameters
    ----------
    ds : xarray dataset
        destination dataset
    nch : netCDF4.Dataset handle
        source netCDF4 Dataset
    sweep : str
        netcdf group name

    Keyword Arguments
    -----------------
    decode_times : bool
        If True, decode cf times to np.datetime64. Defaults to True.
    decode_coords : bool
        If True, use the ‘coordinates’ attribute on variable
        to assign coordinates. Defaults to True.
    mask_and_scale : bool
        If True, lazily scale (using scale_factor and add_offset)
        and mask (using _FillValue). Defaults to True.
    georef : bool
        If True, adds 2D AEQD x,y,z-coordinates, ground_range (gr) and
        2D (rays,bins)-coordinates for easy georeferencing (eg. cartopy)
    standard : str
        * `none` - data is read as verbatim as possible, no metadata
        * `odim` - data is read, odim metadata added to datasets
        * `cf-mandatory` - data is read according to cfradial2 standard
          importing mandatory metadata
        * `cf-full` - data is read according to cfradial2 standard
          importing all available cfradial2 metadata (not fully
          implemented)

    Returns
    -------
    ds : xarray dataset
        Dataset with assigned radar moments
    """
    moments = _get_odim_moment_names(nch.nch[sweep], fmt=nch._mfmt, src=nch._msrc)
    if nch.flavour == "ODIM":
        # ODIM keeps every moment in its own hdf5 subgroup
        odim_moments = _get_odim_group_moments(
            nch.nch, sweep, moments=moments, **kwargs
        )
        for name, dmom in odim_moments.items():
            ds[name] = dmom
    if nch.flavour == "GAMIC":
        # GAMIC keeps the moments as plain dataset variables
        ds = _get_odim_variables_moments(ds, moments=moments, **kwargs)
    return ds
def _get_odim_timevalues(nch, grps):
    """Retrieve TimeArray from source data.

    Parameters
    ----------
    nch : netCDF4.Dataset handle
        source netCDF4 Dataset
    grps : dict
        Dictionary of dataset hdf5 groups ('how', 'what', 'where')

    Returns
    -------
    timevals : :class:`numpy:numpy.ndarray`
        array of time values
    """
    if nch.flavour == "ODIM":
        try:
            # preferred source: per-ray times from the /how group
            timevals = grps["how"].odim.time_range
        except (KeyError, AttributeError):
            # timehandling if only start and end time is given
            start, end = grps["what"].odim.time_range2
            if start == end:
                warnings.warn(
                    "WRADLIB: Equal ODIM `starttime` and `endtime` "
                    "values. Can't determine correct sweep start-, "
                    "end- and raytimes.",
                    UserWarning,
                )
                # degenerate case: give every ray the same timestamp
                timevals = np.ones(grps["where"].nrays) * start
            else:
                # interpolate equidistant per-ray times, centered in each
                # ray interval
                delta = (end - start) / grps["where"].nrays
                timevals = np.arange(start + delta / 2.0, end, delta)
                # rotate by a1gate (index of the first transmitted ray) so
                # the times line up with the on-disk ray order
                timevals = np.roll(timevals, shift=-grps["where"].a1gate)
    if nch.flavour == "GAMIC":
        timevals = grps["what"].gamic.time_range.values
    return timevals
def _get_odim_coordinates(nch, grps):
    """Retrieve coordinates according OdimH5 standard.

    Parameters
    ----------
    nch : netCDF4.Dataset handle
        source netCDF4 Dataset
    grps : dict
        Dictionary of dataset hdf5 groups ('how', 'what', 'where')

    Returns
    -------
    coords : dict
        Dictionary of coordinate arrays
    """
    flavour = nch.flavour.lower()
    coords = collections.OrderedDict()
    # the groups holding angles and radial range differ per flavour
    if flavour == "odim":
        range_grp = grps["where"]
        angle_grp = grps["how"]
    if flavour == "gamic":
        range_grp = grps["how"]
        angle_grp = grps["what"]
    try:
        coords["azimuth"] = getattr(angle_grp, flavour).azimuth_range
        coords["elevation"] = getattr(angle_grp, flavour).elevation_range
    except (KeyError, AttributeError):
        # per-ray angles unavailable -> fall back to the nominal
        # angles reconstructed from the /where group
        angle_grp = grps["where"]
        coords["azimuth"] = getattr(angle_grp, flavour).azimuth_range2
        coords["elevation"] = getattr(angle_grp, flavour).elevation_range2
    coords["range"] = getattr(range_grp, flavour).radial_range
    return coords
def _get_odim_sweep_mode(nch, grp):
    """Retrieve sweep mode.

    Parameters
    ----------
    nch : netCDF4.Dataset handle
    grp : dict
        Dictionary of dataset hdf5 groups ('how', 'what', 'where')

    Returns
    -------
    out : str
        'azimuth_surveillance' or 'rhi'
    """
    # everything that is not explicitly flagged as RHI is treated as PPI
    if grp["what"].attrs[nch._swmode] == "RHI":
        return "rhi"
    return "azimuth_surveillance"
def _get_odim_full_vars(nch, grps):
    """Collect available non mandatory variables from source data.

    Parameters
    ----------
    nch : netCDF4.Dataset handle
    grps : dict
        Dictionary of dataset hdf5 groups ('how', 'what', 'where')

    Returns
    -------
    full_vars : dict
        full cf-variables
    """
    full_vars = collections.OrderedDict()
    if nch.flavour == "ODIM":
        # fetch every cf-full variable from the flavour accessor of /how
        accessor = getattr(grps["how"], nch.flavour.lower())
        for key in cf_full_vars:
            full_vars[key] = getattr(accessor, key)
    if nch.flavour == "GAMIC":
        # full variable export is not implemented for GAMIC
        pass
    return full_vars
def _get_odim_fixed_angle(nch, grps):
    """Retrieve fixed angle from source data.

    Parameters
    ----------
    nch : netCDF4.Dataset handle
    grps : dict
        Dictionary of dataset hdf5 groups ('how', 'what', 'where')

    Returns
    -------
    fixed-angle : float
        fixed angle of specific scan
    """
    mode = _get_odim_sweep_mode(nch, grps)
    # PPIs are identified by their elevation, RHIs by their azimuth
    if nch.flavour == "ODIM":
        attr_name = {"azimuth_surveillance": "elangle", "rhi": "azangle"}[mode]
        fixed_angle = getattr(grps["where"], attr_name)
    if nch.flavour == "GAMIC":
        attr_name = {"azimuth_surveillance": "elevation", "rhi": "azimuth"}[mode]
        fixed_angle = grps["how"].attrs[attr_name]
    return fixed_angle
def _get_odim_root_attributes(nch, grps):
    """Assemble root attributes according CfRadial2 standard.

    Parameters
    ----------
    nch : netCDF4.Dataset handle
    grps : dict
        Dictionary of root hdf5 groups ('how', 'what', 'where')

    Returns
    -------
    attrs : dict
        Dictionary of root attributes
    """
    # start from cfradial2 placeholder defaults
    attrs = collections.OrderedDict(
        [
            ("version", "None"),
            ("title", "None"),
            ("institution", "None"),
            ("references", "None"),
            ("source", "None"),
            ("history", "None"),
            ("comment", "im/exported using wradlib"),
            ("instrument_name", "None"),
        ]
    )
    attrs["version"] = grps["what"].attrs["version"]
    if nch.flavour == "ODIM":
        source = grps["what"].attrs["source"]
        attrs["institution"] = source
        attrs["instrument"] = source
    if nch.flavour == "GAMIC":
        attrs["title"] = grps["how"].attrs["template_name"]
        attrs["instrument"] = grps["how"].attrs["host_name"]
    return attrs
def _write_odim(src, dest):
    """Writes Odim Attributes.

    Parameters
    ----------
    src : dict
        Attributes to write
    dest : handle
        h5py-group handle
    """
    for key, value in src.items():
        # attributes already present on the destination are kept
        if key in dest.attrs:
            continue
        if isinstance(value, str):
            # ODIM expects fixed-size, null-terminated C strings
            tid = h5py.h5t.C_S1.copy()
            tid.set_size(len(value) + 1)
            dest.attrs.create(key, value, dtype=h5py.Datatype(tid))
        else:
            dest.attrs[key] = value
def _write_odim_dataspace(src, dest):
"""Writes Odim Dataspaces.
Parameters
----------
src : dict
Moments to write
dest : handle
h5py-group handle
"""
keys = [key for key in src if key in ODIM_NAMES]
data_list = [f"data{i + 1}" for i in range(len(keys))]
data_idx = np.argsort(data_list)
for idx in data_idx:
value = src[keys[idx]]
h5_data = dest.create_group(data_list[idx])
enc = value.encoding
# p. 21 ff
h5_what = h5_data.create_group("what")
try:
undetect = float(value._Undetect)
except AttributeError:
undetect = np.finfo(np.float_).max
# set some defaults, if not available
scale_factor = float(enc.get("scale_factor", 1.0))
add_offset = float(enc.get("add_offset", 0.0))
_fillvalue = float(enc.get("_FillValue", undetect))
dtype = enc.get("dtype", value.dtype)
what = {
"quantity": value.name,
"gain": scale_factor,
"offset": add_offset,
"nodata": _fillvalue,
"undetect": undetect,
}
_write_odim(what, h5_what)
# moments handling
val = value.sortby("azimuth").values
fillval = _fillvalue * scale_factor
fillval += add_offset
val = np.where(np.isnan(val), fillval, val)
val = (val - add_offset) / scale_factor
if np.issubdtype(dtype, np.integer):
val = np.rint(val).astype(dtype)
ds = h5_data.create_dataset(
"data",
data=val,
compression="gzip",
compression_opts=6,
fillvalue=_fillvalue,
dtype=dtype,
)
if enc["dtype"] == "uint8":
image = "IMAGE"
version = "1.2"
tid1 = h5py.h5t.C_S1.copy()
tid1.set_size(len(image) + 1)
H5T_C_S1_IMG = h5py.Datatype(tid1)
tid2 = h5py.h5t.C_S1.copy()
tid2.set_size(len(version) + 1)
H5T_C_S1_VER = h5py.Datatype(tid2)
ds.attrs.create("CLASS", image, dtype=H5T_C_S1_IMG)
ds.attrs.create("IMAGE_VERSION", version, dtype=H5T_C_S1_VER)