#!/usr/bin/env python
# Copyright (c) 2011-2023, wradlib developers.
# Distributed under the MIT License. See LICENSE.txt for more info.
"""
Interpolation
^^^^^^^^^^^^^
Interpolation allows to transfer data from one set of locations to another.
This includes for example:
- interpolating the data from a polar grid to a cartesian grid or irregular
points
- interpolating point observations to a grid or a set of irregular points
- filling missing values, e.g. filling clutters
.. autosummary::
:nosignatures:
:toctree: generated/
{}
"""
__all__ = [
"IpolBase",
"Nearest",
"Idw",
"Linear",
"OrdinaryKriging",
"ExternalDriftKriging",
"RectGrid",
"RectBin",
"QuadriArea",
"interpolate",
"interpolate_polar",
"cart_to_irregular_interp",
"cart_to_irregular_spline",
"IpolMethods",
]
__doc__ = __doc__.format("\n ".join(__all__))
import re
from functools import reduce, singledispatch
import numpy as np
import scipy
from packaging.version import Version
from scipy import interpolate as sinterp
from scipy import ndimage, spatial, special, stats
# from xarray import DataArray, apply_ufunc
from wradlib import georef, util, zonalstats
from wradlib.util import XarrayMethods, docstring
xr = util.import_optional("xarray")
class MissingSourcesError(Exception):
"""Is raised in case no source coordinates are available for interpolation."""
class MissingTargetsError(Exception):
"""Is raised in case no interpolation targets are available."""
[docs]
class IpolBase:
"""
IpolBase(src, trg)
The base class for interpolation in N dimensions.
Provides the basic interface for all other classes.
Parameters
----------
src : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (npoints, ndims)
Data point coordinates of the source points.
trg : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (npoints, ndims)
Data point coordinates of the target points.
"""
def __init__(self, src, trg, **kwargs):
src = self._make_coord_arrays(src)
trg = self._make_coord_arrays(trg)
self.numsources = len(src)
self.numtargets = len(trg)
[docs]
def __call__(self, vals):
"""
Evaluate interpolator for values given at the source points.
Parameters
----------
vals : :class:`numpy:numpy.ndarray`
ndarray of float, shape (numsources, ...)
Values at the source points which to interpolate
Returns
-------
output : None
"""
self._check_shape(vals)
return None
def _check_shape(self, vals):
"""
Checks whether the values correspond to the source points
Parameters
----------
vals : :class:`numpy:numpy.ndarray`
ndarray of float
"""
if len(vals) != self.numsources:
raise ValueError(
f"Length of value array {len(vals)} does not correspond to number "
f"of source points {self.numsources}"
)
self.valsshape = vals.shape
self.valsndim = vals.ndim
def _make_coord_arrays(self, x):
"""
Make sure that the coordinates are provided as ndarray
of shape (numpoints, ndim)
Parameters
----------
x : :class:`numpy:numpy.ndarray`
ndarray of float with shape (numpoints, ndim)
OR a sequence of ndarrays of float with len(sequence)==ndim and
the length of the ndarray corresponding to the number of points
"""
if type(x) in [list, tuple]:
x = [item.ravel() for item in x]
x = np.array(x).transpose()
elif isinstance(x, np.ndarray):
if x.ndim == 1:
x = x.reshape(-1, 1)
elif x.ndim == 2:
pass
else:
raise TypeError("Cannot deal wih 3-d arrays, yet.")
return x
def _make_2d(self, vals):
"""Reshape increase number of dimensions of vals if smaller than 2,
appending additional dimensions (as opposed to the atleast_nd methods
of numpy).
Parameters
----------
vals : :class:`numpy:numpy.ndarray`
values who are to be reshaped to the right shape
Returns
-------
output : :class:`numpy:numpy.ndarray`
if vals.shape==() [a scalar] output.shape will be (1, 1)
if vals.shape==(npt, ) output.shape will be (npt, 1)
if vals.ndim > 1 vals will be returned as is
"""
if vals.ndim < 2:
# ndmin might be 0, so we get it to 1-d first
# then we add an axis as we assume that
return np.atleast_1d(vals)[:, np.newaxis]
else:
return vals
[docs]
class Nearest(IpolBase):
"""
Nearest(src, trg)
Nearest-neighbour interpolation in N dimensions.
Parameters
----------
src : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (npoints, ndims) or cKDTree object
Data point coordinates of the source points.
trg : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (npoints, ndims)
Data point coordinates of the target points.
remove_missing : int
Number of neighbours to consider in the presence of NaN, defaults to 0.
Keyword Arguments
-----------------
**kwargs : dict
keyword arguments of ipclass (see class documentation)
Examples
--------
See :ref:`/notebooks/interpolation/interpolation.ipynb`.
Note
----
Uses :class:`scipy:scipy.spatial.cKDTree`
"""
def __init__(self, src, trg, *, remove_missing=0, **kwargs):
if isinstance(src, spatial.cKDTree):
self.tree = src
else:
src = self._make_coord_arrays(src)
if len(src) == 0:
raise MissingSourcesError
# plant a tree, use unbalanced tree as default
kwargs.update(balanced_tree=kwargs.pop("balanced_tree", False))
self.tree = spatial.cKDTree(src, **kwargs)
self.numsources = self.tree.n
trg = self._make_coord_arrays(trg)
self.numtargets = len(trg)
if self.numtargets == 0:
raise MissingTargetsError
self.nnearest = remove_missing + 1
# query tree
self.dists, self.ix = self.tree.query(trg, k=self.nnearest)
# avoid bug, if there is only one neighbor at all
if self.dists.ndim == 1:
self.dists = self.dists[:, np.newaxis]
self.ix = self.ix[:, np.newaxis]
[docs]
def __call__(self, vals, *, maxdist=None):
"""
Evaluate interpolator for values given at the source points.
You can interpolate multiple datasets of source values (``vals``) at
once: the ``vals`` array should have the shape (number of source
points, number of source datasets). If you want to interpolate only one
set of source values, ``vals`` can have the shape (number of source
points, 1) or just (number of source points, ) - which is a flat/1-D
array. The output will have the same number of dimensions as ``vals``,
i.e. it will be a flat 1-D array in case ``vals`` is a 1-D array.
Parameters
----------
vals : :class:`numpy:numpy.ndarray`
ndarray of float, shape (numsourcepoints, ...)
Values at the source points which to interpolate
maxdist : float
the maximum distance up to which an interpolated values is
assigned - if maxdist is exceeded, np.nan will be assigned
If maxdist==None, values will be assigned everywhere
Returns
-------
output : :class:`numpy:numpy.ndarray`
ndarray of float with shape (numtargetpoints,...)
"""
self._check_shape(vals)
# get first neighbour
trgvals = vals[self.ix[:, 0]]
dists = self.dists[..., 0].copy()
# iteratively fill NaN with next neighbours
isnan = np.isnan(trgvals)
nanidx = np.argwhere(isnan)[..., 0]
if self.nnearest > 1 & np.count_nonzero(isnan):
for i in range(self.nnearest - 1):
trgvals[isnan] = vals[self.ix[:, i + 1]][isnan]
dists[nanidx] = self.dists[..., i + 1][nanidx]
isnan = np.isnan(trgvals)
nanidx = np.argwhere(isnan)[..., 0]
if not np.count_nonzero(isnan):
break
if maxdist is None:
return trgvals
else:
return np.where(dists > maxdist, np.nan, trgvals)
[docs]
class Idw(IpolBase):
"""
Idw(src, trg, nnearest=4, p=2.)
Inverse distance weighting interpolation in N dimensions.
Parameters
----------
src : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (npoints, ndims) of cKDTree object
Data point coordinates of the source points.
trg : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (npoints, ndims)
Data point coordinates of the target points.
nnearest : int
max. number of neighbours to be considered
p : float
inverse distance power used in 1/dist**p
remove_missing : bool
If True masks NaN values in the data values, defaults to False
Keyword Arguments
-----------------
**kwargs : dict
keyword arguments of ipclass (see class documentation)
Examples
--------
See :ref:`/notebooks/interpolation/interpolation.ipynb`.
Note
----
Uses :class:`scipy:scipy.spatial.cKDTree`
"""
def __init__(self, src, trg, *, nnearest=4, p=2.0, remove_missing=False, **kwargs):
if isinstance(src, spatial.cKDTree):
self.tree = src
else:
src = self._make_coord_arrays(src)
if len(src) == 0:
raise MissingSourcesError
# plant a tree, use unbalanced tree as default
kwargs.update(balanced_tree=kwargs.pop("balanced_tree", False))
self.tree = spatial.cKDTree(src, **kwargs)
self.numsources = self.tree.n
trg = self._make_coord_arrays(trg)
self.numtargets = len(trg)
if self.numtargets == 0:
raise MissingTargetsError
if nnearest > self.numsources:
util.warn(
"wradlib.ipol.Idw: `nnearest` is larger than number of "
f"source points and is set to {self.numsources} corresponding to the "
"number of source points.",
UserWarning,
)
self.nnearest = self.numsources
else:
self.nnearest = nnearest
self.remove_missing = remove_missing
self.p = p
# query tree
# scipy kwarg changed from version 1.6
if Version(scipy.__version__) < Version("1.6"):
query_kwargs = dict(n_jobs=-1)
else:
query_kwargs = dict(workers=-1)
self.dists, self.ix = self.tree.query(trg, k=self.nnearest, **query_kwargs)
# avoid bug, if there is only one neighbor at all
if self.dists.ndim == 1:
self.dists = self.dists[:, np.newaxis]
self.ix = self.ix[:, np.newaxis]
[docs]
def __call__(self, vals, *, maxdist=None):
"""
Evaluate interpolator for values given at the source points.
You can interpolate multiple datasets of source values (``vals``) at
once: the ``vals`` array should have the shape (number of source
points, number of source datasets). If you want to interpolate only one
set of source values, ``vals`` can have the shape (number of source
points, 1) or just (number of source points, ) - which is a flat/1-D
array. The output will have the same number of dimensions as ``vals``,
i.e. it will be a flat 1-D array in case ``vals`` is a 1-D array.
Parameters
----------
vals : :class:`numpy:numpy.ndarray`
ndarray of float, shape (numsourcepoints, ...)
Values at the source points which to interpolate
maxdist : float
the maximum distance up to which points will be included into the
interpolation calculation
Returns
-------
output : :class:`numpy:numpy.ndarray`
ndarray of float with shape (numtargetpoints,...)
"""
self._check_shape(vals)
weights = 1.0 / self.dists**self.p
# if maxdist isn't given, take the maximum distance
if maxdist is not None:
outside = self.dists > maxdist
weights[outside] = 0
# take care of point coincidence
weights[np.isposinf(weights)] = 1e12
# shape handling (time, ensemble etc)
wshape = weights.shape
weights.shape = wshape + ((vals.ndim - 1) * (1,))
# expand vals to trg grid
trgvals = vals[self.ix]
# nan handling
if self.remove_missing:
isnan = np.isnan(trgvals)
weights = np.broadcast_to(weights, isnan.shape)
masked_weights = np.ma.array(weights, mask=isnan)
interpol = np.nansum(weights * trgvals, axis=1) / np.sum(
masked_weights, axis=1
)
else:
interpol = np.sum(weights * trgvals, axis=1) / np.sum(weights, axis=1)
return interpol
[docs]
class Linear(IpolBase):
"""
Interface to the :class:`scipy:scipy.interpolate.LinearNDInterpolator`
class.
We provide this class in order to achieve a uniform interface for all
Interpolator classes
Parameters
----------
src : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (npoints, ndims)
Data point coordinates of the source points.
trg : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (npoints, ndims)
Data point coordinates of the target points.
Examples
--------
See :ref:`/notebooks/interpolation/interpolation.ipynb`.
"""
def __init__(self, src, trg, *, remove_missing=False):
self.src = self._make_coord_arrays(src)
self.trg = self._make_coord_arrays(trg)
self.remove_missing = remove_missing
# remember some things
self.numtargets = len(trg)
if self.numtargets == 0:
raise MissingTargetsError
self.numsources = len(src)
if self.numsources == 0:
raise MissingSourcesError
[docs]
def __call__(self, vals, *, fill_value=np.nan):
"""
Evaluate interpolator for values given at the source points.
You can interpolate multiple datasets of source values (``vals``) at
once: the ``vals`` array should have the shape (number of source
points, number of source datasets). If you want to interpolate only one
set of source values, ``vals`` can have the shape (number of source
points, 1) or just (number of source points, ) - which is a flat/1-D
array. The output will have the same number of dimensions as ``vals``,
i.e. it will be a flat 1-D array in case ``vals`` is a 1-D array.
Parameters
----------
vals : :class:`numpy:numpy.ndarray`
ndarray of float, shape (numsourcepoints, ...)
Values at the source points which to interpolate
fill_value : float
is needed if linear interpolation fails; defaults to np.nan
Returns
-------
output : :class:`numpy:numpy.ndarray`
ndarray of float with shape (numtargetpoints,...)
"""
self._check_shape(vals)
isnan = np.isnan(vals)
if self.remove_missing & np.count_nonzero(isnan):
ip = sinterp.LinearNDInterpolator(
self.src[~isnan, ...], vals[~isnan], fill_value=fill_value
)
else:
ip = sinterp.LinearNDInterpolator(self.src, vals, fill_value=fill_value)
return ip(self.trg)
class RectGridBase:
"""Rectangular Grid - base class
Parameters
----------
grid : :class:`numpy:numpy.ndarray` of floats
3d array of shape (..., 2)
The points defining the regular grid in n dimensions.
points : :class:`numpy:numpy.ndarray` of floats
Array of shape (..., 2)
The sample point coordinates.
"""
def __init__(self, grid, points):
self._upper = None
self._xdim = None
self._ydim = None
self._image = None
self._upper = None
self._is_grid = None
self._ipol_grid = None
self._ipol_points = None
self._grid = np.array(grid)
self._points = np.array(points)
@property
def is_grid(self):
if self._is_grid is None:
self._is_grid = self.grid.ndim == 3 and self.grid.shape[2] == 2
if not self._is_grid:
raise ValueError(
f"Grid Shape mismatch, expected (N, M, 2), but got {self.grid.shape}."
)
return self._is_grid
@property
def ydim(self):
if self._ydim is None:
self._ydim = 0 if self.image else 1
return self._ydim
@property
def xdim(self):
if self._xdim is None:
self._xdim = 1 if self.image else 0
return self._xdim
@property
def image(self):
if self._image is None:
self._image = self.grid[0, 0, 1] == self.grid[0, 1, 1]
return self._image
@property
def upper(self):
if self._upper is None:
self._upper = (
np.diff(np.take(self.grid[..., 1], 0, axis=self.xdim)[0:2])[0] < 0
)
return self._upper
@property
def grid(self):
return self._grid
@property
def points(self):
return self._points
@property
def ipol_grid(self):
if self._ipol_grid is None:
self._ipol_grid = self._get_grid_dims()
return self._ipol_grid
@property
def ipol_points(self):
if self._ipol_points is None:
self._ipol_points = self._get_points()
return self._ipol_points
def _get_grid_dims(self):
grd = self.grid
if self.image:
grd = np.flip(grd, -1)
if self.upper:
grd = np.flip(grd, self.ydim)
grd_dim0 = np.take(grd[..., 0], 0, axis=1)
grd_dim1 = np.take(grd[..., 1], 0, axis=0)
return grd_dim0, grd_dim1
def _get_points(self):
pts = self.points
if self.image:
pts = np.flip(pts, -1)
return pts.reshape((-1, 2))
[docs]
class RectGrid(RectGridBase):
"""Interpolation on a 2d grid in arbitrary dimensions.
The source data must be defined on a regular grid, the grid spacing
however may be uneven. Linear, nearest-neighbour and spline
interpolation are supported.
Based on :py:func:`scipy:scipy.interpolate.interpn`, uses:
- `nearest` :py:class:`scipy:scipy.interpolate.RegularGridInterpolator`
- `linear` :py:class:`scipy:scipy.interpolate.RegularGridInterpolator`
- `splinef2d` :py:class:`scipy.interpolate.RectBivariateSpline`
Parameters
----------
src : :class:`numpy:numpy.ndarray`
3d array of shape (..., 2)
The points defining the regular grid in n dimensions.
trg : :class:`numpy:numpy.ndarray`
Array of shape (..., ndim)
The coordinates to sample the gridded data at
Keyword Arguments
-----------------
method : str
Method of interpolation used, defaults to 'linear'.
"""
def __init__(self, src, trg, *, method="linear"):
super().__init__(src, trg)
self.method = method
[docs]
def __call__(self, values, **kwargs):
"""Evaluate interpolator for values given at the source points.
Parameters
----------
values : :class:`numpy:numpy.ndarray`
Values at the source points which to interpolate, shape (num src pts, ...)
Returns
-------
result : :class:`numpy:numpy.ndarray`
Target values with shape (num trg pts, ...)
"""
# override bounds_error
kwargs["bounds_error"] = kwargs.pop("bounds_error", False)
kwargs["method"] = kwargs.pop("method", self.method)
# need to flip ydim if grid origin is 'upper'
if self.upper:
values = np.flip(values, self.ydim)
result = sinterp.interpn(self.ipol_grid, values, self.ipol_points, **kwargs)
return result.reshape(self.points.shape[:-1])
[docs]
class RectBin(RectGridBase):
"""Bin points values to regular grid cells
Based on :py:func:`scipy:scipy.stats.binned_statistic_dd`
Parameters
----------
src : :class:`numpy:numpy.ndarray`
data point coordinates of the source points with shape (..., ndims).
trg : :class:`numpy:numpy.ndarray`
rectangular grid coordinates (center) with shape (..., 2)
"""
def __init__(self, src, trg):
super().__init__(trg, src)
self._binned_stats = False
@property
def binned_stats(self):
return self._binned_stats
@binned_stats.setter
def binned_stats(self, value):
self._binned_stats = value
def _get_grid_dims(self):
dims = super()._get_grid_dims()
return [util.center_to_edge(x) for x in dims]
[docs]
def __call__(self, values, **kwargs):
"""Evaluate interpolator for values given at the source points.
Parameters
----------
values : :class:`numpy:numpy.ndarray`
Values at the source points which to interpolate, shape (num src pts, ...)
kwargs : dict
keyword arguments passed to scipy.stats.binned_statistic_dd
Returns
-------
stat : :class:`numpy:numpy.ndarray`
Target values with shape (num trg pts, ...)
"""
# reshape into flat array
values = values.reshape(-1)
if not self.binned_stats or Version(scipy.__version__) < Version("1.4"):
result = stats.binned_statistic_dd(
self.ipol_points, values, bins=self.ipol_grid, **kwargs
)
self.binned_stats = result
else:
result = stats.binned_statistic_dd(
self.ipol_points,
values,
binned_statistic_result=self.binned_stats,
**kwargs,
)
stat = result.statistic
# need to flip ydim if grid origin is 'upper'
if self.upper:
stat = np.flip(stat, self.ydim)
return stat
class PolyArea:
"""Map values representing polygons to another polygons
Based on :mod:`wradlib.zonalstats`
Parameters
----------
src : :class:`numpy:numpy.ndarray`
Source grid edge coordinates with shape (..., 5, 2).
trg : :class:`numpy:numpy.ndarray`
Target grid edge coordinates with shape (..., 5, 2).
"""
def __init__(self, src, trg, **kwargs):
self.shape = trg.shape[:-2]
src = src.reshape((-1, 5, 2))
trg = trg.reshape((-1, 5, 2))
zd = zonalstats.ZonalDataPoly(src, trg=trg, **kwargs)
self.obj = zonalstats.ZonalStatsPoly(zd)
def __call__(self, values):
"""Evaluate interpolator for values given at the source points.
Parameters
----------
values : :class:`numpy:numpy.ndarray`
Values representing the source cells, shape corresponding to src
Returns
-------
result : :class:`numpy:numpy.ndarray`
Values representing the target cells, shape corresponding to trg
"""
values = values.ravel()
result = self.obj.mean(values)
return result.reshape(self.shape)
[docs]
class QuadriArea(PolyArea):
"""Map values representing quadrilateral grid cells to another quadrilateral grid.
Based on :mod:`wradlib.zonalstats`
Parameters
----------
src : :class:`numpy:numpy.ndarray`
Source grid edge coordinates with shape (n+1, m+1, 2).
trg : :class:`numpy:numpy.ndarray`
Target grid edge coordinates with shape (o+1, p+1, 2).
"""
def __init__(self, src, trg, **kwargs):
src = georef.rect.grid_to_polyvert(src)
trg = georef.rect.grid_to_polyvert(trg)
super().__init__(src, trg, **kwargs)
class IpolChain(IpolBase):
"""Apply successive interpolation methods.
Parameters
----------
interpolators: list
list of interpolators (IpolBase) to apply successivly
"""
def __init__(self, interpolators):
self.interpolators = interpolators
def __call__(self, values, **kwargs):
"""Evaluate interpolator for values given at the source points.
Parameters
----------
values : :class:`numpy:numpy.ndarray`
Values at src points which to interpolate with shape (num src pts, ...)
Returns
-------
result : class:`numpy:numpy.ndarray`
Values at the trg points with shape (num trg pts, ...)
"""
first = self.interpolators.pop(0)
result = first(values, **kwargs)
for interpolator in self.interpolators:
temp = interpolator(values, **kwargs)
bad = np.isnan(result)
result[bad] = temp[bad]
return result
# -----------------------------------------------------------------------------
# Covariance routines needed for Kriging
# -----------------------------------------------------------------------------
def parse_covariogram(cov_model):
""" """
patterns = [
re.compile(r"([\d\.]+) Nug\(([\d\.]+)\)"), # nugget
re.compile(r"([\d\.]+) Lin\(([\d\.]+)\)"), # linear
re.compile(r"([\d\.]+) Sph\(([\d\.]+)\)"), # spherical
re.compile(r"([\d\.]+) Exp\(([\d\.]+)\)"), # exponential
re.compile(r"([\d\.]+) Gau\(([\d\.]+)\)"), # gaussian
re.compile(r"([\d\.]+) Mat\(([\d\.]+)\)\^([\d\.]+)"), # matern
re.compile(r"([\d\.]+) Pow\(([\d\.]+)\)"), # power
# cauchy
re.compile(r"([\d\.]+) " r"Cau\(([\d\.]+)\)\^([\d\.]+)\^([\d\.]+)"),
]
cov_funs = [
cov_nug,
cov_lin,
cov_sph,
cov_exp,
cov_gau,
cov_mat,
cov_pow,
cov_cau,
]
funcs = []
# first split along '+'
subparts = cov_model.split("+")
# then analyse subparts
for _i, subpart in enumerate(subparts):
# iterate over all available patterns
for j, pattern in enumerate(patterns):
m = pattern.search(subpart)
if m:
params = [float(p) for p in m.groups()]
funcs.append(_make_cov(cov_funs[j], params))
# return complete covariance function, which adds
# individual subparts
return lambda h: reduce(np.add, [f(h) for f in funcs])
def _make_cov(func, params):
return lambda h: func(h, *params)
def cov_nug(h, sill, rng):
r"""nugget covariance function
:math:`\gamma(h) = s ` for :math:`h \leq r`, 0 otherwise
Therefore, usually rng is set to 0
"""
h = np.asanyarray(h)
return np.where(h <= rng, sill, 0.0)
def cov_exp(h, sill=1.0, rng=1.0):
"""exponential type covariance function"""
h = np.asanyarray(h)
return sill * (np.exp(-h / rng))
def cov_sph(h, sill=1.0, rng=1.0):
"""spherical type covariance function"""
h = np.asanyarray(h)
return np.where(h < rng, sill * (1.0 - 1.5 * h / rng + h**3 / (2 * rng**3)), 0.0)
def cov_gau(h, sill=1.0, rng=1.0):
"""gaussian type covariance function"""
h = np.asanyarray(h)
return sill * np.exp(-(h**2) / rng**2)
def cov_lin(h, sill=1.0, rng=1.0):
"""linear covariance function"""
h = np.asanyarray(h)
return np.where(h < rng, sill * (-h / rng + 1.0), 0.0)
def cov_mat(h, sill=1.0, rng=1.0, shp=0.5):
"""matern covariance function"""
"""Matern Covariance Function Family:
shp = 0.5 --> Exponential Model
shp = inf --> Gaussian Model
"""
h = np.asanyarray(h)
# for v > 100 shit happens --> use Gaussian model
if shp > 100:
c = cov_gau(h, sill, rng)
else:
# modified bessel function of second kind of order v
kv = special.kv
# Gamma function
tau = special.gamma
fac1 = h / rng * 2.0 * np.sqrt(shp)
fac2 = tau(shp) * 2.0 ** (shp - 1.0)
c = np.where(h != 0, sill * 1.0 / fac2 * fac1**shp * kv(shp, fac1), sill)
return c
def cov_pow(h, sill=1.0, rng=1.0):
"""power law covariance function"""
h = np.asanyarray(h)
return sill - h**rng
def cov_cau(h, sill=1.0, rng=1.0, alpha=1.0, beta=1.0):
"""
cauchy covariance function.
alpha >0 & <=2 ... shape parameter
beta >0 ... parameterizes long term memory
"""
h = np.asanyarray(h).astype("float")
return sill * (1 + (h / rng) ** alpha) ** (-beta / alpha)
[docs]
class OrdinaryKriging(IpolBase):
r"""
OrdinaryKriging(src, trg, cov='1.0 Exp(10000.)', nnearest=12)
Interpolate using Ordinary Kriging
(Co-)Variogram definitions are given in the syntax that ``gstat`` uses.
It allows nesting of different basic variogram types using linear
combinations.
Each basic covariogram is usually defined by
Note that, strictly speaking, this implementation doesn't allow Kriging of
fields for which the covariance does not exist. While this is
mathematically possible, it is rather rare for fields encountered in
reality. Therefore, this should not be a severe limitation.
Most (co-)variograms are characterized by a sill parameter (which is
the (co-)variance at separation distance 0) a range parameter (which
indicates a separation distance after which the covariance drops
close to zero) a sometimes additional parameters governing the shape
of the function. In the following range is given by the variable `r` and
the sill by the variable `s`.
Currently implemented are:
- Pure Nugget
- Exponential
- Spherical
- Gaussian
- Linear
- Matern
- Power
- Cauchy
Parameters
----------
src : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (npoints, ndims)
Data point coordinates of the source points.
trg : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (npoints, ndims)
Data point coordinates of the target points.
cov : str
covariance (variogram) model string in the syntax ``gstat``
uses.
nnearest : int
max. number of neighbours to be considered
remove_missing : bool
If True masks NaN values in the data values, defaults to False
Note
----
The class calculates the Kriging weights during initialization, because
these only depend on the configuration of the points.
The call method is then only used to calculate estimated values at the
target points based on those at the source points. Therefore, the main
computational load is experienced during initialization. This behavior is
different from that of the Idw or Nearest Interpolators.
After initialization the estimation variance at each interpolation target
may be retrieved from the attribute `estimation_variance`.
Examples
--------
See :ref:`/notebooks/interpolation/interpolation.ipynb`.
"""
def __init__(
self,
src,
trg,
cov="1.0 Exp(10000.)",
*,
nnearest=12,
remove_missing=False,
**kwargs,
):
""" """
if isinstance(src, spatial.cKDTree):
self.tree = src
self.src = self.tree.data
else:
self.src = self._make_coord_arrays(src)
if len(src) == 0:
raise MissingSourcesError
# plant a tree, use unbalanced tree as default
kwargs.update(balanced_tree=kwargs.pop("balanced_tree", False))
self.tree = spatial.cKDTree(self.src, **kwargs)
self.numsources = self.tree.n
self.remove_missing = remove_missing
self.trg = self._make_coord_arrays(trg)
# remember some things
self.numtargets = len(trg)
if self.numtargets == 0:
raise MissingTargetsError
if nnearest > self.numsources:
util.warn(
"wradlib.ipol.OrdinaryKriging: `nnearest` is "
"larger than number of source points and is "
f"set to {self.numsources} corresponding to the "
"number of source points.",
)
self.nnearest = self.numsources
else:
self.nnearest = nnearest
# tree query
self.dists, self.ix = self.tree.query(trg, k=self.nnearest)
# avoid bug, if there is only one neighbor at all
if self.dists.ndim == 1:
self.dists = self.dists[:, np.newaxis]
self.ix = self.ix[:, np.newaxis]
# parse covariogram function string
self.cov_func = parse_covariogram(cov)
self.weights = []
self.estimation_variance = []
# do the kriging
self._krige()
def _krig_matrix(self, src):
"""Sets up the kriging system for a configuration of source points."""
var_matrix = self.cov_func(spatial.distance_matrix(src, src))
ok_matrix = np.ones((len(src) + 1, len(src) + 1))
ok_matrix[:-1, :-1] = var_matrix
ok_matrix[-1, -1] = 0.0
return ok_matrix
def _krig_rhs(self, dists):
"""Sets up a right hand side of the kriging system given the distances
of the target to the source points. To be used in conjunction with
`_krig_matrix`."""
rhs = self.cov_func(dists)
ok_rhs = np.concatenate([rhs, [1.0]])
return ok_rhs
def _krige(self):
"""Sets up the kriging system and solves it in order to obtain the
interpolation weights of ordinary kriging.
Also calculates the kriging estimation variance from the results"""
for dist, ix in zip(self.dists, self.ix):
matrix = self._krig_matrix(self.src[ix, :])
rhs = self._krig_rhs(dist)
weights = np.linalg.solve(matrix, rhs)
self.weights.append(weights)
self.estimation_variance.append(self.cov_func(0.0) - np.sum(weights * rhs))
[docs]
def __call__(self, vals):
"""
Evaluate interpolator for values given at the source points.
You can interpolate multiple datasets of source values (``vals``) at
once: the ``vals`` array should have the shape (number of source
points, number of source datasets). If you want to interpolate only one
set of source values, ``vals`` can have the shape (number of source
points, 1) or just (number of source points, ) - which is a flat/1-D
array. The output will have the same number of dimensions as ``vals``,
i.e. it will be a flat 1-D array in case ``vals`` is a 1-D array.
Parameters
----------
vals : :class:`numpy:numpy.ndarray`
ndarray of float, shape (numsourcepoints, numfields)
Values at the source points from which to interpolate
Several fields may be calculated at once by passing them
along the second dimension.
Only this second dimension is implemented. You'll have to
reshape a more complex array for the function to work.
Returns
-------
output : :class:`numpy:numpy.ndarray`
ndarray of float with shape (numtargetpoints, numfields)
"""
v = self._make_2d(vals)
self._check_shape(v)
# expand vals to trg grid
trgvals = v[self.ix]
# calculate estimator
weights = np.array(self.weights)
# nan handling
if self.remove_missing:
isnan = np.isnan(trgvals)
weights = np.broadcast_to(weights[:, :-1][..., np.newaxis], isnan.shape)
masked_weights = np.ma.array(weights, mask=isnan)
interpol = np.nansum(masked_weights * trgvals, axis=1)
else:
interpol = np.sum(weights[:, :-1][..., np.newaxis] * trgvals, axis=1)
if vals.ndim == 1:
return interpol.ravel()
else:
return interpol
[docs]
class ExternalDriftKriging(IpolBase):
"""
ExternalDriftKriging(src, trg, cov='1.0 Exp(10000.)', nnearest=12,
drift_src=None, drift_trg=None)
Kriging with external drift
Parameters
----------
src : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (nsrcpoints, ndims)
Data point coordinates of the source points.
trg : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (ntrgpoints, ndims)
Data point coordinates of the target points.
cov : str
covariance (variogram) model string in the syntax ``gstat``
uses.
nnearest : int
max. number of neighbours to be considered
src_drift : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (nsrcpoints, )
values of the external drift at each source point
trg_drift : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (ntrgpoints, )
values of the external drift at each target point
See Also
--------
:class:`~wradlib.ipol.OrdinaryKriging`
Note
----
After calling the object in order to get the interpolated values,
the estimation variance of the system may be
retrieved from the attribute `estimation_variance`. Accordingly, the
interpolation weights can be retrieved from the attribute `weights`
If drift_src or drift_trg are not given on initialization, they must
be provided when using the __call__ method.
If any of them is given on initialization its values may be overridden
by passing new values to the __call__ method.
Examples
--------
See :ref:`/notebooks/interpolation/interpolation.ipynb`.
"""
def __init__(
self,
src,
trg,
cov="1.0 Exp(10000.)",
*,
nnearest=12,
src_drift=None,
trg_drift=None,
remove_missing=False,
**kwargs,
):
""" """
if isinstance(src, spatial.cKDTree):
self.tree = src
self.src = self.tree.data
else:
self.src = self._make_coord_arrays(src)
if len(src) == 0:
raise MissingSourcesError
# plant a tree, use unbalanced tree as default
kwargs.update(balanced_tree=kwargs.pop("balanced_tree", False))
self.tree = spatial.cKDTree(self.src, **kwargs)
self.numsources = self.tree.n
self.remove_missing = remove_missing
self.trg = self._make_coord_arrays(trg)
self.src_drift = src_drift
self.trg_drift = trg_drift
# remember some things
self.numtargets = len(trg)
if self.numtargets == 0:
raise MissingTargetsError
self.numsources = len(src)
if self.numsources == 0:
raise MissingSourcesError
if nnearest > self.numsources:
util.warn(
"wradlib.ipol.ExternalDriftKriging: `nnearest` is larger "
f"than number of source points and is set to {self.numsources} "
"corresponding to the number of source points.",
)
self.nnearest = self.numsources
else:
self.nnearest = nnearest
# query tree
self.dists, self.ix = self.tree.query(trg, k=self.nnearest)
# avoid bug, if there is only one neighbor at all
if self.dists.ndim == 1:
self.dists = self.dists[:, np.newaxis]
self.ix = self.ix[:, np.newaxis]
# parse covariogram function string
self.cov_func = parse_covariogram(cov)
self.weights = []
self.estimation_variance = []
def _krig_matrix(self, src, drift):
"""Sets up the kriging system for a configuration of source points."""
# the basic covariance matrix
var_matrix = self.cov_func(spatial.distance_matrix(src, src))
# the extended matrix, initialized to ones
edk_matrix = np.ones((len(src) + 2, len(src) + 2))
# adding entries for the first lagrange multiplier for the ordinary
# kriging part
edk_matrix[:-2, :-2] = var_matrix
edk_matrix[-2, -2] = 0.0
# adding entries for the second lagrange multiplier for the edk part
edk_matrix[:-2, -1] = drift
edk_matrix[-1, :-2] = drift
edk_matrix[-2:, -1] = 0.0
edk_matrix[-1, -2:] = 0.0
return edk_matrix
def _krig_rhs(self, dists, drift):
"""Sets up a right hand side of the kriging system given the distances
of the target to the source points. To be used in conjunction with
`_krig_matrix`."""
rhs = self.cov_func(dists)
edk_rhs = np.concatenate([rhs, np.array([1.0, drift])])
return edk_rhs
def _krige(self, src_drift, trg_drift):
"""Sets up the kriging system and solves it in order to obtain the
interpolation weights of ordinary kriging.
Also calculates the kriging estimation variance from the results"""
all_weights = []
estimation_variances = []
for dist, ix, td in zip(self.dists, self.ix, trg_drift):
matrix = self._krig_matrix(self.src[ix, :], src_drift[ix])
rhs = self._krig_rhs(dist, td)
try:
weights = np.linalg.solve(matrix, rhs)
except np.linalg.LinAlgError:
weights = np.repeat(np.nan, len(rhs))
all_weights.append(weights)
estimation_variances.append(self.cov_func(0.0) - np.sum(weights * rhs))
return all_weights, estimation_variances
[docs]
def __call__(self, vals, *, src_drift=None, trg_drift=None):
"""
Evaluate interpolator for values given at the source points.
You can interpolate multiple datasets of source values (``vals``) at
once: the ``vals`` array should have the shape (number of source
points, number of source datasets). If you want to interpolate only one
set of source values, ``vals`` can have the shape (number of source
points, 1) or just (number of source points, ) - which is a flat/1-D
array. The output will have the same number of dimensions as ``vals``,
i.e. it will be a flat 1-D array in case ``vals`` is a 1-D array.
Parameters
----------
vals : :class:`numpy:numpy.ndarray`
ndarray of float, shape (numsourcepoints, numfields)
Values at the source points from which to interpolate
Several fields may be calculated at once by passing them
along the second dimension.
Only this second dimension is implemented. You'll have to
reshape a more complex array for the function to work.
Returns
-------
output : :class:`numpy:numpy.ndarray`
ndarray of float with shape (numtargetpoints, numfields)
"""
if vals.ndim > 2:
raise ValueError(
f"`vals` nedd to be a 2D-array, but is of shape {vals.shape}."
)
v = self._make_2d(vals)
self._check_shape(v)
if src_drift is None:
# check if we have data from __init__
if self.src_drift is None:
raise ValueError(
"`src_drift`-kwarg must be specified either on "
"initialization or when calling the interpolator."
)
src_drift = self.src_drift
if trg_drift is None:
# check if we have data from __init__
if self.trg_drift is None:
raise ValueError(
"`trg_drift`-kwarg must be specified either on "
"initialization or when calling the interpolator."
)
trg_drift = self.trg_drift
src_d = self._make_2d(src_drift)
trg_d = self._make_2d(trg_drift)
self._check_shape(src_d)
# re-initialize weights and variances to ensure that these only reflect
# the results of the current call and not any previous call
self.weights = []
self.estimation_variance = []
# if drifts are constant, we can save time by solving the kriging
# system once
if src_d.shape[1] == 1:
wght, variances = self._krige(src_d.squeeze(), trg_d.squeeze())
self.weights = wght
self.estimation_variance = variances
weights = np.array(self.weights)
trgvals = v[self.ix]
if self.remove_missing:
isnan = np.isnan(trgvals)
weights = np.broadcast_to(weights[:, :-2][..., np.newaxis], isnan.shape)
masked_weights = np.ma.array(weights, mask=isnan)
ip = np.nansum(masked_weights * trgvals, axis=1)
else:
ip = np.nansum(weights[:, :-2][..., np.newaxis] * trgvals, axis=1)
# otherwise we need to set up and solve the kriging system for each
# field individually
else:
ip = np.empty((self.trg.shape[0], v.shape[1]))
if (v.shape[1] != src_d.shape[1]) or (v.shape[1] != trg_d.shape[1]):
raise ValueError(
f"`vals` shape[1] ({v.shape[1]}) does not match `src` "
f"({src_d.shape[1]}) and `trg` ({trg_d.shape[1]})."
)
for i in range(v.shape[1]):
wght, variances = self._krige(
src_d[:, i].squeeze(), trg_d[:, i].squeeze()
)
weights = np.array(wght)
trgvals = v[self.ix, i]
if self.remove_missing:
isnan = np.isnan(trgvals)
weights = np.broadcast_to(weights[:, :-2], isnan.shape)
masked_weights = np.ma.array(weights, mask=isnan)
ip[:, i] = np.nansum(masked_weights * trgvals, axis=1)
else:
ip[:, i] = np.nansum(weights[:, :-2] * trgvals, axis=1)
self.weights.append(weights)
self.estimation_variance.append(variances)
if vals.ndim == 1:
return ip.ravel()
else:
return ip
# -----------------------------------------------------------------------------
# Wrapper functions
# -----------------------------------------------------------------------------
[docs]
def interpolate(src, trg, vals, ipclass, *args, **kwargs):
"""
Convenience function to use the interpolation classes in an efficient way
The interpolation classes in :mod:`wradlib.ipol` are computationally very
efficient if they are applied on large multidimensional arrays of which
the first dimension must be the locations' dimension (1d or 2d coordinates)
and the following dimensions can be anything (e.g. time or ensembles). This
way, the weights need to be computed only once. However, this can only be
done with success if all source values for the interpolation are valid
numbers. If the source values contain let's say *np.nan* types, the result
of the interpolation will be *np.nan* in the vicinity of the corresponding
points, too. Imagine that you have a time series of observations at points
and in each time step one observation is missing.
You would still like to efficiently apply the interpolation
classes, but you will need to account for the resulting *np.nan* values in
the interpolation output.
In order to still allow for the efficient application, you have to take
care of the remaining np.nan in your interpolation result. This is done by
this convenience function.
Alternatively, you have to make sure that your *vals* argument does not
contain any *np.nan* values OR you have to post-process missing values in
your interpolation result in another way.
Warning
-------
Works only for one- and two-dimensional *vals* arrays, yet.
Parameters
----------
src : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (npoints, ndims)
Data point coordinates of the source points.
trg : :class:`numpy:numpy.ndarray`
ndarray of floats, shape (npoints, ndims)
Data point coordinates of the target points.
vals : :class:`numpy:numpy.ndarray`
ndarray of float, shape (numsourcepoints, ...)
Values at the source points which to interpolate
ipclass : :class:`wradlib.ipol.IpolBase`
A class which inherits from IpolBase.
Other Parameters
----------------
*args : list
arguments of ipclass (see class documentation)
Keyword Arguments
-----------------
**kwargs : dict
keyword arguments of ipclass (see class documentation)
Examples
--------
>>> # test for 1 dimension in space and two value dimensions
>>> src = np.arange(10)[:,None]
>>> trg = np.linspace(0,20,40)[:,None]
>>> vals = np.hstack((np.sin(src), 10.+np.sin(src)))
>>> # here we introduce missing values only in the second dimension
>>> vals[3:5,1] = np.nan
>>> ipol_result = interpolate(src, trg, vals, Idw, nnearest=2)
>>> import matplotlib.pyplot as plt
>>> plt.interactive(True)
>>> line1 = plt.plot(trg, ipol_result, 'b+')
>>> line2 = plt.plot(src, vals, 'ro')
"""
if vals.ndim == 1:
# source values are one dimensional, we have just
# to remove invalid data
ix_valid = np.where(np.isfinite(vals))[0]
ip = ipclass(src[ix_valid], trg, *args, **kwargs)
result = ip(vals[ix_valid])
elif vals.ndim == 2:
# this implementation for 2 dimensions needs generalization
ip = ipclass(src, trg, *args, **kwargs)
result = ip(vals)
nan_in_result = np.where(np.isnan(result))
# nan_in_vals = np.where(np.isnan(vals))
for i in np.unique(nan_in_result[-1]):
ix_good = np.where(np.isfinite(vals[..., i]))[0]
tmp = np.where(nan_in_result[-1] == i)[0]
ix_broken_targets = nan_in_result[0][tmp]
ip = ipclass(
src[ix_good],
trg[nan_in_result[0][np.where(nan_in_result[-1] == i)[0]]],
*args,
**kwargs,
)
tmp = ip(vals[ix_good, i].reshape((len(ix_good), -1)))
result[ix_broken_targets, i] = tmp.ravel()
else:
if np.any(np.isnan(vals.ravel())):
raise NotImplementedError(
"At the moment, `interpolate` can only deal with NaN values in `vals` "
"if `vals` has less than 3 dimension."
)
else:
# if no NaN value are in <vals> we can safely apply the
# ipclass as is
ip = ipclass(src, trg, *args, **kwargs)
result = ip(vals)
return result
[docs]
@singledispatch
def interpolate_polar(data, *, mask=None, ipclass=Nearest):
"""
Convenience function to interpolate polar data
Parameters
----------
data : :class:`numpy:numpy.ndarray`
2-dimensional array (azimuth, ranges) of floats;
if no mask is assigned explicitly polar data should be a masked array
mask : :class:`numpy:numpy.ndarray`, optional
boolean array with pixels to be interpolated set to True,
must have the same shape as data, defaults to None.
ipclass : :class:`wradlib.ipol.IpolBase`
A class which inherits from IpolBase, defaults to :class:`wradlib.ipol.Nearest`.
Returns
-------
filled_data : :class:`numpy:numpy.ndarray`
2D array with interpolated values for the values set to True in the mask
Examples
--------
>>> import numpy as np # noqa
>>> import wradlib as wrl
>>> # creating a data array and mask some values
>>> data = np.arange(12.).reshape(4,3)
>>> masked_values = (data==2) | (data==9)
>>> # interpolate the masked data based on ''masked_values''
>>> filled_a = wrl.ipol.interpolate_polar(data, mask = masked_values, ipclass = wrl.ipol.Linear) # noqa
>>> da = wrl.georef.create_xarray_dataarray(filled_a)
>>> da = da.wrl.georef.georeference()
>>> pm = wrl.vis.plot(da)
>>> # the same result can be achieved by using a masked array instead of an explicit mask # noqa
>>> mdata = np.ma.array(data, mask = masked_values)
>>> filled_b = wrl.ipol.interpolate_polar(mdata, ipclass = wrl.ipol.Linear) # noqa
>>> da = wrl.georef.create_xarray_dataarray(filled_b)
>>> da = da.wrl.georef.georeference()
>>> pm = wrl.vis.plot(da)
"""
if mask is None:
# no mask assigned: try to get it from masked array
if not isinstance(data, np.ma.core.MaskedArray):
util.warn(
"Neither an explicit mask is assigned nor the data-array is masked."
)
mask = np.ma.getmaskarray(data)
elif not np.any(mask):
# mask contains no True values, so there is nothing to fill
return data
clutter_indices = np.where(mask.ravel())
# construct the ranges for every bin
ranges = np.tile(np.arange(0.5, data.shape[1] + 0.5), data.shape[0])
# construct the angles for every bin
angles = np.repeat(
np.radians(np.linspace(0, 360, endpoint=False, num=data.shape[0])),
data.shape[1],
)
# calculate cartesian coordinates for every bin
binx = np.cos(angles) * ranges
biny = np.sin(angles) * ranges
# calculate cartesian coordinates for bins, which are not masked
src_coord = np.array(
[(np.delete(binx, clutter_indices)), (np.delete(biny, clutter_indices))]
).transpose()
# calculate cartesian coordinates for bins, which are masked
trg_coord = np.array([binx[clutter_indices], biny[clutter_indices]]).transpose()
# data values for bins, which are not masked
values_list = np.delete(data, clutter_indices)
filled_data = data.copy().ravel()
# interpolate masked bins
filling = interpolate(src_coord, trg_coord, values_list, ipclass)
# fill data with the interpolations
filled_data[clutter_indices] = filling.astype(filled_data.dtype)
# in case of nans as processed at the rim when interpolating linear,
# these values are finally interpolated by nearest Neighbor interpolation
if np.any(np.isnan(filled_data)):
trg_coord = np.array(
[
binx[np.where(np.isnan(filled_data))],
biny[np.where(np.isnan(filled_data))],
]
).transpose()
filling = interpolate(src_coord, trg_coord, values_list, ipclass=Nearest)
filled_data[np.where(np.isnan(filled_data))] = filling
return filled_data.reshape(data.shape[0], data.shape[1])
@interpolate_polar.register(xr.DataArray)
def _interpolate_polar_xarray(obj, mask, **kwargs):
dim0 = obj.wrl.util.dim0()
def wrapper(obj, mask, *args, **kwargs):
kwargs.setdefault("mask", mask)
return interpolate_polar(obj, *args, **kwargs)
out = xr.apply_ufunc(
wrapper,
obj,
mask,
input_core_dims=[[dim0, "range"], [dim0, "range"]],
output_core_dims=[[dim0, "range"]],
dask="parallelized",
vectorize=True,
kwargs=kwargs,
dask_gufunc_kwargs=dict(allow_rechunk=True),
)
out.name = "interpolate_polar"
return out
[docs]
def cart_to_irregular_interp(cartgrid, values, newgrid, **kwargs):
"""
Interpolate array ``values`` defined by cartesian coordinate array
``cartgrid`` to new coordinates defined by ``newgrid`` using
nearest neighbour, linear or cubic interpolation
Slow for large arrays
Keyword arguments are fed to :func:`scipy:scipy.interpolate.griddata`
Parameters
----------
cartgrid : :class:`numpy:numpy.ndarray`
3-dimensional array (nx, ny, lon/lat) of floats;
values : :class:`numpy:numpy.ndarray`
2-dimensional array (nx, ny) of data values
newgrid : :class:`numpy:numpy.ndarray`
Nx2-dimensional array (..., lon/lat) of floats
kwargs : :func:`scipy:scipy.interpolate.griddata`
Returns
-------
interp : :class:`numpy:numpy.ndarray`
array with interpolated values of size N
"""
# TODO: dimension checking
newshape = newgrid.shape[:-1]
cart_arr = cartgrid.reshape(-1, cartgrid.shape[-1])
new_arr = newgrid.reshape(-1, newgrid.shape[-1])
if values.ndim > 1:
values = values.ravel()
interp = sinterp.griddata(cart_arr, values, new_arr, **kwargs)
interp = interp.reshape(newshape)
return interp
[docs]
def cart_to_irregular_spline(cartgrid, values, newgrid, **kwargs):
"""
Map array ``values`` defined by cartesian coordinate array ``cartgrid``
to new coordinates defined by ``newgrid`` using spline interpolation.
Keyword arguments are fed through to
:func:`scipy:scipy.ndimage.map_coordinates`
Parameters
----------
cartgrid : :class:`numpy:numpy.ndarray`
3-dimensional array (nx, ny, lon/lat) of floats
values : :class:`numpy:numpy.ndarray`
2-dimensional array (nx, ny) of data values
newgrid : :class:`numpy:numpy.ndarray`
Nx2-dimensional array (..., lon/lat) of floats
kwargs : :func:`scipy:scipy.ndimage.map_coordinates`
Returns
-------
interp : :class:`numpy:numpy.ndarray`
array with interpolated values of size N
Examples
--------
See :ref:`/notebooks/beamblockage/beamblockage.ipynb#\
Preprocessing-the-digitial-elevation-model`.
"""
# TODO: dimension checking
newshape = newgrid.shape[:-1]
xi = newgrid[..., 0].ravel()
yi = newgrid[..., 1].ravel()
nx = cartgrid.shape[1]
ny = cartgrid.shape[0]
cxmin = np.min(cartgrid[..., 0])
cxmax = np.max(cartgrid[..., 0])
cymin = np.min(cartgrid[..., 1])
cymax = np.max(cartgrid[..., 1])
# this functionality finds the floating point
# indices into the value array (0:nx-1)
# can be transferred into separate function
# if necessary
xi = (nx - 1) * (xi - cxmin) / (cxmax - cxmin)
# check origin to calculate y index
if util.get_raster_origin(cartgrid) == "lower":
yi = (ny - 1) * (yi - cymin) / (cymax - cymin)
else:
yi = ny - (ny - 1) * (yi - cymin) / (cymax - cymin)
# interpolation by map_coordinates
interp = ndimage.map_coordinates(values, [yi, xi], **kwargs)
interp = interp.reshape(newshape)
return interp
[docs]
class IpolMethods(XarrayMethods):
"""wradlib xarray SubAccessor methods for Ipol Methods."""
[docs]
@docstring(interpolate_polar)
def interpolate_polar(self, *args, **kwargs):
if not isinstance(self, IpolMethods):
return interpolate_polar(self, *args, **kwargs)
else:
return interpolate_polar(self._obj, *args, **kwargs)
if __name__ == "__main__":
print("wradlib: Calling module <ipol> as main...")