Source code for sedona.spark.utils.geometry_serde_general

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import array
import math
import struct
from typing import List, Optional, Tuple, Union

import numpy as np
from shapely.geometry import (
    GeometryCollection,
    LinearRing,
    LineString,
    MultiLineString,
    MultiPoint,
    MultiPolygon,
    Point,
    Polygon,
)
from shapely.geometry.base import BaseGeometry
from shapely.wkb import dumps as wkb_dumps
from shapely.wkt import loads as wkt_loads

CoordType = Union[
    Tuple[float, float], Tuple[float, float, float], Tuple[float, float, float, float]
]
ListCoordType = Union[
    List[Tuple[float, float]],
    List[Tuple[float, float, float]],
    List[Tuple[float, float, float, float]],
]

GET_COORDS_NUMPY_THRESHOLD = 50


[docs] class GeometryTypeID: """ Constants used to identify the geometry type in the serialized bytearray of geometry. """ POINT = 1 LINESTRING = 2 POLYGON = 3 MULTIPOINT = 4 MULTILINESTRING = 5 MULTIPOLYGON = 6 GEOMETRYCOLLECTION = 7
[docs] class CoordinateType: """ Constants used to identify geometry dimensions in the serialized bytearray of geometry. """ XY = 1 XYZ = 2 XYM = 3 XYZM = 4 BYTES_PER_COORDINATE = [16, 24, 24, 32] NUM_COORD_COMPONENTS = [2, 3, 3, 4] UNPACK_FORMAT = ["dd", "ddd", "ddxxxxxxxx", "dddxxxxxxxx"]
[docs] @staticmethod def type_of(geom) -> int: if geom._ndim == 2: return CoordinateType.XY elif geom._ndim == 3: return CoordinateType.XYZ else: raise ValueError(f"Invalid coordinate dimension: {geom._ndim}")
[docs] @staticmethod def bytes_per_coord(coord_type: int) -> int: return CoordinateType.BYTES_PER_COORDINATE[coord_type - 1]
[docs] @staticmethod def components_per_coord(coord_type: int) -> int: return CoordinateType.NUM_COORD_COMPONENTS[coord_type - 1]
[docs] @staticmethod def unpack_format(coord_type: int) -> str: return CoordinateType.UNPACK_FORMAT[coord_type - 1]
[docs] class GeometryBuffer: buffer: bytearray coord_type: int bytes_per_coord: int num_coords: int coords_offset: int ints_offset: int
[docs] def __init__( self, buffer: bytearray, coord_type: int, coords_offset: int, num_coords: int ) -> None: self.buffer = buffer self.coord_type = coord_type self.bytes_per_coord = CoordinateType.bytes_per_coord(coord_type) self.num_coords = num_coords self.coords_offset = coords_offset self.ints_offset = coords_offset + self.bytes_per_coord * num_coords
[docs] def read_linestring(self) -> LineString: num_points = self.read_int() return LineString(self.read_coordinates(num_points))
[docs] def read_linearring(self) -> LineString: num_points = self.read_int() return LinearRing(self.read_coordinates(num_points))
[docs] def read_polygon(self) -> Polygon: num_rings = self.read_int() if num_rings == 0: return Polygon() rings = [self.read_coordinates(self.read_int()) for _ in range(num_rings)] return Polygon(rings[0], rings[1:])
[docs] def write_linestring(self, line: LineString) -> None: coords = [tuple(c) for c in line.coords] self.write_int(len(coords)) self.coords_offset = put_coordinates( self.buffer, self.coords_offset, self.coord_type, coords )
[docs] def write_polygon(self, polygon: Polygon) -> None: exterior = polygon.exterior if not exterior.coords: self.write_int(0) return self.write_int(len(polygon.interiors) + 1) self.write_linestring(exterior) for interior in polygon.interiors: self.write_linestring(interior)
[docs] def read_coordinates(self, num_coords: int) -> ListCoordType: coords = get_coordinates( self.buffer, self.coords_offset, self.coord_type, num_coords ) self.coords_offset += num_coords * self.bytes_per_coord return coords
[docs] def read_coordinate(self) -> CoordType: coord = get_coordinate(self.buffer, self.coords_offset, self.coord_type) self.coords_offset += self.bytes_per_coord return coord
[docs] def read_int(self) -> int: value = struct.unpack_from("i", self.buffer, self.ints_offset)[0] if value > len(self.buffer): raise ValueError("Unexpected large integer in structural data") self.ints_offset += 4 return value
[docs] def write_int(self, value: int) -> None: struct.pack_into("i", self.buffer, self.ints_offset, value) self.ints_offset += 4
[docs] def serialize(geom: BaseGeometry) -> Optional[Union[bytes, bytearray]]: """ Serialize a shapely geometry object to the internal representation of GeometryUDT. :param geom: shapely geometry object :return: internal representation of GeometryUDT """ if geom is None: return None if isinstance(geom, Point): return serialize_point(geom) elif isinstance(geom, LineString): return serialize_linestring(geom) elif isinstance(geom, Polygon): return serialize_polygon(geom) elif isinstance(geom, MultiPoint): return serialize_multi_point(geom) elif isinstance(geom, MultiLineString): return serialize_multi_linestring(geom) elif isinstance(geom, MultiPolygon): return serialize_multi_polygon(geom) elif geom.geom_type == "GeometryCollection": # this check is slightly different due to how shapely # handles empty geometries in version 1.x return serialize_geometry_collection(geom) else: raise ValueError(f"Unsupported geometry type: {type(geom)}")
[docs] def deserialize(buffer: bytes) -> Optional[BaseGeometry]: """ Deserialize a shapely geometry object from the internal representation of GeometryUDT. :param buffer: internal representation of GeometryUDT :return: shapely geometry object """ if buffer is None: return None preamble_byte = buffer[0] geom_type = (preamble_byte >> 4) & 0x0F coord_type = (preamble_byte >> 1) & 0x07 num_coords = struct.unpack_from("i", buffer, 4)[0] if num_coords > len(buffer): raise ValueError("num_coords cannot be larger than buffer size") geom_buffer = GeometryBuffer(buffer, coord_type, 8, num_coords) if geom_type == GeometryTypeID.POINT: geom = deserialize_point(geom_buffer) elif geom_type == GeometryTypeID.LINESTRING: geom = deserialize_linestring(geom_buffer) elif geom_type == GeometryTypeID.POLYGON: geom = deserialize_polygon(geom_buffer) elif geom_type == GeometryTypeID.MULTIPOINT: geom = deserialize_multi_point(geom_buffer) elif geom_type == GeometryTypeID.MULTILINESTRING: geom = deserialize_multi_linestring(geom_buffer) elif geom_type == GeometryTypeID.MULTIPOLYGON: geom = deserialize_multi_polygon(geom_buffer) elif geom_type == GeometryTypeID.GEOMETRYCOLLECTION: geom = deserialize_geometry_collection(geom_buffer) else: raise ValueError(f"Unsupported geometry type ID: {geom_type}") return geom, geom_buffer.ints_offset
[docs] def create_buffer_for_geom( geom_type: int, coord_type: int, size: int, num_coords: int ) -> bytearray: buffer = bytearray(size) preamble_byte = (geom_type << 4) | (coord_type << 1) buffer[0] = preamble_byte struct.pack_into("i", buffer, 4, num_coords) return buffer
[docs] def generate_header_bytes(geom_type: int, coord_type: int, num_coords: int) -> bytes: preamble_byte = (geom_type << 4) | (coord_type << 1) return struct.pack("BBBBi", preamble_byte, 0, 0, 0, num_coords)
[docs] def put_coordinates( buffer: bytearray, offset: int, coord_type: int, coords: ListCoordType ) -> int: for coord in coords: struct.pack_into( CoordinateType.unpack_format(coord_type, buffer, offset, *coord) ) offset += CoordinateType.bytes_per_coord(coord_type) return offset
[docs] def put_coordinate( buffer: bytearray, offset: int, coord_type: int, coord: CoordType ) -> int: struct.pack_into(CoordinateType.unpack_format(coord_type, buffer, offset, *coord)) offset += CoordinateType.bytes_per_coord(coord_type) return offset
[docs] def get_coordinates( buffer: bytearray, offset: int, coord_type: int, num_coords: int ) -> Union[np.ndarray, ListCoordType]: if num_coords < GET_COORDS_NUMPY_THRESHOLD: coords = [ struct.unpack_from( CoordinateType.unpack_format(coord_type), buffer, offset + (i * CoordinateType.bytes_per_coord(coord_type)), ) for i in range(num_coords) ] else: nums_per_coord = CoordinateType.components_per_coord(coord_type) coords = np.frombuffer( buffer, np.float64, num_coords * nums_per_coord, offset ).reshape((num_coords, nums_per_coord)) return coords
[docs] def get_coordinate(buffer: bytearray, offset: int, coord_type: int) -> CoordType: return struct.unpack_from(CoordinateType.unpack_format(coord_type), buffer, offset)
[docs] def aligned_offset(offset: int) -> int: return (offset + 7) & ~7
[docs] def serialize_point(geom: Point) -> bytes: coords = [tuple(c) for c in geom.coords] if coords: # FIXME this does not handle M yet, but geom.has_z is extremely slow pack_format = "BBBBi" + "d" * geom._ndim coord_type = CoordinateType.type_of(geom) preamble_byte = (GeometryTypeID.POINT << 4) | (coord_type << 1) coords = coords[0] return struct.pack(pack_format, preamble_byte, 0, 0, 0, 1, *coords) else: return struct.pack("BBBBi", 18, 0, 0, 0, 0)
[docs] def deserialize_point(geom_buffer: GeometryBuffer) -> Point: if geom_buffer.num_coords == 0: # Here we don't call Point() directly since it would create an empty GeometryCollection # in shapely 1.x. You'll find similar code for creating empty geometries in other # deserialization functions. return wkt_loads("POINT EMPTY") coord = geom_buffer.read_coordinate() return Point(coord)
[docs] def serialize_multi_point(geom: MultiPoint) -> bytes: points = list(geom.geoms) num_points = len(points) if num_points == 0: return generate_header_bytes(GeometryTypeID.MULTIPOINT, CoordinateType.XY, 0) coord_type = CoordinateType.type_of(geom) header = generate_header_bytes(GeometryTypeID.MULTIPOINT, coord_type, num_points) coords = [] for point in points: if point.coords: for coord in point.coords[0]: coords.append(coord) else: for k in range(geom._ndim): coords.append(math.nan) body = array.array("d", coords).tobytes() return header + body
[docs] def deserialize_multi_point(geom_buffer: GeometryBuffer) -> MultiPoint: if geom_buffer.num_coords == 0: return wkt_loads("MULTIPOINT EMPTY") coords = geom_buffer.read_coordinates(geom_buffer.num_coords) points = [] for coord in coords: if math.isnan(coord[0]): # Shapely does not allow creating MultiPoint with empty components pass else: points.append(Point(coord)) return MultiPoint(points)
[docs] def serialize_linestring(geom: LineString) -> bytes: coords = [tuple(c) for c in geom.coords] if coords: coord_type = CoordinateType.type_of(geom) header = generate_header_bytes( GeometryTypeID.LINESTRING, coord_type, len(coords) ) return header + array.array("d", [x for c in coords for x in c]).tobytes() else: return generate_header_bytes(GeometryTypeID.LINESTRING, 1, 0)
[docs] def deserialize_linestring(geom_buffer: GeometryBuffer) -> LineString: if geom_buffer.num_coords == 0: return wkt_loads("LINESTRING EMPTY") coords = geom_buffer.read_coordinates(geom_buffer.num_coords) return LineString(coords)
[docs] def serialize_multi_linestring(geom: MultiLineString) -> bytes: linestrings = list(geom.geoms) coord_type = CoordinateType.type_of(geom) lines = [[list(coord) for coord in ls.coords] for ls in linestrings] line_lengths = [len(line) for line in lines] num_coords = sum(line_lengths) header = generate_header_bytes( GeometryTypeID.MULTILINESTRING, coord_type, num_coords ) coord_data = array.array( "d", [c for line in lines for coord in line for c in coord] ).tobytes() num_lines = struct.pack("i", len(lines)) structure_data = array.array("i", line_lengths).tobytes() result = header + coord_data + num_lines + structure_data return result
[docs] def deserialize_multi_linestring(geom_buffer: GeometryBuffer) -> MultiLineString: num_linestrings = geom_buffer.read_int() linestrings = [] for k in range(0, num_linestrings): linestring = geom_buffer.read_linestring() if not linestring.is_empty: linestrings.append(linestring) if not linestrings: return wkt_loads("MULTILINESTRING EMPTY") return MultiLineString(linestrings)
[docs] def serialize_polygon(geom: Polygon) -> bytes: # it may seem odd, but dumping to wkb and parsing proved to be the fastest here wkb_string = wkb_dumps(geom) int_format = ">i" if struct.unpack_from("B", wkb_string) == 0 else "<i" num_rings = struct.unpack_from(int_format, wkb_string, 5)[0] if num_rings == 0: return generate_header_bytes(GeometryTypeID.POLYGON, CoordinateType.XY, 0) coord_bytes = b"" ring_lengths = [] offset = 9 # 1 byte endianness + 4 byte geom type + 4 byte num rings bytes_per_coord = geom._ndim * 8 num_coords = 0 for _ in range(num_rings): ring_len = struct.unpack_from(int_format, wkb_string, offset)[0] ring_lengths.append(ring_len) num_coords += ring_len offset += 4 coord_bytes += wkb_string[offset : (offset + bytes_per_coord * ring_len)] offset += bytes_per_coord * ring_len coord_type = CoordinateType.type_of(geom) header = generate_header_bytes(GeometryTypeID.POLYGON, coord_type, num_coords) structure_data_bytes = array.array("i", [num_rings] + ring_lengths).tobytes() return header + coord_bytes + structure_data_bytes
[docs] def deserialize_polygon(geom_buffer: GeometryBuffer) -> Polygon: if geom_buffer.num_coords == 0: return wkt_loads("POLYGON EMPTY") return geom_buffer.read_polygon()
[docs] def serialize_multi_polygon(geom: MultiPolygon) -> bytes: coords_for = lambda x: [y for y in list(x)] polygons = [ [coords_for(polygon.exterior.coords)] + [coords_for(ring.coords) for ring in polygon.interiors] for polygon in list(geom.geoms) ] coord_type = CoordinateType.type_of(geom) structure_data = array.array( "i", [ val for polygon in polygons for val in [len(polygon)] + [len(ring) for ring in polygon] ], ).tobytes() coords = array.array( "d", [ component for polygon in polygons for ring in polygon for coord in ring for component in coord ], ).tobytes() num_coords = len(coords) // CoordinateType.bytes_per_coord(coord_type) header = generate_header_bytes(GeometryTypeID.MULTIPOLYGON, coord_type, num_coords) num_polygons = struct.pack("i", len(polygons)) result = header + coords + num_polygons + structure_data return result
[docs] def deserialize_multi_polygon(geom_buffer: GeometryBuffer) -> MultiPolygon: num_polygons = geom_buffer.read_int() polygons = [] for k in range(0, num_polygons): polygon = geom_buffer.read_polygon() if not polygon.is_empty: polygons.append(polygon) if not polygons: return wkt_loads("MULTIPOLYGON EMPTY") return MultiPolygon(polygons)
[docs] def serialize_geometry_collection(geom: GeometryCollection) -> bytearray: if not isinstance(geom, GeometryCollection): # In shapely 1.x, Empty geometries such as empty points, empty polygons etc. # have geom_type property evaluated to 'GeometryCollection'. Such geometries are not # instances of GeometryCollection and do not have `geom` property. return serialize_shapely_1_empty_geom(geom) geometries = geom.geoms if not geometries: return create_buffer_for_geom( GeometryTypeID.GEOMETRYCOLLECTION, CoordinateType.XY, 8, 0 ) num_geometries = len(geometries) total_size = 8 buffers = [] for geom in geometries: buf = serialize(geom) buffers.append(buf) total_size += aligned_offset(len(buf)) buffer = create_buffer_for_geom( GeometryTypeID.GEOMETRYCOLLECTION, CoordinateType.XY, total_size, num_geometries ) offset = 8 for buf in buffers: buffer[offset : (offset + len(buf))] = buf offset += aligned_offset(len(buf)) return buffer
[docs] def serialize_shapely_1_empty_geom(geom: BaseGeometry) -> bytearray: total_size = 8 if isinstance(geom, Point): geom_type = GeometryTypeID.POINT elif isinstance(geom, LineString): geom_type = GeometryTypeID.LINESTRING elif isinstance(geom, Polygon): geom_type = GeometryTypeID.POLYGON elif isinstance(geom, MultiPoint): geom_type = GeometryTypeID.MULTIPOINT elif isinstance(geom, MultiLineString): geom_type = GeometryTypeID.MULTILINESTRING total_size = 12 elif isinstance(geom, MultiPolygon): geom_type = GeometryTypeID.MULTIPOLYGON total_size = 12 else: raise ValueError(f"Invalid empty geometry collection object: {geom}") return create_buffer_for_geom(geom_type, CoordinateType.XY, total_size, 0)
[docs] def deserialize_geometry_collection(geom_buffer: GeometryBuffer) -> GeometryCollection: num_geometries = geom_buffer.num_coords if num_geometries == 0: return wkt_loads("GEOMETRYCOLLECTION EMPTY") geometries = [] geom_end_offset = 8 buffer = geom_buffer.buffer[8:] for k in range(0, num_geometries): geom, offset = deserialize(buffer) geometries.append(geom) offset = aligned_offset(offset) buffer = buffer[offset:] geom_end_offset += offset geom_buffer.ints_offset = geom_end_offset return GeometryCollection(geometries)