#!/usr/bin/env python
# Copyright (c) 2011-2023, wradlib developers.
# Distributed under the MIT License. See LICENSE.txt for more info.
"""
HDF Data I/O
^^^^^^^^^^^^
Formerly available xarray-based code has been ported to the `xradar <https://xradar.rtfd.io>`__ package.
.. autosummary::
:nosignatures:
:toctree: generated/
{}
"""
__all__ = [
"open_gpm_dataset",
"read_generic_hdf5",
"read_opera_hdf5",
"read_gamic_hdf5",
"to_hdf5",
"from_hdf5",
"read_gpm",
"read_trmm",
]
__doc__ = __doc__.format("\n ".join(__all__))
import datetime as dt
import warnings
import numpy as np
import xarray as xr
from packaging.version import Version
from wradlib.util import import_optional
h5py = import_optional("h5py")
h5netcdf = import_optional("h5netcdf")
nc = import_optional("netCDF4")
def read_generic_hdf5(fname):
"""Reads hdf5 files according to their structure
    In contrast to other file readers under :mod:`wradlib.io`, this function
will *not* return a two item tuple with (data, metadata). Instead, this
function returns ONE dictionary that contains all the file contents - both
data and metadata. The keys of the output dictionary conform to the
Group/Subgroup directory branches of the original file.
Parameters
----------
fname : str or file-like
        an hdf5 file path or file-like object
Returns
-------
output : dict
a dictionary that contains both data and metadata according to the
original hdf5 file structure
Examples
--------
See :ref:`/notebooks/fileio/legacy/read_hdf5.ipynb`.
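    A minimal usage sketch (``data.h5`` is a hypothetical file name)::
        >>> import wradlib  # doctest: +SKIP
        >>> fcontent = wradlib.io.read_generic_hdf5("data.h5")  # doctest: +SKIP
        >>> for key, value in fcontent.items():  # doctest: +SKIP
        ...     print(key, sorted(value))  # keys mirror the hdf5 group paths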
"""
fcontent = {}
def filldict(x, y):
# create a new container
tmp = {}
# add attributes if present
if len(y.attrs) > 0:
tmp["attrs"] = dict(y.attrs)
# add data if it is a dataset
if isinstance(y, h5py.Dataset):
# silence h5py/numpy warning
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
category=DeprecationWarning,
message="`product` is deprecated",
)
tmp["data"] = np.array(y)
# only add to the dictionary, if we have something meaningful to add
if tmp != {}:
fcontent[x] = tmp
with h5py.File(fname, "r") as f:
f.visititems(filldict)
return fcontent
def read_opera_hdf5(fname):
"""Reads hdf5 files according to OPERA conventions
    Please refer to the OPERA data model documentation :cite:`OPERA-data-model`
    to understand how an hdf5 file that conforms to the OPERA ODIM_H5
    conventions is organized.
    In contrast to other file readers under :mod:`wradlib.io`, this function
will *not* return a two item tuple with (data, metadata). Instead, this
function returns ONE dictionary that contains all the file contents - both
data and metadata. The keys of the output dictionary conform to the
Group/Subgroup directory branches of the original file.
    If the end member of a branch (or path) is "data", then the corresponding
    item of the output dictionary is a numpy array with the actual data.
    Any other end member (*how*, *where*, or *what*) will contain the
    metadata applying to the corresponding level of the file hierarchy.
Parameters
----------
fname : str or file-like
        an hdf5 file path or file-like object
Returns
-------
output : dict
a dictionary that contains both data and metadata according to the
original hdf5 file structure
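    Examples
    --------
    A minimal usage sketch (``odim.h5`` is a hypothetical file name; the key
    layout follows the ODIM_H5 convention)::
        >>> fcontent = wradlib.io.read_opera_hdf5("odim.h5")  # doctest: +SKIP
        >>> data = fcontent["dataset1/data1/data"]  # doctest: +SKIP
        >>> what = fcontent["dataset1/what"]  # doctest: +SKIP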
"""
# now we browse through all Groups and Datasets
# and store the info in one dictionary
fcontent = {}
def filldict(x, y):
if isinstance(y, h5py.Group):
if len(y.attrs) > 0:
fcontent[x] = dict(y.attrs)
elif isinstance(y, h5py.Dataset):
# silence h5py/numpy warning
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
category=DeprecationWarning,
message="`product` is deprecated",
)
fcontent[x] = np.array(y)
with h5py.File(fname, "r") as f:
f.visititems(filldict)
return fcontent
def read_gamic_scan_attributes(scan, scan_type):
"""Read attributes from one particular scan from a GAMIC hdf5 file
Parameters
----------
scan : object
scan object from hdf5 file
scan_type : str
"PVOL" (plan position indicator) or "RHI" (range height indicator)
Returns
-------
sattrs : dict
dictionary of scan attributes
"""
# placeholder for attributes
sattrs = {}
# link to scans 'how' hdf5 group
sg1 = scan["how"]
# get scan attributes
for attrname in list(sg1.attrs):
sattrs[attrname] = sg1.attrs.get(attrname)
sattrs["bin_range"] = sattrs["range_step"] * sattrs["range_samples"]
# get scan header
ray_header = scan["ray_header"]
# az, el, zero_index for PPI scans
if scan_type == "PVOL":
azi_start = ray_header["azimuth_start"]
azi_stop = ray_header["azimuth_stop"]
# Azimuth corresponding to 1st ray
zero_index = np.where(azi_stop < azi_start)
azi_stop[zero_index[0]] += 360
zero_index = zero_index[0] + 1
az = (azi_start + azi_stop) / 2
az = np.roll(az, -zero_index, axis=0)
az = np.round(az, 1)
el = sg1.attrs.get("elevation")
# az, el, zero_index for RHI scans
if scan_type == "RHI":
ele_start = np.round(ray_header["elevation_start"], 1)
ele_stop = np.round(ray_header["elevation_stop"], 1)
angle_step = np.round(sattrs["angle_step"], 1)
angle_step = int(np.round(sattrs["ele_stop"], 1) / angle_step)
# Elevation corresponding to 1st ray
if ele_start[0] < 0:
ele_start = ele_start[1:]
ele_stop = ele_stop[1:]
zero_index = np.where(ele_stop > ele_start)
zero_index = zero_index[0] # - 1
el = (ele_start + ele_stop) / 2
el = np.round(el, 1)
el = el[-angle_step:]
az = sg1.attrs.get("azimuth")
# save zero_index (first ray) to scan attributes
sattrs["zero_index"] = zero_index[0]
# create range array
r = np.arange(
sattrs["bin_range"],
sattrs["bin_range"] * sattrs["bin_count"] + sattrs["bin_range"],
sattrs["bin_range"],
)
# save variables to scan attributes
sattrs["az"] = az
sattrs["el"] = el
sattrs["r"] = r
sattrs["Time"] = sattrs.pop("timestamp")
sattrs["max_range"] = r[-1]
return sattrs
def read_gamic_scan(scan, scan_type, wanted_moments):
"""Read data from one particular scan from GAMIC hdf5 file
Parameters
----------
scan : object
scan object from hdf5 file
scan_type : str
"PVOL" (plan position indicator) or "RHI" (range height indicator)
wanted_moments : sequence
sequence of strings containing upper case names of moment(s) to
be returned
Returns
-------
data : dict
dictionary of moment data (numpy arrays)
sattrs : dict
dictionary of scan attributes
"""
# placeholder for data and attrs
data = {}
sattrs = {}
# try to read wanted moments
for mom in list(scan):
if "moment" in mom:
data1 = {}
sg2 = scan[mom]
actual_moment = sg2.attrs.get("moment")
if Version(h5py.__version__) < Version("3.0.0"):
actual_moment = actual_moment.decode()
actual_moment = actual_moment.upper()
if (actual_moment in wanted_moments) or (wanted_moments == "all"):
# read attributes only once
if not sattrs:
sattrs = read_gamic_scan_attributes(scan, scan_type)
mdata = sg2[...]
dyn_range_max = sg2.attrs.get("dyn_range_max")
dyn_range_min = sg2.attrs.get("dyn_range_min")
bin_format = sg2.attrs.get("format")
if Version(h5py.__version__) < Version("3.0.0"):
bin_format = bin_format.decode()
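                    # decode the fixed-point data: raw values 1..255 ("UV8",
                    # 8 bit) or 1..65535 (16 bit) map linearly onto
                    # [dyn_range_min, dyn_range_max]; raw 0 is typically
                    # reserved for no data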
if bin_format == "UV8":
div = 254
else:
div = 65534
mdata = (
dyn_range_min + (mdata - 1) * (dyn_range_max - dyn_range_min) / div
)
if scan_type == "PVOL":
# rotate accordingly
mdata = np.roll(mdata, -1 * sattrs["zero_index"], axis=0)
if scan_type == "RHI":
# remove first zero angles
sdiff = mdata.shape[0] - sattrs["el"].shape[0]
mdata = mdata[sdiff:, :]
data1["data"] = mdata
data1["dyn_range_max"] = dyn_range_max
data1["dyn_range_min"] = dyn_range_min
data[actual_moment] = data1
return data, sattrs
def read_gamic_hdf5(filename, *, wanted_elevations=None, wanted_moments=None):
"""Data reader for hdf5 files produced by the commercial \
GAMIC Enigma V3 MURAN software
See GAMIC homepage for further info (https://www.gamic.com).
Parameters
----------
filename : str or file-like
path of the gamic hdf5 file or file-like object
wanted_elevations : sequence
sequence of strings of elevation_angle(s) of scan (only needed for PPI)
wanted_moments : sequence
sequence of strings of moment name(s)
Returns
-------
data : dict
dictionary of scan and moment data (numpy arrays)
attrs : dict
dictionary of attributes
Examples
--------
See :ref:`/notebooks/fileio/legacy/read_gamic.ipynb`.
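    A minimal usage sketch (file name, elevation and moment are hypothetical)::
        >>> data, attrs = wradlib.io.read_gamic_hdf5(
        ...     "gamic.h5", wanted_elevations=["0.5"], wanted_moments=["ZH"]
        ... )  # doctest: +SKIP
        >>> zh = data["SCAN0"]["ZH"]["data"]  # doctest: +SKIP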
"""
# check elevations
if wanted_elevations is None:
wanted_elevations = "all"
# check wanted_moments
if wanted_moments is None:
wanted_moments = "all"
# read the data from file
with h5py.File(filename, "r") as f:
# placeholder for attributes and data
attrs = {}
vattrs = {}
data = {}
        # check if this is a GAMIC file
try:
f["how"].attrs.get("software")
except KeyError as err:
raise OSError("File {filename} is no GAMIC hdf5 type!") from err
# get scan_type (PVOL or RHI)
scan_type = f["what"].attrs.get("object")
if Version(h5py.__version__) < Version("3.0.0"):
scan_type = scan_type.decode()
# single or volume scan
if scan_type == "PVOL":
# loop over 'main' hdf5 groups (how, scanX, what, where)
for n in list(f):
if "scan" in n:
g = f[n]
sg1 = g["how"]
# get scan elevation
el = sg1.attrs.get("elevation")
el = str(round(el, 2))
# try to read scan data and attrs
# if wanted_elevations are found
if (el in wanted_elevations) or (wanted_elevations == "all"):
sdata, sattrs = read_gamic_scan(
scan=g, scan_type=scan_type, wanted_moments=wanted_moments
) # noqa
if sdata:
data[n.upper()] = sdata
if sattrs:
attrs[n.upper()] = sattrs
# single rhi scan
elif scan_type == "RHI":
# loop over 'main' hdf5 groups (how, scanX, what, where)
for n in list(f):
if "scan" in n:
g = f[n]
# try to read scan data and attrs
sdata, sattrs = read_gamic_scan(
scan=g, scan_type=scan_type, wanted_moments=wanted_moments
)
if sdata:
data[n.upper()] = sdata
if sattrs:
attrs[n.upper()] = sattrs
# collect volume attributes if wanted data is available
if data:
vattrs["Latitude"] = f["where"].attrs.get("lat")
vattrs["Longitude"] = f["where"].attrs.get("lon")
vattrs["Height"] = f["where"].attrs.get("height")
attrs["VOL"] = vattrs
return data, attrs
def to_hdf5(
fpath, data, *, mode="w", metadata=None, dataset="data", compression="gzip"
):
"""Quick storage of one <data> array and a <metadata> dict in an hdf5 file
    This is more efficient than pickle or numpy.save. The data is stored in
    an hdf5 dataset named by the `dataset` argument (default `"data"`,
    i.e. file["data"]).
See :func:`~wradlib.io.hdf.from_hdf5` for retrieving stored data.
Parameters
----------
fpath : str or file-like
path to the hdf5 file or file-like object
data : :py:class:`numpy:numpy.ndarray`
mode : str
file open mode, defaults to "w" (create, truncate if exists)
metadata : dict
dictionary of data's attributes
    dataset : str
        name of the dataset to create, defaults to "data"
compression : str
h5py-compression type {"gzip"|"szip"|"lzf"}, see h5py-documentation
for details
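    Examples
    --------
    A minimal usage sketch (array contents and file name are hypothetical)::
        >>> import numpy as np  # doctest: +SKIP
        >>> arr = np.zeros((360, 500))  # doctest: +SKIP
        >>> wradlib.io.to_hdf5("out.h5", arr, metadata={"unit": "dBZ"})  # doctest: +SKIP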
"""
with h5py.File(fpath, mode=mode) as f:
dset = f.create_dataset(dataset, data=data, compression=compression)
# store metadata
if metadata:
for key in metadata.keys():
dset.attrs[key] = metadata[key]
def from_hdf5(fpath, *, dataset="data"):
"""Loading data from hdf5 files that was stored by \
:func:`~wradlib.io.hdf.to_hdf5`
Parameters
----------
fpath : str or file-like
path to the hdf5 file or file-like object
dataset : str
name of the Dataset in which the data is stored
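    Returns
    -------
    data : :py:class:`numpy:numpy.ndarray`
        array of data
    metadata : dict
        dictionary of the dataset's attributes
    Examples
    --------
    A minimal usage sketch, continuing the hypothetical ``to_hdf5`` example
    above::
        >>> data, metadata = wradlib.io.from_hdf5("out.h5")  # doctest: +SKIP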
"""
with h5py.File(fpath, mode="r") as f:
# Check whether Dataset exists
if dataset not in f.keys():
raise KeyError(f"Cannot read Dataset {dataset!r} from hdf5 file {f!r}")
data = np.array(f[dataset][:])
# get metadata
metadata = {}
for key in f[dataset].attrs.keys():
metadata[key] = f[dataset].attrs[key]
return data, metadata
def read_gpm(filename, *, bbox=None):
"""Reads GPM files for matching with GR
Parameters
----------
filename : str
path of the GPM file
bbox : dict
dictionary with bounding box coordinates (lon, lat),
defaults to None
Returns
-------
gpm_data : dict
dictionary of gpm data
Examples
--------
See :ref:`/notebooks/workflow/recipe3.ipynb`.
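    A minimal usage sketch (file name and coordinates are hypothetical; the
    `bbox` keys are `left`, `right`, `bottom` and `top`; `refl` has shape
    (nscan, nray, nbin))::
        >>> bbox = dict(left=4.0, right=16.0, bottom=46.0, top=56.0)  # doctest: +SKIP
        >>> gpm = wradlib.io.read_gpm("2A.GPM.DPR.HDF5", bbox=bbox)  # doctest: +SKIP
        >>> refl = gpm["refl"]  # doctest: +SKIP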
"""
pr_data = nc.Dataset(filename, mode="r")
lon = pr_data["NS"].variables["Longitude"]
lat = pr_data["NS"].variables["Latitude"]
if bbox is not None:
poly = [
[bbox["left"], bbox["bottom"]],
[bbox["left"], bbox["top"]],
[bbox["right"], bbox["top"]],
[bbox["right"], bbox["bottom"]],
[bbox["left"], bbox["bottom"]],
]
from wradlib.zonalstats import get_clip_mask
mask = get_clip_mask(np.dstack((lon[:], lat[:])), poly)
else:
mask = np.ones_like(lon, dtype=bool, subok=False)
mask = np.nonzero(np.count_nonzero(mask, axis=1))
lon = lon[mask]
lat = lat[mask]
year = pr_data["NS"]["ScanTime"].variables["Year"][mask]
month = pr_data["NS"]["ScanTime"].variables["Month"][mask]
dayofmonth = pr_data["NS"]["ScanTime"].variables["DayOfMonth"][mask]
hour = pr_data["NS"]["ScanTime"].variables["Hour"][mask]
minute = pr_data["NS"]["ScanTime"].variables["Minute"][mask]
second = pr_data["NS"]["ScanTime"].variables["Second"][mask]
millisecond = pr_data["NS"]["ScanTime"].variables["MilliSecond"][mask]
date_array = zip(
year,
month,
dayofmonth,
hour,
minute,
second,
millisecond.astype(np.int32) * 1000,
)
pr_time = np.array(
[dt.datetime(d[0], d[1], d[2], d[3], d[4], d[5], d[6]) for d in date_array]
)
sfc = pr_data["NS"]["PRE"].variables["landSurfaceType"][mask]
pflag = pr_data["NS"]["PRE"].variables["flagPrecip"][mask]
zbb = pr_data["NS"]["CSF"].variables["heightBB"][mask]
bbwidth = pr_data["NS"]["CSF"].variables["widthBB"][mask]
qbb = pr_data["NS"]["CSF"].variables["qualityBB"][mask]
qtype = pr_data["NS"]["CSF"].variables["qualityTypePrecip"][mask]
ptype = pr_data["NS"]["CSF"].variables["typePrecip"][mask]
quality = pr_data["NS"]["scanStatus"].variables["dataQuality"][mask]
refl = pr_data["NS"]["SLV"].variables["zFactorCorrected"][mask]
zenith = pr_data["NS"]["PRE"].variables["localZenithAngle"][mask]
pr_data.close()
# Check for bad data
if max(quality) != 0:
raise ValueError("GPM contains Bad Data")
pflag = pflag.astype(np.int8)
# Determine the dimensions
ndim = refl.ndim
if ndim != 3:
raise ValueError(f"GPM Dimensions do not match! Needed 3, given {ndim}")
tmp = refl.shape
nscan = tmp[0]
nray = tmp[1]
nbin = tmp[2]
# Reverse direction along the beam
refl = np.flip(refl, axis=-1)
# Change pflag=1 to pflag=2 to be consistent with 'Rain certain' in TRMM
pflag[pflag == 1] = 2
# Simplify the precipitation types
ptype = (ptype / 1e7).astype(np.int16)
# Simplify the surface types
imiss = sfc == -9999
sfc = (sfc / 1e2).astype(np.int16) + 1
sfc[imiss] = 0
# Set a quality indicator for the BB and precip type data
# TODO: Why is the `quality` variable overwritten?
quality = np.zeros((nscan, nray), dtype=np.uint8)
i1 = ((qbb == 0) | (qbb == 1)) & (qtype == 1)
quality[i1] = 1
i2 = (qbb > 1) | (qtype > 2)
quality[i2] = 2
gpm_data = {}
gpm_data.update(
{
"nscan": nscan,
"nray": nray,
"nbin": nbin,
"date": pr_time,
"lon": lon,
"lat": lat,
"pflag": pflag,
"ptype": ptype,
"zbb": zbb,
"bbwidth": bbwidth,
"sfc": sfc,
"quality": quality,
"refl": refl,
"zenith": zenith,
}
)
return gpm_data
def _get_gpm_group(filename, group, *, variables=None):
"""Return group as xarrax.Dataset from GPM file."""
ds = xr.open_dataset(
filename,
group=group,
decode_cf=False,
engine="h5netcdf",
backend_kwargs=dict(phony_dims="sort"),
)
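    # h5netcdf assigns phony dimension names; GPM files record the real names
    # in each variable's "DimensionNames" attribute, so swap them in where
    # available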
for n, v in ds.items():
dimnames = v.attrs.get("DimensionNames", False)
if dimnames:
vdims = v.attrs["DimensionNames"].split(",")
dims = {dim: vdims[i] for i, dim in enumerate(v.dims)}
ds[n] = v.swap_dims(dims)
if variables is not None:
keep = set(variables)
ds = ds.drop_vars(set(ds.variables) ^ keep)
if isinstance(variables, dict):
ds = ds.rename(variables)
return ds
def _get_gpm_time_group(filename, group):
"""Return time subgroup as xarrax.Dataset from GPM file."""
variables = [
"Year",
"Month",
"DayOfMonth",
"Hour",
"Minute",
"Second",
"MilliSecond",
]
ds = _get_gpm_group(filename, group=group, variables=variables)
date_array = zip(
ds.Year.values,
ds.Month.values,
ds.DayOfMonth.values,
ds.Hour.values,
ds.Minute.values,
ds.Second.values,
ds.MilliSecond.values,
)
pr_time = np.array(
[dt.datetime(d[0], d[1], d[2], d[3], d[4], d[5], d[6]) for d in date_array]
)
ds = ds.assign_coords({"date": (["nscan"], pr_time)})
ds = ds.drop_vars(set(ds.variables) ^ set(["date"]))
return ds
def open_gpm_dataset(filename, group):
"""Reads GPM files version `V07A`.
Parameters
----------
filename : str
path of the GPM file
group : str
name of group
Returns
-------
ds : xarray.Dataset
xarray.Dataset representation of GPM file with requested `group`.
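    Examples
    --------
    A minimal usage sketch (the file name is hypothetical; ``FS`` is the
    full-swath group of ``V07A`` files)::
        >>> ds = wradlib.io.open_gpm_dataset("2A.GPM.DPR.V07A.HDF5", group="FS")  # doctest: +SKIP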
"""
parents = group.split("/")[:-1]
with h5netcdf.File(filename, mode="r", decode_vlen_strings=True) as f:
grps = list(f[group].groups)
root = _get_gpm_group(filename, group="/")
root = root.rename_dims({list(root.dims)[0]: "nswath"})
groups = {"root": root}
if parents:
for p in parents:
pds = _get_gpm_group(filename, group=f"/{p}")
groups[p] = pds
subroot = _get_gpm_group(filename, group=f"/{group}")
groups["subroot"] = subroot
for grp in grps:
gname = "/".join(["/" + group, grp])
if grp == "ScanTime":
groups[grp] = _get_gpm_time_group(filename, group=gname)
else:
groups[grp] = _get_gpm_group(filename, group=gname)
ds = xr.merge(groups.values())
return ds
def read_trmm(filename1, filename2, *, bbox=None):
"""Reads TRMM files for matching with GR
Parameters
----------
filename1 : str
path of the TRMM 2A23 file
filename2 : str
path of the TRMM 2A25 file
bbox : dict
dictionary with bounding box coordinates (lon, lat),
defaults to None
Returns
-------
trmm_data : dict
dictionary of trmm data
Examples
--------
See :ref:`/notebooks/workflow/recipe3.ipynb`.
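    A minimal usage sketch (file names and bounding box are hypothetical;
    `refl` has shape (nscan, nray, nbin))::
        >>> trmm = wradlib.io.read_trmm(
        ...     "2A23.HDF", "2A25.HDF",
        ...     bbox=dict(left=4.0, right=16.0, bottom=46.0, top=56.0),
        ... )  # doctest: +SKIP
        >>> refl = trmm["refl"]  # doctest: +SKIP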
"""
# trmm 2A23 and 2A25 data is hdf4
pr_data1 = nc.Dataset(filename1, mode="r")
pr_data2 = nc.Dataset(filename2, mode="r")
lon = pr_data1.variables["Longitude"]
lat = pr_data1.variables["Latitude"]
if bbox is not None:
poly = [
[bbox["left"], bbox["bottom"]],
[bbox["left"], bbox["top"]],
[bbox["right"], bbox["top"]],
[bbox["right"], bbox["bottom"]],
[bbox["left"], bbox["bottom"]],
]
from wradlib.zonalstats import get_clip_mask
mask = get_clip_mask(np.dstack((lon[:], lat[:])), poly)
else:
mask = np.ones_like(lon, dtype=bool)
mask = np.nonzero(np.count_nonzero(mask, axis=1))
lon = pr_data1.variables["Longitude"][mask]
lat = pr_data1.variables["Latitude"][mask]
year = pr_data1.variables["Year"][mask]
month = pr_data1.variables["Month"][mask]
dayofmonth = pr_data1.variables["DayOfMonth"][mask]
hour = pr_data1.variables["Hour"][mask]
minute = pr_data1.variables["Minute"][mask]
second = pr_data1.variables["Second"][mask]
millisecond = pr_data1.variables["MilliSecond"][mask]
date_array = zip(
year,
month,
dayofmonth,
hour,
minute,
second,
millisecond.astype(np.int32) * 1000,
)
pr_time = np.array(
[dt.datetime(d[0], d[1], d[2], d[3], d[4], d[5], d[6]) for d in date_array]
)
pflag = pr_data1.variables["rainFlag"][mask]
ptype = pr_data1.variables["rainType"][mask]
status = pr_data1.variables["status"][mask]
zbb = pr_data1.variables["HBB"][mask].astype(np.float32)
bbwidth = pr_data1.variables["BBwidth"][mask].astype(np.float32)
quality = pr_data2.variables["dataQuality"][mask]
    # raw integer-scaled reflectivity; sentinel masking and scaling below
    refl = pr_data2.variables["correctZFactor"][mask]
zenith = pr_data2.variables["scLocalZenith"][mask]
pr_data1.close()
pr_data2.close()
# mask array
refl = np.ma.array(refl)
# Ground clutter
refl[refl == -8888.0] = np.ma.masked
    # Missing data
refl[refl == -9999.0] = np.ma.masked
# Scaling
refl /= 100.0
# Check for bad data
if max(quality) != 0:
raise ValueError("TRMM contains Bad Data")
# Determine the dimensions
ndim = refl.ndim
if ndim != 3:
raise ValueError(f"TRMM Dimensions do not match! Needed 3, given {ndim}")
tmp = refl.shape
nscan = tmp[0]
nray = tmp[1]
nbin = tmp[2]
# Reverse direction along the beam
refl = np.flip(refl, axis=-1)
# Simplify the precipitation flag
ipos = (pflag >= 10) & (pflag <= 20)
icer = pflag >= 20
pflag[ipos] = 1
pflag[icer] = 2
# Simplify the precipitation types
istr = (ptype >= 100) & (ptype <= 200)
icon = (ptype >= 200) & (ptype <= 300)
ioth = ptype >= 300
inone = ptype == -88
imiss = ptype == -99
ptype[istr] = 1
ptype[icon] = 2
ptype[ioth] = 3
ptype[inone] = 0
ptype[imiss] = -1
# Extract the surface type
sfc = np.zeros((nscan, nray), dtype=np.uint8)
i0 = status == 168
sfc[i0] = 0
i1 = status % 10 == 0
sfc[i1] = 1
i2 = (status - 1) % 10 == 0
sfc[i2] = 2
i3 = (status - 3) % 10 == 0
sfc[i3] = 3
i4 = (status - 4) % 10 == 0
sfc[i4] = 4
i5 = (status - 5) % 10 == 0
sfc[i5] = 5
i9 = (status - 9) % 10 == 0
sfc[i9] = 9
# Extract 2A23 quality
# TODO: Why is the `quality` variable overwritten?
quality = np.zeros((nscan, nray), dtype=np.uint8)
i0 = status == 168
quality[i0] = 0
i1 = status < 50
quality[i1] = 1
i2 = (status >= 50) & (status < 109)
quality[i2] = 2
trmm_data = {}
trmm_data.update(
{
"nscan": nscan,
"nray": nray,
"nbin": nbin,
"date": pr_time,
"lon": lon,
"lat": lat,
"pflag": pflag,
"ptype": ptype,
"zbb": zbb,
"bbwidth": bbwidth,
"sfc": sfc,
"quality": quality,
"refl": refl,
"zenith": zenith,
}
)
return trmm_data