Source code for wradlib.atten

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Copyright (c) 2011-2023, wradlib developers.
# Distributed under the MIT License. See LICENSE.txt for more info.

"""
Attenuation Correction
^^^^^^^^^^^^^^^^^^^^^^

.. autosummary::
   :nosignatures:
   :toctree: generated/

    {}
"""
__all__ = [
    "AttenuationOverflowError",
    "correct_attenuation_hb",
    "constraint_dbz",
    "constraint_pia",
    "correct_attenuation_constrained",
    "correct_radome_attenuation_empirical",
    "pia_from_kdp",
    "AttenMethods",
]
__doc__ = __doc__.format("\n   ".join(__all__))

import logging
from functools import singledispatch

import numpy as np
from scipy import interpolate, ndimage
from xarray import DataArray, apply_ufunc

from wradlib import trafo, zr
from wradlib.util import XarrayMethods, docstring

logger = logging.getLogger("attcorr")


[docs] class AttenuationOverflowError(Exception): """Attenuation Overflow Error Exception, if attenuation exceeds ``thrs`` and no handling ``mode`` is set. """
[docs] def correct_attenuation_hb( gateset, *, coefficients=None, mode="except", thrs=59.0, ): """Gate-by-Gate attenuation correction according to \ :cite:`Hitschfeld1954` Parameters ---------- gateset : :py:class:`numpy:numpy.ndarray` multidimensional array. The range gates (over which iteration has to be performed) are supposed to vary along the *last* dimension so, e.g., for a set of `l` radar images stored in polar form with `m` azimuths and `n` range-bins the input array's shape can be either (l,m,n) or (m,l,n) data has to be provided in decibel representation of reflectivity [dBZ] coefficients : dict - a : float proportionality factor of the k-Z relation (:math:`k=a \\cdot Z^{b}`). Per default set to 1.67e-4. - b : float exponent of the k-Z relation ( :math:`k=a \\cdot Z^{b}` ). Per default set to 0.7. - gate_length : float length of a range gate [km]. Per default set to 1.0. mode : str controls how the function reacts, if the sum of signal and attenuation exceeds the threshold ``thrs`` Possible values: - 'warn' : emit a warning through the module's logger but continue execution - 'zero' : set offending gates to 0.0 - 'nan' : set offending gates to nan - 'except': raise an AttenuationOverflowError exception Any other mode will also raise the Exception. thrs : float threshold, for the sum of attenuation and signal, which is considered implausible. Returns ------- pia : :py:class:`numpy:numpy.ndarray` Array with the same shape as ``gateset`` containing the calculated attenuation [dB] for each range gate. Raises ------ :exc:`wradlib.atten.AttenuationOverflowError` Examples -------- See :ref:`/notebooks/attenuation/attenuation.ipynb#Hitschfeld-and-Bordan`. """ if coefficients is None: coefficients = {"a": 0.000167, "b": 0.7, "gate_length": 1.0} a = coefficients["a"] b = coefficients["b"] gate_length = coefficients["gate_length"] pia = np.empty(gateset.shape) pia[..., 0] = 0.0 ksum = 0.0 # multidimensional version # assumes that iteration is only along the last dimension # (i.e. range gates) all other dimensions are calculated simultaneously # to gain some speed for gate in range(gateset.shape[-1] - 1): # calculate k in dB/km from k-Z relation # c.f. Krämer2008(p. 147) k = a * (10.0 ** ((gateset[..., gate] + ksum) / 10.0)) ** b * 2.0 * gate_length # k = 10**(log10(a)+0.1*bin*b) # dBkn = 10*math.log10(a) + (bin+ksum)*b + 10*math.log10(2*gate_length) ksum += k pia[..., gate + 1] = ksum # stop-criterion, if corrected reflectivity is larger than 59 dBZ overflow = (gateset[..., gate + 1] + ksum) > thrs if np.any(overflow): if mode == "warn": logger.warning(f"corrected signal over threshold ({thrs:3.1f})") elif mode == "nan": pia[..., gate + 1][overflow] = np.nan elif mode == "zero": pia[..., gate + 1][overflow] = 0.0 else: raise AttenuationOverflowError return pia
[docs] def constraint_dbz(gateset, pia, thrs_dbz): """Constraint callback function for correct_attenuation_constrained. Selects beams, in which at least one pixel exceeds ``thrs_dbz`` [dBZ]. """ return np.max(gateset + pia, axis=-1) > thrs_dbz
[docs] def constraint_pia(gateset, pia, thrs_pia): """Constraint callback function for correct_attenuation_constrained. Selects beams, in which the path integrated attenuation exceeds ``thrs_pia``. """ return np.max(pia, axis=-1) > thrs_pia
def calc_attenuation_forward(gateset, *, a=1.67e-4, b=0.7, gate_length=1.0): """Gate-by-Gate forward correction as described in :cite:`Kraemer2008` Parameters ---------- gateset : :class:`numpy:numpy.ndarray` Multidimensional array, where the range gates (over which iteration has to be performed) are supposed to vary along the last array-dimension. Data has to be provided in decibel representation of reflectivity [dBZ]. a : float proportionality factor of the k-Z relation (:math:`k=a \\cdot Z^{b}`). Per default set to 1.67e-4. b : float exponent of the k-Z relation ( :math:`k=a \\cdot Z^{b}` ). Per default set to 0.7. gate_length : float length of a range gate [km]. Per default set to 1.0. Returns ------- pia : :class:`numpy:numpy.ndarray` Array with the same shape as ``gateset`` containing the calculated path integrated attenuation [dB] for each range gate. """ pia = np.zeros(gateset.shape) for gate in range(gateset.shape[-1] - 1): k = ( a * trafo.idecibel(gateset[..., gate] + pia[..., gate]) ** b * 2.0 * gate_length ) pia[..., gate + 1] = pia[..., gate] + k return pia def bisect_reference_attenuation( gateset, pia_ref, *, a_max=1.67e-4, a_min=2.33e-5, b_start=0.7, gate_length=1.0, mode="difference", thrs=0.25, max_iterations=10, ): """Find the optimal attenuation coefficients for a gateset to achieve a \ given reference attenuation using the forward correction algorithm in \ combination with the bisection method. Parameters ---------- gateset : :class:`numpy:numpy.ndarray` Multidimensional array, where the range gates (over which iteration has to be performed) are supposed to vary along the last array-dimension. Data has to be provided in decibel representation of reflectivity [dBZ]. pia_ref : :class:`numpy:numpy.ndarray` Array of the same number of dimensions as ``gateset``, but the size of the last dimension is 1, as it constitutes the reference pia [dB] of the last range gate of every beam. a_max : float Upper bound of the bisection interval within the linear coefficient ``a`` of the k-Z relation has to be. ( :math:`k=a \\cdot Z^{b}` ). Per default set to 1.67e-4. a_min : float Lower bound of the bisection interval within the linear coefficient ``a`` of the k-Z relation has to be. ( :math:`k=a \\cdot Z^{b}` ). Per default set to 2.33e-5. b_start : float Initial value for exponential coefficient of the k-Z relation ( :math:`k=a \\cdot Z^{b}` ). This value will be lowered incremental by 0.01 if no solution was found within the bisection interval of ``a_max`` and ``a_min`` within the number of given iterations ``max_iterations``. Per default set to 0.7. gate_length : float Radial length of a range gate [km]. Per default set to 1.0. mode : str {‘ratio’ or ‘difference’} Kind of tolerance of calculated pia in relation to reference pia. Per default set to 'difference'. thrs : float Value of the tolerance to stop bisection iteration successful. It is recommended to choose 0.05 for ratio ``mode`` and 0.25 for difference ``mode``, which means a deviation tolerance of 5% or 0.25 dB, respectively. Per default set to 0.25. max_iterations : int Number of bisection iteration before the exponential coefficient b of the k-Z relation will be decreased and the bisection starts again. Per default set to 10. Returns ------- pia : :class:`numpy:numpy.ndarray` Array with the same shape as ``gateset`` containing the calculated path integrated attenuation [dB] for each range gate. a_mid : :class:`numpy:numpy.ndarray` Array with the same shape as ``pia_ref`` containing the finally used linear k-Z relation coefficient a for successful pia calculation. b : :class:`numpy:numpy.ndarray` Array with the same shape as ``pia_ref`` containing the finally used exponential k-Z relation coefficient b for successful pia calculation. """ # Prepare arrays of initial k-Z relation coefficients for each beam. a_hi = np.ones(pia_ref.shape) * a_max # np.repeat(a_max, pia_ref.shape) a_lo = np.ones(pia_ref.shape) * a_min # np.repeat(a_min, pia_ref.shape) b = np.ones(pia_ref.shape) * b_start # np.repeat(b_start, pia_ref.shape) pia = np.empty_like(gateset) iteration_count = 0 # Iterate until upper and lower bounds of linear k-Z relation coefficients # for pia calculation are the same. while not np.all(a_hi == a_lo): a_mid = (a_hi + a_lo) / 2 pia = calc_attenuation_forward(gateset, a=a_mid, b=b, gate_length=gate_length) # Find indices where calculated and reference pia sufficiently match if mode == "difference": overshoot = (pia[..., -1] - pia_ref) > thrs undershoot = (pia[..., -1] - pia_ref) < -thrs hit = (np.abs(pia[..., -1] - pia_ref)) < thrs elif mode == "ratio": overshoot = ((pia[..., -1] - pia_ref) / pia_ref) > thrs undershoot = ((pia[..., -1] - pia_ref) / pia_ref) < -thrs hit = (np.abs(pia[..., -1] - pia_ref) / pia_ref) < thrs else: raise Exception(f"Unknown mode type {mode}.") # Define new bounds of linear k-Z relation coefficient for over- and # undershooting pia calculations. a_hi[overshoot] = a_mid[overshoot] a_lo[undershoot] = a_mid[undershoot] a_hi[hit] = a_mid[hit] a_lo[hit] = a_mid[hit] iteration_count += 1 # Change exponential k-Z relation coefficient in case of maximum # iterations for linear k-Z relation coefficient are reached. if iteration_count > max_iterations: b[overshoot] -= 0.01 b[undershoot] += 0.01 return pia, a_mid, b def _sector_filter(mask, min_sector_size): """Calculate an array of same shape as mask, which is set to 1 in case of \ at least min_sector_size adjacent values, otherwise it is set to 0. """ kernela = np.ones([1] * (mask.ndim - 1) + [min_sector_size]) kernelb = np.ones((min_sector_size,)) forward_origin = -(min_sector_size - (min_sector_size // 2)) + min_sector_size % 2 backward_origin = (min_sector_size - (min_sector_size // 2)) - 1 forward_sum = ndimage.correlate1d( mask.astype(np.int_), kernelb, axis=-1, mode="wrap", origin=forward_origin ) backward_sum = ndimage.correlate1d( mask.astype(np.int_), kernelb, axis=-1, mode="wrap", origin=backward_origin ) forward_corners = forward_sum == min_sector_size backward_corners = backward_sum == min_sector_size forward_large_sectors = np.zeros_like(mask) backward_large_sectors = np.zeros_like(mask) for iii in range(mask.shape[0]): forward_large_sectors[iii] = ndimage.binary_dilation( forward_corners[iii], kernela[0], origin=forward_origin ).astype(int) backward_large_sectors[iii] = ndimage.binary_dilation( backward_corners[iii], kernela[0], origin=backward_origin ).astype(int) return forward_large_sectors | backward_large_sectors def _interp_atten(pia, invalidbeams): """Interpolate reference pia of most distant rangebin of small invalid sectors as a prerequisite for the backward calculation of attenuation. """ # Build a spatial equidistant array for interpolation of the ahead and # behind extended temporary pia-array for handling invalid sectors # overlapping the seam of the radarcircle. x = np.arange(3 * pia.shape[1]) for i in range(pia.shape[0]): sub_invalid = invalidbeams[i, :] sub_pia = pia[i, :, -1] # Build the extended bool-array with the invalid sectors. extended_invalid = np.concatenate([sub_invalid] * 3) # Build the extended pia-array. extended_pia = np.concatenate([sub_pia] * 3) # Build interpolation class. intp = interpolate.interp1d( x[~extended_invalid], extended_pia[~extended_invalid], kind="linear" ) # Interpolate where sectors are invalid. pia[i, sub_invalid, -1] = intp(x[pia.shape[1] : 2 * pia.shape[1]][sub_invalid])
[docs] @singledispatch def correct_attenuation_constrained( gateset, *, a_max=1.67e-4, a_min=2.33e-5, n_a=4, b_max=0.7, b_min=0.65, n_b=6, gate_length=1.0, constraints=None, constraint_args=None, sector_thr=10, ): """Gate-by-Gate attenuation correction based on the iterative approach of \ :cite:`Kraemer2008` and :cite:`Jacobi2016` with a generalized and \ scalable number of constraints. Differing from the original approach, the method for addressing small sectors which conflict with the constraints is based on a bisection forward calculating method, and not on backwards attenuation calculation. Parameters ---------- gateset : :class:`numpy:numpy.ndarray` Multidimensional array, where the range gates (over which iteration has to be performed) are supposed to vary along the last array-dimension and the azimuths are supposed to vary along the next to last array-dimension. Data has to be provided in decibel representation of reflectivity [dBZ]. a_max : float Initial value for linear coefficient of the k-Z relation ( :math:`k=a \\cdot Z^{b}` ). Per default set to 1.67e-4. a_min : float Minimal allowed linear coefficient of the k-Z relation ( :math:`k=a \\cdot Z^{b}` ) in the downwards iteration of 'a' in case of breaching one of thresholds ``constr_args`` of the optional conditions ``constraints``. Per default set to 2.33e-5. n_a : int Number of iterations from ``a_max`` to ``a_min``. Per default set to 4. b_max : float Initial value for exponential coefficient of the k-Z relation ( :math:`k=a \\cdot Z^{b}` ). Per default set to 0.7. b_min : float Minimal allowed exponential coefficient of the k-Z relation ( :math:`k=a \\cdot Z^{b}` ) in the downwards iteration of 'b' in case of breaching one of thresholds ``constr_args`` of the optional conditions ``constraints`` and the linear coefficient 'a' has already reached the lower limit ``a_min``. Per default set to 0.65. n_b : int Number of iterations from ``b_max`` to ``b_min``. Per default set to 6. gate_length : float Radial length of a range gate [km]. Per default set to 1.0. constraints : list List of constraint functions. The signature of these functions has to be constraint_function(`gateset`, `k`, `*constr_args`). Their return value must be a boolean array of shape `gateset.shape[:-1]` set to True for beams, which do not fulfill the constraint. constraint_args : list List of lists, which are to be passed to the individual constraint functions using the `*args` mechanism (len(constr_args) == len(constraints)). sector_thr : int Number of adjacent beams, for which in case of breaching the constraints the attenuation with downward iterated ``a`` and ``b`` - parameters is recalculated. For more narrow sectors the integrated attenuation of the last gate is interpolated and used as reference for the recalculation. Returns ------- pia : :class:`numpy:numpy.ndarray` Array with the same shape as ``gateset`` containing the calculated path integrated attenuation [dB] for each range gate. Examples -------- Implementing the original Hitschfeld & Bordan (1954) algorithm with otherwise default parameters:: from wradlib.atten import * pia = correct_attenuation_constrained(gateset, a_max=8.e-5, b_max=0.731, n_a=1, n_b=1, gate_length=1.0) Implementing the basic Kraemer algorithm:: pia = atten.correct_attenuation_constrained(gateset, a_max=1.67e-4, a_min=2.33e-5, n_a=100, b_max=0.7, b_min=0.65, n_b=6, gate_length=1., constraints= [wrl.atten.constraint_dbz], constraint_args=[[59.0]]) Implementing the PIA algorithm by Jacobi et al.:: pia = atten.correct_attenuation_constrained(gateset, a_max=1.67e-4, a_min=2.33e-5, n_a=100, b_max=0.7, b_min=0.65, n_b=6, gate_length=1., constraints= [wrl.atten.constraint_dbz, wrl.atten.constraint_pia], constraint_args= [[59.0],[20.0]]) """ if constraints is None: constraints = [] if constraint_args is None: constraint_args = [] n_az = gateset.shape[-2] n_rng = gateset.shape[-1] tmp_gateset = gateset.reshape((-1, n_az, n_rng)) pia = np.zeros_like(tmp_gateset) a_used = np.empty(tmp_gateset.shape[:-1]) b_used = np.empty(tmp_gateset.shape[:-1]) # Calculate attenuation forward. # Indexing all rows of last dimension (radarbeams) beams2correct = np.where(np.ones(tmp_gateset.shape[:-1], dtype=np.bool_)) small_sectors = np.zeros(tmp_gateset.shape[:-1], dtype=np.bool_) if n_a != 1: delta_a = (a_max - a_min) / (n_a - 1) else: delta_a = 0.0 if n_b != 1: delta_b = (b_max - b_min) / (n_b - 1) else: delta_b = 0.0 # Iterate over possible b-parameters for j in range(n_b): b = b_max - delta_b * j # Iterate over possible a-parameters for i in range(n_a): a = a_max - delta_a * i # Generate subset of beams that have to be corrected sub_gateset = tmp_gateset[beams2correct] sub_pia = calc_attenuation_forward( sub_gateset, a=a, b=b, gate_length=gate_length ) pia[beams2correct] = sub_pia a_used[beams2correct] = a b_used[beams2correct] = b # Indexing threshold exceeding beams incorrectbeams = np.zeros(tmp_gateset.shape[:-1], dtype=np.bool_) for constraint, constraint_arg in zip(constraints, constraint_args): incorrectbeams = np.logical_or( incorrectbeams, constraint(tmp_gateset, pia, *constraint_arg) ) # Determine incorrect sectors larger than sector_thr large_sectors = _sector_filter(incorrectbeams, sector_thr) # Determine incorrect sectors smaller than sector_thr small_sectors = np.logical_or( small_sectors, (incorrectbeams & ~large_sectors) ) beams2correct = np.where(large_sectors) if len(pia[beams2correct]) == 0: break if len(pia[beams2correct]) == 0: break if np.any(small_sectors): # Interpolate reference pia of most distant # rangebin of invalid sectors. _interp_atten(pia, small_sectors) # Calculate attenuation forward by achieving reference # attenuation based on bisection-method. tmp_pia, tmp_a, tmp_b = bisect_reference_attenuation( tmp_gateset[small_sectors, :], pia[small_sectors, -1], a_max=a_max, a_min=a_min, b_start=b_max, gate_length=gate_length, mode="difference", thrs=0.25, max_iterations=10, ) pia[small_sectors, :] = tmp_pia a_used[small_sectors] = tmp_a b_used[small_sectors] = tmp_b return pia.reshape(gateset.shape)
@correct_attenuation_constrained.register(DataArray) def _correct_attenuation_constrained_xarray(obj, **kwargs): dim0 = obj.wrl.util.dim0() out = apply_ufunc( correct_attenuation_constrained, obj, input_core_dims=[[dim0, "range"]], output_core_dims=[[dim0, "range"]], dask="parallelized", kwargs=kwargs, dask_gufunc_kwargs=dict(allow_rechunk=True), ) out.name = "correct_attenuation_constrained" return out
[docs] def correct_radome_attenuation_empirical( gateset, *, frequency=5.64, hydrophobicity=0.165, n_r=2, stat=np.mean ): """Estimate two-way wet radome losses. Empirical function of frequency and rainfall rate for both standard and hydrophobic radomes based on the approach of :cite:`Merceret2000`. Parameters ---------- gateset : :class:`numpy:numpy.ndarray` Multidimensional array, where the range gates (over which iteration has to be performed) are supposed to vary along the last array-dimension and the azimuths are supposed to vary along the next to last array-dimension. Data has to be provided in decibel representation of reflectivity [dBZ]. frequency : float Radar-frequency [GHz]: Standard frequencies in X-band range between 8.0 and 12.0 GHz, Standard frequencies in C-band range between 4.0 and 8.0 GHz, Standard frequencies in S-band range between 2.0 and 4.0 GHz. Be aware that the empirical fit of the formula was just done for C- and S-band. The use for X-band is probably an undue extrapolation. Per default set to 5.64 as used by the German Weather Service radars. hydrophobicity : float Empirical parameter based on the hydrophobicity of the radome material. - 0.165 for standard radomes, - 0.0575 for hydrophobic radomes. Per default set to 0.165. n_r : int The radius of rangebins within the rain-intensity is statistically evaluated as the representative rain-intensity over radome. stat : callable A numpy function for statistical aggregation of the central rangebins defined by n_r. Potential options: :func:`numpy:numpy.mean`, :func:`numpy:numpy.median`, :func:`numpy:numpy.max`, :func:`numpy:numpy.min`. Returns ------- k : :class:`numpy:numpy.ndarray` Array with the same shape as ``gateset`` containing the calculated two-way transmission loss [dB] for each range gate. In case the input array (gateset) contains NaNs the corresponding beams of the output array (k) will be set as NaN, too. """ # Select rangebins inside the defined center-range n_r. center = gateset[..., :n_r].reshape(-1, n_r * gateset.shape[-2]) center_m = np.ma.masked_array(center, np.isnan(center)) # Calculate rainrate in the center-range based on statistical method stat # and with standard ZR-relation. rain_over_radome = zr.z_to_r(trafo.idecibel(stat(center_m, axis=-1))) # Estimate the empirical two-way transmission loss due to # radome-attenuation. k = 2 * hydrophobicity * rain_over_radome * np.tanh(frequency / 10.0) ** 2 # Reshape the result to gateset-shape. k = np.repeat(k, gateset.shape[-1] * gateset.shape[-2]).reshape(gateset.shape) return k.filled(fill_value=np.nan)
[docs] def pia_from_kdp(kdp, dr, *, gamma=0.08): """Retrieving path integrated attenuation from specific differential \ phase (Kdp). The default value of gamma is based on :cite:`Carey2000`. Parameters ---------- kdp : :class:`numpy:numpy.ndarray` array specific differential phase Range dimension must be the last dimension. dr : float gate length (km) gamma : float linear coefficient (default value: 0.08) in the relation between Kdp phase and specific attenuation (alpha) Returns ------- output : :class:`numpy:numpy.ndarray` array of same shape as kdp containing the path integrated attenuation """ alpha = gamma * kdp return 2 * np.cumsum(alpha, axis=-1) * dr
[docs] class AttenMethods(XarrayMethods): """wradlib xarray SubAccessor methods for Atten Methods."""
[docs] @docstring(correct_attenuation_constrained) def correct_attenuation_constrained(self, *args, **kwargs): if not isinstance(self, AttenMethods): return correct_attenuation_constrained(self, *args, **kwargs) else: return correct_attenuation_constrained(self._obj, *args, **kwargs)
if __name__ == "__main__": print("wradlib: Calling module <atten> as main...")