# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import struct
from typing import List, Union

import attr
from pyspark import SparkContext
from shapely.wkb import loads

DOUBLE_SIZE = 8
INT_SIZE = 4
BYTE_SIZE = 1
CHAR_SIZE = 1
BOOLEAN_SIZE = 1
size_dict = {
    "d": DOUBLE_SIZE,
    "i": INT_SIZE,
    "b": BYTE_SIZE,
    "c": CHAR_SIZE,  # read_char unpacks with "c"; without this entry it would raise KeyError
    "s": CHAR_SIZE,
    "?": BOOLEAN_SIZE,
}

@attr.s
class BinaryParser:
    bytes = attr.ib(type=Union[bytearray, List[int]])
    current_index = attr.ib(default=0)

    def __attrs_post_init__(self):
        no_negatives = self.remove_negatives(self.bytes)
        self.bytes = self._convert_to_binary_array(no_negatives)

    def read_geometry(self, length: int):
        # Re-pack the unsigned byte values (0..255) as signed bytes and parse
        # the resulting buffer as WKB with shapely.
        geom_bytes = b"".join(
            [
                struct.pack("b", el) if el < 128 else struct.pack("b", el - 256)
                for el in self.bytes[self.current_index : self.current_index + length]
            ]
        )
        geom = loads(geom_bytes)
        self.current_index += length
        return geom

    def read_double(self):
        data = self.unpack("d", self.bytes)
        self.current_index = self.current_index + DOUBLE_SIZE
        return data

    def read_double_reverse(self):
        data = self.unpack_reverse("d", self.bytes)
        self.current_index = self.current_index + DOUBLE_SIZE
        return data

    def read_int(self):
        data = self.unpack("i", self.bytes)
        self.current_index = self.current_index + INT_SIZE
        return data

    def read_byte(self):
        data = self.unpack("b", self.bytes)
        self.current_index = self.current_index + BYTE_SIZE
        return data

    def read_char(self):
        data = self.unpack("c", self.bytes)
        self.current_index = self.current_index + CHAR_SIZE
        return data

    def read_boolean(self):
        data = self.unpack("?", self.bytes)
        self.current_index = self.current_index + BOOLEAN_SIZE
        return data

    def read_string(self, length: int, encoding: str = "utf8"):
        string = self.bytes[self.current_index : self.current_index + length]
        self.current_index += length
        # errors="ignore" drops undecodable bytes instead of raising UnicodeDecodeError.
        return string.decode(encoding, "ignore")

    def read_kryo_string(self, length: int, sc: SparkContext) -> str:
        # Copy the remaining bytes up to the absolute index `length` into a JVM
        # byte[] and let the JVM-side serializer decode the Kryo-encoded user data.
        array_length = length - self.current_index
        byte_array = sc._gateway.new_array(sc._jvm.Byte, array_length)
        for index, bt in enumerate(self.bytes[self.current_index : length]):
            byte_array[index] = bt
        decoded_string = sc._jvm.org.imbruced.geo_pyspark.serializers.GeoSerializerData.deserializeUserData(
            byte_array
        )
        self.current_index = length
        return decoded_string

    def unpack(self, tp: str, bytes: bytearray):
        max_index = self.current_index + size_dict[tp]
        bytes = self._convert_to_binary_array(bytes[self.current_index : max_index])
        return struct.unpack(tp, bytes)[0]

    def unpack_reverse(self, tp: str, bytes: bytearray):
        # Same as unpack, but for values stored with the opposite byte order.
        max_index = self.current_index + size_dict[tp]
        bytes = bytearray(
            reversed(
                self._convert_to_binary_array(bytes[self.current_index : max_index])
            )
        )
        return struct.unpack(tp, bytes)[0]

    @classmethod
    def remove_negatives(cls, bytes):
        return [cls.remove_negative(bt) for bt in bytes]

    @classmethod
    def remove_negative(cls, byte):
        # Map a signed JVM-style byte (-128..127) to its unsigned value (0..255).
        bt_pos = byte if byte >= 0 else byte + 256
        return bt_pos

    @staticmethod
    def _convert_to_binary_array(bytes):
        if type(bytes) == list:
            bytes = bytearray(bytes)
        return bytes
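

# A minimal usage sketch (illustrative only, not part of the module API):
# BinaryParser consumes a flat list of byte values and reads typed values
# sequentially while advancing current_index. The sample bytes below are
# hypothetical and assume the platform-native byte order is little-endian,
# since struct is used without an explicit byte-order prefix.
#
#   parser = BinaryParser(bytes=[0, 0, 0, 0, 0, 0, 240, 63, 7, 0, 0, 0])
#   assert parser.read_double() == 1.0   # first 8 bytes: IEEE 754 double 1.0
#   assert parser.read_int() == 7        # next 4 bytes: 32-bit integer 7
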
class BinaryBuffer:

    def __init__(self):
        self.array = []

    def put_double(self, value):
        bytes = self.__pack("d", value)
        self.__extend_buffer(bytes)

    def put_int(self, value):
        bytes = self.__pack("i", value)
        self.__extend_buffer(bytes)

    def put_byte(self, value):
        bytes = self.__pack("b", value)
        self.__extend_buffer(bytes)

    def put(self, value):
        self.__extend_buffer(value)

    def __pack(self, type, value):
        return struct.pack(type, value)

    def __extend_buffer(self, bytes):
        self.array.extend(list(bytes))

    def __translate_values(self, values):
        # Convert unsigned byte values (0..255) to signed bytes (-128..127),
        # the representation expected on the JVM side.
        return [el if el < 128 else el - 256 for el in values]

    def add_empty_bytes(self, tp: str, number_of_empty):
        if tp == "double":
            for _ in range(number_of_empty):
                self.put_double(0.0)
        elif tp == "int":
            for _ in range(number_of_empty):
                self.put_int(0)
        elif tp == "byte":
            for _ in range(number_of_empty):
                self.put_byte(0)
        else:
            raise TypeError(f"Passed {tp} is not available")

    @property
    def byte_array(self):
        return self.__translate_values(self.array)
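

# A minimal round-trip sketch (illustrative only, not part of the module API):
# BinaryBuffer accumulates packed values as unsigned byte values, and byte_array
# exposes them in signed form (-128..127); BinaryParser.remove_negatives undoes
# that shift, so a buffer can be parsed straight back. Assumes native
# little-endian byte order, as above.
#
#   buffer = BinaryBuffer()
#   buffer.put_int(3)
#   buffer.put_double(2.5)
#   parser = BinaryParser(bytes=buffer.byte_array)
#   assert parser.read_int() == 3
#   assert parser.read_double() == 2.5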