# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import struct
from abc import ABC
from copy import copy
from typing import Any, List
import attr
from shapely.geometry.base import BaseGeometry
try:
from pyspark import CPickleSerializer
except ImportError:
from pyspark import PickleSerializer as CPickleSerializer
from shapely.wkb import dumps
from sedona.spark.utils.binary_parser import BinaryParser
[docs]
class GeoData:
[docs]
def __init__(self, geom: BaseGeometry, userData: str):
"""
:param geom:
:param userData:
"""
self._geom = geom
self._userData = userData
[docs]
def getUserData(self):
return self.userData
def __getstate__(self):
from sedona.spark.core.geom.circle import Circle
attributes = copy(self.__slots__)
geom = getattr(self, attributes[0])
if isinstance(geom, Circle):
geom_bytes = CircleGeometryFactory.to_bytes(geom)
else:
geom_bytes = GeometryFactory.to_bytes(geom)
return dict(
geom=bytearray([el if el >= 0 else el + 256 for el in geom_bytes]),
userData=getattr(self, attributes[1]),
)
def __setstate__(self, attributes):
from sedona.spark.core.geom.circle import Circle
bin_parser = BinaryParser(attributes["geom"])
is_circle = bin_parser.read_byte()
geom_bytes = attributes["geom"]
if is_circle:
radius = bin_parser.read_double()
geom = bin_parser.read_geometry(geom_bytes.__len__() - 9)
self._geom = Circle(geom, radius)
else:
self._geom = bin_parser.read_geometry(geom_bytes.__len__() - 1)
self._userData = attributes["userData"]
@property
def geom(self):
return self._geom
@property
def userData(self):
return self._userData
__slots__ = ("_geom", "_userData")
def __repr__(self):
return (
f"Geometry: {str(self.geom.__class__.__name__)} userData: {self.userData}"
)
def __eq__(self, other):
return self.geom == other.geom and self.userData == other.userData
def __ne__(self, other):
return self.geom != other.geom or self.userData != other.userData
[docs]
@attr.s
class AbstractSpatialRDDParser(ABC):
[docs]
@classmethod
def serialize(cls, obj: List[Any], binary_buffer: "BinaryBuffer") -> bytearray:
raise NotImplemented()
[docs]
@classmethod
def deserialize(cls, bin_parser: "BinaryParser") -> BaseGeometry:
raise NotImplementedError("Parser has to implement deserialize method")
@classmethod
def _deserialize_geom(cls, bin_parser: "BinaryParser") -> GeoData:
is_circle = bin_parser.read_byte()
return geom_deserializers[is_circle].geometry_from_bytes(bin_parser)
[docs]
@attr.s
class SpatialPairRDDParserData(AbstractSpatialRDDParser):
name = "SpatialPairRDDParserData"
[docs]
@classmethod
def deserialize(cls, bin_parser: "BinaryParser"):
left_geom_data = cls._deserialize_geom(bin_parser)
_ = bin_parser.read_int()
right_geom_data = cls._deserialize_geom(bin_parser)
deserialized_data = [left_geom_data, right_geom_data]
return deserialized_data
[docs]
@classmethod
def serialize(cls, obj: BaseGeometry, binary_buffer: "BinaryBuffer"):
raise NotImplementedError("Currently this operation is not supported")
[docs]
@attr.s
class SpatialRDDParserData(AbstractSpatialRDDParser):
name = "SpatialRDDParser"
[docs]
@classmethod
def deserialize(cls, bin_parser: "BinaryParser"):
left_geom_data = cls._deserialize_geom(bin_parser)
_ = bin_parser.read_int()
return left_geom_data
[docs]
@classmethod
def serialize(cls, obj: BaseGeometry, binary_buffer: "BinaryBuffer"):
raise NotImplementedError("Currently this operation is not supported")
[docs]
@attr.s
class SpatialRDDParserDataMultipleRightGeom(AbstractSpatialRDDParser):
name = "SpatialRDDParser"
[docs]
@classmethod
def deserialize(cls, bin_parser: "BinaryParser"):
left_geom_data = cls._deserialize_geom(bin_parser)
geometry_numbers = bin_parser.read_int()
right_geoms = []
for right_geometry_number in range(geometry_numbers):
right_geom_data = cls._deserialize_geom(bin_parser)
right_geoms.append(right_geom_data)
deserialized_data = (
[left_geom_data, right_geoms] if right_geoms else left_geom_data
)
return deserialized_data
[docs]
@classmethod
def serialize(cls, obj: BaseGeometry, binary_buffer: "BinaryBuffer"):
raise NotImplementedError("Currently this operation is not supported")
PARSERS = {
0: SpatialRDDParserData(),
1: SpatialRDDParserDataMultipleRightGeom(),
2: SpatialPairRDDParserData(),
}
[docs]
class SedonaPickler(CPickleSerializer):
[docs]
def __init__(self):
super().__init__()
[docs]
def loads(self, obj, encoding="bytes"):
binary_parser = BinaryParser(obj)
spatial_parser_number = binary_parser.read_int()
spatial_parser = self.get_parser(spatial_parser_number)
parsed_row = spatial_parser.deserialize(binary_parser)
return parsed_row
[docs]
def dumps(self, obj):
raise NotImplementedError()
[docs]
def get_parser(self, number: int):
return PARSERS[number]
[docs]
def read_geometry_from_bytes(bin_parser: BinaryParser):
geom_data_length = bin_parser.read_int()
user_data_length = bin_parser.read_int()
geom = bin_parser.read_geometry(geom_data_length)
user_data = bin_parser.read_string(user_data_length)
return (geom, user_data)
[docs]
@attr.s
class GeometryFactory:
[docs]
@classmethod
def geometry_from_bytes(cls, bin_parser: BinaryParser) -> GeoData:
geom, user_data = read_geometry_from_bytes(bin_parser)
geo_data = GeoData(geom=geom, userData=user_data)
return geo_data
[docs]
@classmethod
def to_bytes(cls, geom: BaseGeometry) -> List[int]:
return struct.pack("b", 0) + dumps(geom)
[docs]
@attr.s
class CircleGeometryFactory:
[docs]
@classmethod
def geometry_from_bytes(cls, bin_parser: BinaryParser) -> GeoData:
from sedona.spark.core.geom.circle import Circle
geom, user_data = read_geometry_from_bytes(bin_parser)
radius = bin_parser.read_double()
geo_data = GeoData(geom=Circle(geom, radius), userData=user_data)
return geo_data
[docs]
@classmethod
def to_bytes(cls, geom: "Circle") -> List[int]:
return (
struct.pack("b", 1)
+ struct.pack("d", geom.radius)
+ dumps(geom.centerGeometry)
)
geom_deserializers = {1: CircleGeometryFactory, 0: GeometryFactory}