Source code for wradlib.io.gdal

#!/usr/bin/env python
# Copyright (c) 2011-2023, wradlib developers.
# Distributed under the MIT License. See LICENSE.txt for more info.

"""
GDAL Raster/Vector Data I/O
^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autosummary::
   :nosignatures:
   :toctree: generated/

   {}
"""
__all__ = [
    "open_vector",
    "open_raster",
    "read_safnwc",
    "gdal_create_dataset",
    "write_raster_dataset",
    "VectorSource",
]
__doc__ = __doc__.format("\n   ".join(__all__))

import os
import tempfile

import numpy as np

from wradlib import georef
from wradlib.util import import_optional

osr = import_optional("osgeo.osr")
ogr = import_optional("osgeo.ogr")
gdal = import_optional("osgeo.gdal")

# check windows
isWindows = os.name == "nt"


[docs] def open_vector(filename, *, driver=None, layer=0): """Open vector file, return gdal.Dataset and OGR.Layer .. warning:: dataset and layer have to live in the same context, if dataset is deleted all layer references will get lost Parameters ---------- filename : str vector file name driver : str gdal driver string layer : int or str Returns ------- dataset : :py:class:`gdal:osgeo.gdal.Dataset` gdal.Dataset layer : :py:class:`gdal:osgeo.ogr.Layer` ogr.Layer """ dataset = gdal.OpenEx(filename) if driver: gdal.GetDriverByName(driver) layer = dataset.GetLayer(layer) return dataset, layer
[docs] def open_raster(filename, *, driver=None): """Open raster file, return gdal.Dataset Parameters ---------- filename : str raster file name driver : str gdal driver string Returns ------- dataset : :py:class:`gdal:osgeo.gdal.Dataset` gdal.Dataset """ dataset = gdal.OpenEx(filename) if driver: gdal.GetDriverByName(driver) return dataset
[docs] def read_safnwc(filename): """Read MSG SAFNWC hdf5 file into a gdal georeferenced object Parameters ---------- filename : str satellite file name Returns ------- ds : :py:class:`gdal:osgeo.gdal.Dataset` gdal.DataSet with satellite data """ root = gdal.Open(filename) ds1 = gdal.Open("HDF5:" + filename + "://CT") ds = gdal.GetDriverByName("MEM").CreateCopy("out", ds1, 0) try: crs = osr.SpatialReference() crs.ImportFromProj4(ds.GetMetadata()["PROJECTION"]) except KeyError as err: raise OSError(f"Projection is missing for satellite file {filename}") from err geotransform = root.GetMetadata()["GEOTRANSFORM_GDAL_TABLE"].split(",") geotransform[0] = root.GetMetadata()["XGEO_UP_LEFT"] geotransform[3] = root.GetMetadata()["YGEO_UP_LEFT"] ds.SetProjection(crs.ExportToWkt()) ds.SetGeoTransform([float(x) for x in geotransform]) return ds
[docs] def gdal_create_dataset( drv, name, cols=0, rows=0, bands=0, *, gdal_type=None, remove=False ): """Creates GDAL.DataSet object. Parameters ---------- drv : str GDAL driver string name : str path to filename cols : int number of columns rows : int number of rows bands : int number of raster bands gdal_type : :py:class:`gdal:osgeo.ogr.DataType` raster data type, e.g. gdal.GDT_Float32 remove : bool if True, existing gdal.Dataset will be removed before creation Returns ------- out : :py:class:`gdal:osgeo.gdal.Dataset` gdal.Dataset """ if gdal_type is None: gdal_type = gdal.GDT_Unknown driver = gdal.GetDriverByName(drv) metadata = driver.GetMetadata() if not metadata.get("DCAP_CREATE", False): raise TypeError(f"Driver {drv} doesn't support Create() method.") if remove: if os.path.exists(name): driver.Delete(name) ds = driver.Create(name, cols, rows, bands, gdal_type) return ds
[docs] def write_raster_dataset(fpath, dataset, *, driver="GTiff", **kwargs): """Write raster dataset to file format Parameters ---------- fpath : str A file path - should have file extension corresponding to format. dataset : :py:class:`gdal:osgeo.gdal.Dataset` gdal.Dataset gdal raster dataset driver : str, optional gdal raster format driver string, defaults to "GTiff" Keyword Arguments ----------------- options : list, optional Option strings for the corresponding format. Defaults to remove : bool, optional if True, existing gdal.Dataset will be removed before creation, defaults to False Note ---- For format and options refer to `formats_list <https://gdal.org/formats_list.html>`_. Examples -------- See :ref:`/notebooks/fileio/gis/raster_data.ipynb`. """ # get option list options = kwargs.get("options", []) remove = kwargs.get("remove", False) driver = gdal.GetDriverByName(driver) metadata = driver.GetMetadata() # check driver capability if not ("DCAP_CREATECOPY" in metadata and metadata["DCAP_CREATECOPY"] == "YES"): raise TypeError(f"Raster Driver {driver} doesn't support CreateCopy() method.") if remove: if os.path.exists(fpath): driver.Delete(fpath) target = driver.CreateCopy(fpath, dataset, 0, options) del target
[docs] class VectorSource: """DataSource class for handling ogr/gdal vector data DataSource handles creates in-memory (vector) ogr DataSource object with one layer for point or polygon geometries. Parameters ---------- data : sequence or str sequence of source points (shape Nx2) or polygons (shape NxMx2) or Vector File (GDAL/OGR) filename containing source points/polygons trg_crs : :py:class:`gdal:osgeo.osr.SpatialReference` GDAL OSR SRS describing target CRS the source data should be projected to Keyword Arguments ----------------- name : str Layer Name, defaults to "layer". source : int Number of layer to load, if multiple layers in source shape file. mode : str Return type of class access functions/properties. Can be either of "numpy", "geo" and "ogr", defaults to "numpy". src_crs : :py:class:`gdal:osgeo.osr.SpatialReference` GDAL OGR SRS describing projection source in which data is provided in. Warning ------- Writing shapefiles with the wrong locale settings can have impact on the type of the decimal. If problem arise use ``LC_NUMERIC=C`` in your environment. Examples -------- See :ref:`/notebooks/fileio/gis/vector_data.ipynb`. """ def __init__(self, data=None, trg_crs=None, name="layer", source=0, **kwargs): self._trg_crs = trg_crs self._name = name self._geo = None self._mode = kwargs.get("mode", "numpy") self._src_crs = kwargs.get("src_crs", None) if data is not None: if isinstance(data, (np.ndarray, list)): self._ds = self._check_src(data) else: self.load_vector(data, source=source) self._create_spatial_index() else: self._ds = None
[docs] def close(self): if self._geo is not None: self._geo = None if self.ds is not None: fname = self.ds.GetDescription() driver = self.ds.GetDriver() self.ds = None driver.Delete(fname)
__del__ = close def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() def __iter__(self): """Return Layer Feature Iterator.""" if self._mode == "ogr": lyr = self.ds.GetLayer() return iter(lyr) elif self._mode == "geo": return self.geo.iterrows() else: lyr = self.ds.GetLayer() def _get_geom(feat): return georef.ogr_to_numpy(feat.GetGeometryRef()) return iter(map(_get_geom, lyr)) def __len__(self): lyr = self.ds.GetLayer() return lyr.GetFeatureCount() def __repr__(self): lyr = self.ds.GetLayer() summary = [f"<wradlib.{type(self).__name__}>"] geom_type = f"Type: {ogr.GeometryTypeToName(lyr.GetGeomType())}" summary.append(geom_type) geoms = f"Geometries: {len(self)}" summary.append(geoms) return "\n".join(summary) @property def mode(self): return self._mode @mode.setter def mode(self, value): self._mode = value @property def ds(self): """Returns VectorSource""" self._check_ds() return self._ds @ds.setter def ds(self, value): self._ds = value def _check_ds(self): """Raise ValueError if empty VectorSource""" if self._ds is None: raise ValueError("Trying to access empty VectorSource.") @property def extent(self): return self.ds.GetLayer().GetExtent() @property def crs(self): return self.ds.GetLayer().GetSpatialRef() @property def data(self): """Returns VectorSource geometries as numpy arrays Note ---- This may be slow, because it extracts all source polygons """ lyr = self.ds.GetLayer() lyr.ResetReading() lyr.SetSpatialFilter(None) lyr.SetAttributeFilter(None) return self._get_data() @property def geo(self): """Returns VectorSource geometries as GeoPandas Dataframe""" self._check_ds() if self._geo is None: geopandas = import_optional("geopandas") self._geo = geopandas.read_file(self.ds.GetDescription()) return self._geo def _get_data(self, *, mode=None): """Returns DataSource geometries Keyword Arguments ----------------- mode : str return type ("numpy", "geo", "ogr"), defaults to "numpy" """ if mode is None: mode = self._mode lyr = self.ds.GetLayer() sources = [] for feature in lyr: geom = feature.GetGeometryRef() if mode == "numpy": poly = georef.vector.ogr_to_numpy(geom) sources.append(poly) else: poly = geom sources.append(poly) return np.array(sources, dtype=object)
[docs] def get_data_by_idx(self, idx, *, mode=None): """Returns DataSource geometries from given index Parameters ---------- idx : sequence sequence of int indices mode : str, optional return type ("numpy", "geo", "ogr"), defaults to "numpy" """ if mode is None: mode = self._mode if mode == "geo": if isinstance(idx, (list, slice)): return self.geo.loc[idx] elif np.isscalar(idx): return self.geo.iloc[idx] else: return self.geo.loc[idx] lyr = self.ds.GetLayer() lyr.ResetReading() lyr.SetSpatialFilter(None) lyr.SetAttributeFilter(None) sources = [] for i in idx: feature = lyr.GetFeature(i) geom = feature.GetGeometryRef() poly = georef.vector.ogr_to_numpy(geom) # need to recreate the geometry because access # is lost if layer gets out of scope if mode == "ogr": poly = georef.vector.numpy_to_ogr( poly, geom.GetGeometryName().capitalize() ) sources.append(poly) return np.array(sources, dtype=object)
[docs] def get_data_by_att(self, attr=None, value=None, mode=None): """Returns DataSource geometries filtered by given attribute/value Keyword Arguments ----------------- attr : str attribute name value : str attribute value mode : str return type ("numpy", "geo", "ogr"), defaults to "numpy" """ if mode is None: mode = self._mode if np.isscalar(value): sql = f"{attr}={value}" else: sql = f"{attr} in {tuple(value)}" if mode == "geo": return self.geo.query(sql) lyr = self.ds.GetLayer() lyr.ResetReading() lyr.SetSpatialFilter(None) lyr.SetAttributeFilter(sql) return self._get_data(mode=mode)
[docs] def get_data_by_geom(self, geom=None, mode=None): """Returns DataSource geometries filtered by given geometry Keyword Arguments ----------------- geom : :py:class:`gdal:osgeo.ogr.Geometry` | :py:class:`geopandas.GeoDataFrame` OGR.Geometry object or geopandas.GeoDataFrame containing the Geometry mode : str return type ("numpy", "geo", "ogr"), defaults to "numpy" """ if mode is None: mode = self._mode if mode == "geo": return self.geo[self.geo.within(geom)] lyr = self.ds.GetLayer() lyr.ResetReading() lyr.SetAttributeFilter(None) lyr.SetSpatialFilter(geom) return self._get_data(mode=mode)
def _create_spatial_index(self): """Creates spatial index file .qix""" sql1 = f"DROP SPATIAL INDEX ON {self._name}" sql2 = f"CREATE SPATIAL INDEX ON {self._name}" self.ds.ExecuteSQL(sql1) self.ds.ExecuteSQL(sql2) def _create_table_index(self, col): """Creates attribute index files""" sql1 = f"DROP INDEX ON {self._name}" sql2 = f"CREATE INDEX ON {self._name} USING {col}" self.ds.ExecuteSQL(sql1) self.ds.ExecuteSQL(sql2) def _check_src(self, src): """Basic check of source elements (sequence of points or polygons). - array cast of source elements - create ogr_src datasource/layer holding src points/polygons - transforming source grid points/polygons to ogr.geometries on ogr.layer """ tmpfile = tempfile.NamedTemporaryFile(mode="w+b").name ogr_src = gdal_create_dataset( "ESRI Shapefile", os.path.join("/vsimem", tmpfile), gdal_type=gdal.OF_VECTOR ) src = np.array(src) if self._src_crs and self._src_crs: src = georef.reproject(src, src_crs=self._src_crs, trg_crs=self._trg_crs) # create memory datasource, layer and create features if src.ndim == 2: geom_type = ogr.wkbPoint # no Polygons, just Points else: geom_type = ogr.wkbPolygon fields = [("index", ogr.OFTInteger)] georef.vector.ogr_create_layer( ogr_src, self._name, crs=self._trg_crs, geom_type=geom_type, fields=fields ) georef.vector.ogr_add_feature(ogr_src, src, name=self._name) return ogr_src
[docs] def dump_vector(self, filename, *, driver="ESRI Shapefile", remove=True): """Output layer to OGR Vector File Parameters ---------- filename : str path to shape-filename driver : str, optional driver string, defaults to "ESRI SHapefile" remove : bool, optional if True removes existing output file, defaults to True """ ds_out = gdal_create_dataset( driver, filename, gdal_type=gdal.OF_VECTOR, remove=remove ) georef.vector.ogr_copy_layer(self.ds, 0, ds_out) # flush everything del ds_out
[docs] def load_vector(self, filename, *, source=0, driver="ESRI Shapefile"): """Read Layer from OGR Vector File Parameters ---------- filename : str path to shape-filename source : int or str, optional number or name of wanted layer, defaults to 0 driver : str, optional driver string, defaults to "ESRI Shapefile" """ tmpfile = tempfile.NamedTemporaryFile(mode="w+b").name self.ds = gdal_create_dataset( "ESRI Shapefile", os.path.join("/vsimem", tmpfile), gdal_type=gdal.OF_VECTOR ) # get input file handles ds_in, tmp_lyr = open_vector(filename, driver=driver, layer=source) # get spatial reference object crs = tmp_lyr.GetSpatialRef() if crs is None: raise ValueError( f"Spatial reference missing from source file {filename}. " f"Please provide a file with spatial reference." ) # reproject layer if necessary if self._trg_crs is not None and crs is not None and crs != self._trg_crs: ogr_src_lyr = self.ds.CreateLayer( self._name, self._trg_crs, geom_type=ogr.wkbPolygon ) georef.vector.ogr_reproject_layer( tmp_lyr, ogr_src_lyr, self._trg_crs, src_crs=crs ) else: # copy layer ogr_src_lyr = self.ds.CopyLayer(tmp_lyr, self._name) if self._trg_crs is None: self._trg_crs = crs # flush everything del ds_in
[docs] def dump_raster( self, filename, *, driver="GTiff", attr=None, pixel_size=1.0, **kwargs, ): """Output layer to GDAL Rasterfile Parameters ---------- filename : str path to shape-filename driver : str, optional GDAL Raster Driver, defaults to "GTiff". attr : str, optional attribute to burn into raster, defaults to None. pixel_size : float, optional pixel Size in source units Keyword Arguments ----------------- remove : bool, optional if True removes existing output file. Defaults to True. silent : bool, optional If True no ProgressBar is shown. Defaults to False. """ silent = kwargs.get("silent", False) progress = None if (silent or isWindows) else gdal.TermProgress remove = kwargs.get("remove", True) layer = self.ds.GetLayer() layer.ResetReading() x_min, x_max, y_min, y_max = layer.GetExtent() cols = int((x_max - x_min) / pixel_size) rows = int((y_max - y_min) / pixel_size) # Todo: at the moment, always writing floats ds_out = gdal_create_dataset( "MEM", "", cols, rows, 1, gdal_type=gdal.GDT_Float32 ) ds_out.SetGeoTransform((x_min, pixel_size, 0, y_max, 0, -pixel_size)) crs = layer.GetSpatialRef() if crs is None: crs = self._trg_crs ds_out.SetProjection(crs.ExportToWkt()) band = ds_out.GetRasterBand(1) band.FlushCache() if attr is not None: gdal.RasterizeLayer( ds_out, [1], layer, burn_values=[0], options=[f"ATTRIBUTE={attr}", "ALL_TOUCHED=TRUE"], callback=progress, ) else: gdal.RasterizeLayer( ds_out, [1], layer, burn_values=[1], options=["ALL_TOUCHED=TRUE"], callback=progress, ) write_raster_dataset(filename, ds_out, driver=driver, remove=remove) del ds_out
[docs] def set_attribute(self, name, values, *, reset_filter=False): """Add/Set given Attribute with given values Parameters ---------- name : str Attribute Name values : :class:`numpy:numpy.ndarray` Values to fill in attributes. reset_filter : bool, optional reset any layer filter (spatial/attribute), defaults to False. """ lyr = self.ds.GetLayerByIndex(0) if reset_filter: lyr.SetAttributeFilter(None) lyr.SetSpatialFilter(None) lyr.ResetReading() # todo: automatically check for value type defn = lyr.GetLayerDefn() if defn.GetFieldIndex(name) == -1: lyr.CreateField(ogr.FieldDefn(name, ogr.OFTReal)) for i, item in enumerate(lyr): item.SetField(name, values[i]) lyr.SetFeature(item) lyr.SyncToDisk() self._geo = None
[docs] def get_attributes(self, attrs, *, filt=None): """Return attributes Parameters ---------- attrs : list Attribute Names to retrieve filt : tuple, optional (attname, value) for Attribute Filter, defaults to None """ lyr = self.ds.GetLayer() lyr.ResetReading() lyr.SetAttributeFilter(None) lyr.SetSpatialFilter(None) if filt is not None: lyr.SetAttributeFilter(f"{filt[0]}={filt[1]}") ret = [[] for _ in attrs] for ogr_src in lyr: for i, att in enumerate(attrs): ret[i].append(ogr_src.GetField(att)) return ret
[docs] def get_geom_properties(self, props, *, filt=None): """Return geometry properties Parameters ---------- props : list Property Names to retrieve filt : tuple, optional (attname, value) for Attribute Filter, defaults to None. """ lyr = self.ds.GetLayer() lyr.ResetReading() if filt is not None: lyr.SetAttributeFilter(f"{filt[0]}={filt[1]}") ret = [[] for _ in props] for ogr_src in lyr: for i, prop in enumerate(props): ret[i].append(getattr(ogr_src.GetGeometryRef(), prop)()) return ret
[docs] def get_attrs_and_props(self, *, attrs=None, props=None, filt=None): """Return properties and attributes Keyword Arguments ----------------- attrs : list Attribute Names to retrieve props : list Property Names to retrieve filt : tuple (attname, value) for Attribute Filter """ lyr = self.ds.GetLayer() lyr.ResetReading() if filt is not None: lyr.SetAttributeFilter(f"{filt[0]}={filt[1]}") ret_props = [[] for _ in props] ret_attrs = [[] for _ in attrs] for ogr_src in lyr: for i, att in enumerate(attrs): ret_attrs[i].append(ogr_src.GetField(att)) for i, prop in enumerate(props): ret_props[i].append(getattr(ogr_src.GetGeometryRef(), prop)()) return ret_attrs, ret_props