Source code for playground_metrics.utils.geometry

from enum import Enum
from os import cpu_count
from typing import Sequence

import dask.array
import numpy as np
from pygeos import intersection, union, area, STRtree, get_type_id, get_coordinates, box, centroid, bounds, distance, \
    prepare


[docs]class GeometryType(Enum): """An enumeration of all geometry types available in GEOS with their respective GEOS numeric code.""" NONE = -1 POINT = 0 LINESTRING = 1 LINEARRING = 2 POLYGON = 3 MULTIPOINT = 4 MULTILINESTRING = 5 MULTIPOLYGON = 6 GEOMETRYCOLLECTION = 7
def _sanitize(x): if len(x.shape) > 1: for i in range(1, len(x.shape)): x = x.squeeze(i) prepare(x) return x class _IntersectionOverUnionRTree: @staticmethod def _area_op_threaded(operation, candidates, x, y, default): areas = default((len(x), len(y))) # Define the operation x_dask = dask.array.from_array(x[candidates[:, 0]], chunks=int(len(x[candidates[:, 0]]) // cpu_count())) y_dask = dask.array.from_array(y[candidates[:, 1]], chunks=int(len(y[candidates[:, 1]]) // cpu_count())) res = x_dask.map_blocks(operation, y_dask, dtype=object) res = res.map_blocks(area, dtype=float).squeeze() areas[candidates[:, 0], candidates[:, 1]] = res.compute(scheduler="threads", optimize_graph=True, num_workers=cpu_count()) return areas @staticmethod def _area_op(operation, candidates, x, y, default): areas = default((len(x), len(y))) areas[candidates[:, 0], candidates[:, 1]] = area(operation(x[candidates[:, 0]], y[candidates[:, 1]])).squeeze() return areas def _call(self, x, y, force_thread=False): x, y = _sanitize(x), _sanitize(y) tree = STRtree(y) candidates = tree.query_bulk(x, predicate='intersects').transpose() area_op = self._area_op_threaded \ if force_thread else self._area_op_threaded \ if x.shape[0] * y.shape[0] > (cpu_count() * 100) else self._area_op return area_op(intersection, candidates, x, y, np.zeros) / area_op(union, candidates, x, y, np.ones) def __call__(self, x, y, force_thread=False): if x.size == 0 or y.size == 0: return np.zeros((len(x), len(y))) if x.size == 1 and y.size == 1: return self._call(x, y, force_thread=force_thread) return self._call(x, y, force_thread=force_thread) _iou_rtree_computer = _IntersectionOverUnionRTree()
[docs]def intersection_over_union(x, y, force_thread=False): """Compute the intersection-over-union in between every possible geometry pairs from two arrays of geometries. Args: x (ArrayLike): An array of geometries. y (ArrayLike): An array of geometries. force_thread (bool): Force the use of the threaded implementation. Note that this can incur a significant computational cost for small input and fail on input too small to be reliably chunked. Returns: numpy.ndarray: An intersection-over-union matrix. """ if not isinstance(x, (Sequence, np.ndarray)): x = [x] if not isinstance(y, (Sequence, np.ndarray)): y = [y] x, y = np.asarray(x), np.asarray(y) return _iou_rtree_computer(x, y, force_thread=force_thread)
class _EuclideanDistanceRTree: @staticmethod def _distance_op_threaded(candidates, x, y): point_x = as_points(x) point_y = as_points(y) areas = np.Inf * np.ones((len(x), len(y))) # Define the operation x_dask = dask.array.from_array(point_x[candidates[:, 0]], chunks=int(len(x[candidates[:, 0]]) // cpu_count())) y_dask = dask.array.from_array(point_y[candidates[:, 1]], chunks=int(len(y[candidates[:, 1]]) // cpu_count())) res = x_dask.map_blocks(distance, y_dask, dtype=float).squeeze() areas[candidates[:, 0], candidates[:, 1]] = res.compute(scheduler="threads", optimize_graph=True, num_workers=cpu_count()) return areas @staticmethod def _distance_op(candidates, x, y): point_x = as_points(x) point_y = as_points(y) areas = np.Inf * np.ones((len(x), len(y))) areas[candidates[:, 0], candidates[:, 1]] = distance(point_x[candidates[:, 0]], point_y[candidates[:, 1]]).squeeze() return areas def _call(self, x, y, force_thread=False): x, y = _sanitize(x), _sanitize(y) tree = STRtree(y) candidates = tree.query_bulk(x).transpose() distance_op = self._distance_op_threaded \ if force_thread else self._distance_op_threaded \ if x.shape[0] * y.shape[0] > (cpu_count() * 100) else self._distance_op return 1 - distance_op(candidates, x, y) def __call__(self, x, y, force_thread=False): if x.size == 0 or y.size == 0: return np.zeros((len(x), len(y))) if x.size == 1 and y.size == 1: return self._call(x, y, force_thread=force_thread) return self._call(x, y, force_thread=force_thread) _distance_rtree_computer = _EuclideanDistanceRTree()
[docs]def euclidean_distance(x, y, force_thread=False): """Compute the euclidean distance in between every possible centroid pairs from two arrays of geometries. Args: x (ArrayLike): An array of geometries. y (ArrayLike): An array of geometries. force_thread (bool): Force the use of the threaded implementation. Note that this can incur a significant computational cost for small input and fail on input too small to be reliably chunked. Returns: numpy.ndarray: An intersection-over-union matrix. """ if not isinstance(x, (Sequence, np.ndarray)): x = [x] if not isinstance(y, (Sequence, np.ndarray)): y = [y] x, y = np.asarray(x), np.asarray(y) return _distance_rtree_computer(x, y, force_thread=force_thread)
[docs]def point_to_box(x, width=64., height=64.): """Convert an array of points to an array of constant size boxes. Args: x (numpy.ndarray): An array of points. width (float): The output boxes' width. height (float): The output boxes' width. Returns: numpy.ndarray: An array of boxes. """ x = enforce_point(x) coordinates = get_coordinates(x) return box(coordinates[:, 0] - (width / 2), coordinates[:, 1] - (height / 2), coordinates[:, 0] + (width / 2), coordinates[:, 1] + (height / 2))
[docs]def is_type(x, *geometry_types): """Return whether geometries in the provided array are of one of the provided types. Args: x (numpy.ndarray): A 1D ``[geometry]`` array. *geometry_types (GeometryType): A geometry type. Returns: numpy.ndarray: A boolean array of whether geometries are of any of the provided``geometry_types``. """ types = get_type_id(x) return sum(types == geometry_type.value for geometry_type in geometry_types).astype(bool)
[docs]def as_points(x): """Convert an array of geometries to an array of centroid points. Args: x (numpy.ndarray): An array of geometries. Returns: numpy.ndarray: An array of points. """ return centroid(x)
[docs]def as_boxes(x): """Convert an array of geometries to an array of bounding boxes polygons. Args: x (numpy.ndarray): An array of points. Returns: numpy.ndarray: An array of polygons. """ coordinates = bounds(x) return box(coordinates[:, 0], coordinates[:, 1], coordinates[:, 2], coordinates[:, 3])
[docs]def enforce_type(x, convert_fn, *geometry_types, in_place=False): """Convert geometries which are not of one of the provided types with the provided conversion function. Args: x (numpy.ndarray): A 1D ``[geometry]`` array. convert_fn (callable): A geometry type conversion function. *geometry_types (GeometryType): A geometry type. in_place (bool): If ``True`` convert the geometries in place. Returns: numpy.ndarray: An array of geometries, some of which where converted. """ x = np.array(x, copy=not in_place) offending_mask = ~is_type(x, *geometry_types) x[offending_mask] = convert_fn(x[offending_mask]) return x
[docs]def enforce_point(x, in_place=False): """Convert non-point geometries to points. Args: x (numpy.ndarray): A 1D ``[geometry]`` array. in_place (bool): If ``True`` convert the geometries in place. Returns: numpy.ndarray: An array of points, some of which where converted. """ return enforce_type(x, as_points, GeometryType.POINT, in_place=in_place)
[docs]def enforce_polygon(x, in_place=False): """Convert non-polygon geometries to polygons. Args: x (numpy.ndarray): A 1D ``[geometry]`` array. in_place (bool): If ``True`` convert the geometries in place. Returns: numpy.ndarray: An array of points, some of which where converted. """ return enforce_type(x, as_boxes, GeometryType.POLYGON, in_place=in_place)