# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import typing
from typing import Any, Union, Literal, List
import numpy as np
import geopandas as gpd
import sedona.spark.geopandas as sgpd
import pandas as pd
import pyspark.pandas as pspd
import pyspark
from pyspark.pandas import Series as PandasOnSparkSeries
from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
from pyspark.pandas.internal import InternalFrame
from pyspark.pandas.series import first_series
from pyspark.pandas.utils import scol_for
from pyspark.sql.types import NullType
from sedona.spark.sql.types import GeometryType
from sedona.spark.sql import st_aggregates as sta
from sedona.spark.sql import st_constructors as stc
from sedona.spark.sql import st_functions as stf
from sedona.spark.sql import st_predicates as stp
from pyspark.sql import Column as PySparkColumn
from pyspark.sql import functions as F
import shapely
from shapely.geometry.base import BaseGeometry
from sedona.spark.geopandas._typing import Label
from sedona.spark.geopandas.base import GeoFrame
from sedona.spark.geopandas.geodataframe import GeoDataFrame
from sedona.spark.geopandas.sindex import SpatialIndex
from packaging.version import parse as parse_version
from pyspark.pandas.internal import (
SPARK_DEFAULT_INDEX_NAME, # __index_level_0__
NATURAL_ORDER_COLUMN_NAME,
SPARK_DEFAULT_SERIES_NAME, # '0'
)
# ============================================================================
# IMPLEMENTATION STATUS TRACKING
# ============================================================================
IMPLEMENTATION_STATUS = {
"IMPLEMENTED": [
"area",
"buffer",
"bounds",
"centroid",
"contains",
"crs",
"distance",
"envelope",
"geometry",
"intersection",
"intersects",
"is_empty",
"is_simple",
"is_valid",
"is_valid_reason",
"length",
"make_valid",
"set_crs",
"to_crs",
"to_geopandas",
"to_wkb",
"to_wkt",
"x",
"y",
"z",
"has_z",
"get_geometry",
"boundary",
"total_bounds",
"estimate_utm_crs",
"isna",
"isnull",
"notna",
"notnull",
"from_xy",
"copy",
"geom_type",
"sindex",
],
"NOT_IMPLEMENTED": [
"clip",
"contains_properly",
"convex_hull",
"count_coordinates",
"count_geometries",
"count_interior_rings",
"explode",
"force_2d",
"force_3d",
"from_file",
"from_shapely",
"from_arrow",
"line_merge",
"reverse",
"segmentize",
"to_json",
"to_arrow",
"to_file",
"transform",
"unary_union",
"union_all",
"intersection_all",
"type",
"is_ring",
"is_ccw",
"is_closed",
"get_precision",
"concave_hull",
"delaunay_triangles",
"voronoi_polygons",
"minimum_rotated_rectangle",
"exterior",
"extract_unique_points",
"offset_curve",
"interiors",
"remove_repeated_points",
"set_precision",
"representative_point",
"minimum_bounding_circle",
"minimum_bounding_radius",
"minimum_clearance",
"normalize",
"m",
],
"PARTIALLY_IMPLEMENTED": [
"fillna", # Limited parameter support (no 'limit' parameter)
"from_wkb",
"from_wkt", # Limited error handling options (only 'raise' supported)
],
}
IMPLEMENTATION_PRIORITY = {
"HIGH": [
"contains",
"contains_properly",
"convex_hull",
"explode",
"clip",
"from_shapely",
"count_coordinates",
"count_geometries",
"is_ring",
"is_closed",
"reverse",
],
"MEDIUM": [
"force_2d",
"force_3d",
"transform",
"segmentize",
"line_merge",
"unary_union",
"union_all",
"to_json",
"from_file",
"count_interior_rings",
],
"LOW": [
"delaunay_triangles",
"voronoi_polygons",
"minimum_bounding_circle",
"representative_point",
"extract_unique_points",
"from_arrow",
"to_arrow",
],
}
def _not_implemented_error(method_name: str, additional_info: str = "") -> str:
"""
Generate a standardized NotImplementedError message.
Parameters
----------
method_name : str
The name of the method that is not implemented.
additional_info : str, optional
Additional information about the method or workarounds.
Returns
-------
str
Formatted error message.
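    Examples
    --------
    Illustrative usage (hypothetical method name):
    >>> raise NotImplementedError(_not_implemented_error("reverse"))  # doctest: +SKIP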
"""
base_message = (
f"GeoSeries.{method_name}() is not implemented yet.\n"
f"This method will be added in a future release."
)
if additional_info:
base_message += f"\n\n{additional_info}"
workaround = (
"\n\nTemporary workaround - use GeoPandas:\n"
" gpd_series = sedona_series.to_geopandas()\n"
f" result = gpd_series.{method_name}(...)\n"
" # Note: This will collect all data to the driver."
)
return base_message + workaround
class GeoSeries(GeoFrame, pspd.Series):
"""
A pandas-on-Spark Series for geometric/spatial operations.
GeoSeries extends pyspark.pandas.Series to provide spatial operations
using Apache Sedona's spatial functions. It maintains compatibility
with GeoPandas GeoSeries while operating on distributed datasets.
Parameters
----------
data : array-like, Iterable, dict, or scalar value
Contains the data for the GeoSeries. Can be geometries, WKB bytes,
or other GeoSeries/GeoDataFrame objects.
index : array-like or Index (1d), optional
Values must be hashable and have the same length as `data`.
crs : pyproj.CRS, optional
Coordinate Reference System for the geometries.
dtype : dtype, optional
Data type for the GeoSeries.
name : str, optional
Name of the GeoSeries.
copy : bool, default False
Whether to copy the input data.
Examples
--------
>>> from shapely.geometry import Point, Polygon
>>> from sedona.spark.geopandas import GeoSeries
>>>
>>> # Create from geometries
>>> s = GeoSeries([Point(0, 0), Point(1, 1)], crs='EPSG:4326')
>>> s
0 POINT (0 0)
1 POINT (1 1)
dtype: geometry
>>>
>>> # Spatial operations
>>> s.buffer(0.1).area
0 0.031416
1 0.031416
dtype: float64
>>>
>>> # CRS operations
>>> s_utm = s.to_crs('EPSG:32633')
>>> s_utm.crs
<Projected CRS: EPSG:32633>
Name: WGS 84 / UTM zone 33N
...
Notes
-----
This implementation differs from GeoPandas in several ways:
- Uses Spark for distributed processing
- Geometries are stored in WKB (Well-Known Binary) format internally
- Some methods may have different performance characteristics
- Not all GeoPandas methods are implemented yet (see IMPLEMENTATION_STATUS)
Performance Considerations:
- Operations are distributed across Spark cluster
- Avoid calling .to_geopandas() on large datasets
- Use .sample() for testing with large datasets
See Also
--------
geopandas.GeoSeries : The GeoPandas equivalent
sedona.spark.geopandas.GeoDataFrame : DataFrame with geometry column
"""
def __getitem__(self, key: Any) -> Any:
return pspd.Series.__getitem__(self, key)
def __init__(
self,
data=None,
index=None,
dtype=None,
name=None,
copy=False,
fastpath=False,
crs=None,
**kwargs,
):
"""
Initialize a GeoSeries object.
        Parameters
        ----------
        data : GeoDataFrame, GeoSeries, pandas.Series, or array-like
            The input data for the GeoSeries.
        index : array-like or Index, optional
            The index for the GeoSeries.
        dtype : dtype, optional
            Data type for the GeoSeries.
        name : str, optional
            Name of the GeoSeries.
        copy : bool, default False
            Whether to copy the input data.
        fastpath : bool, default False
            Internal parameter for fast initialization.
        crs : pyproj.CRS, optional
            Coordinate Reference System for the GeoSeries.
Examples
--------
>>> from shapely.geometry import Point
>>> import geopandas as gpd
>>> import pandas as pd
>>> from sedona.spark.geopandas import GeoSeries
# Example 1: Initialize with GeoDataFrame
>>> gdf = gpd.GeoDataFrame({'geometry': [Point(1, 1), Point(2, 2)]})
>>> gs = GeoSeries(data=gdf)
>>> print(gs)
0 POINT (1 1)
1 POINT (2 2)
Name: geometry, dtype: geometry
# Example 2: Initialize with GeoSeries
>>> gseries = gpd.GeoSeries([Point(1, 1), Point(2, 2)])
>>> gs = GeoSeries(data=gseries)
>>> print(gs)
0 POINT (1 1)
1 POINT (2 2)
dtype: geometry
# Example 3: Initialize with pandas Series
>>> pseries = pd.Series([Point(1, 1), Point(2, 2)])
>>> gs = GeoSeries(data=pseries)
>>> print(gs)
0 POINT (1 1)
1 POINT (2 2)
dtype: geometry
"""
assert data is not None
self._anchor: GeoDataFrame
self._col_label: Label
self._sindex: SpatialIndex = None
if isinstance(
data, (GeoDataFrame, GeoSeries, PandasOnSparkSeries, PandasOnSparkDataFrame)
):
assert dtype is None
assert name is None
assert not copy
assert not fastpath
# We don't check crs validity to keep the operation lazy.
# Keep the original code for now
# data_crs = None
# if hasattr(data, "crs"):
# data_crs = data.crs
# if data_crs is not None and crs is not None and data_crs != crs:
# raise ValueError(
# "CRS mismatch between CRS of the passed geometries "
# "and 'crs'. Use 'GeoSeries.set_crs(crs, "
# "allow_override=True)' to overwrite CRS or "
# "'GeoSeries.to_crs(crs)' to reproject geometries. "
# )
            # PySpark Pandas' ps.Series.__init__() does not support construction
            # from a ps.Series input. For now, we manually implement the logic.
index = data._col_label if index is None else index
ps_df = pspd.DataFrame(data._anchor)
super().__init__(
data=ps_df,
index=index,
dtype=dtype,
name=name,
copy=copy,
fastpath=fastpath,
)
else:
if isinstance(data, pd.Series):
assert index is None
assert dtype is None
assert name is None
assert not copy
assert not fastpath
pd_series = data
else:
pd_series = pd.Series(
data=data,
index=index,
dtype=dtype,
name=name,
copy=copy,
fastpath=fastpath,
)
pd_series = pd_series.astype(object)
# initialize the parent class pyspark Series with the pandas Series
super().__init__(data=pd_series)
# Ensure we're storing geometry types
if (
self.spark.data_type != GeometryType()
and self.spark.data_type != NullType()
):
raise TypeError(
"Non geometry data passed to GeoSeries constructor, "
f"received data of dtype '{self.spark.data_type.typeName()}'"
)
if crs:
self.set_crs(crs, inplace=True)
# ============================================================================
# COORDINATE REFERENCE SYSTEM (CRS) OPERATIONS
# ============================================================================
@property
def crs(self) -> Union["CRS", None]:
"""The Coordinate Reference System (CRS) as a ``pyproj.CRS`` object.
        Returns None if the CRS is not set.
:getter: Returns a ``pyproj.CRS`` or None. When setting, the value
can be anything accepted by
:meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
        Note: All records in the GeoSeries are assumed to have the same CRS.
Examples
--------
>>> from shapely.geometry import Point
>>> from sedona.spark.geopandas import GeoSeries
>>> s = GeoSeries([Point(1, 1), Point(2, 2)], crs='EPSG:4326')
>>> s.crs # doctest: +SKIP
<Geographic 2D CRS: EPSG:4326>
Name: WGS 84
Axis Info [ellipsoidal]:
- Lat[north]: Geodetic latitude (degree)
- Lon[east]: Geodetic longitude (degree)
Area of Use:
- name: World
- bounds: (-180.0, -90.0, 180.0, 90.0)
Datum: World Geodetic System 1984
- Ellipsoid: WGS 84
- Prime Meridian: Greenwich
See Also
--------
GeoSeries.set_crs : assign CRS
GeoSeries.to_crs : re-project to another CRS
"""
from pyproj import CRS
if len(self) == 0:
return None
# F.first is non-deterministic, but it doesn't matter because all non-null values should be the same
spark_col = stf.ST_SRID(F.first(self.spark.column, ignorenulls=True))
# Set this to avoid error complaining that we don't have a groupby column
tmp_series = self._query_geometry_column(
spark_col,
returns_geom=False,
is_aggr=True,
)
# All geometries should have the same srid
# so we just take the srid of the first non-null element
srid = tmp_series.item()
# Turn np.nan to 0 to avoid error
srid = 0 if np.isnan(srid) else srid
        # Sedona returns 0 if the SRID doesn't exist
return CRS.from_user_input(srid) if srid != 0 else None
@crs.setter
def crs(self, value: Union["CRS", None]):
self.set_crs(value, inplace=True)
@typing.overload
def set_crs(
self,
crs: Union[Any, None] = None,
epsg: Union[int, None] = None,
inplace: Literal[True] = True,
allow_override: bool = False,
) -> None: ...
@typing.overload
def set_crs(
self,
crs: Union[Any, None] = None,
epsg: Union[int, None] = None,
inplace: Literal[False] = False,
allow_override: bool = False,
) -> "GeoSeries": ...
def set_crs(
self,
crs: Union[Any, None] = None,
epsg: Union[int, None] = None,
inplace: bool = False,
allow_override: bool = True,
) -> Union["GeoSeries", None]:
"""
Set the Coordinate Reference System (CRS) of a ``GeoSeries``.
Pass ``None`` to remove CRS from the ``GeoSeries``.
Notes
-----
The underlying geometries are not transformed to this CRS. To
transform the geometries to a new CRS, use the ``to_crs`` method.
Parameters
----------
crs : pyproj.CRS | None, optional
The value can be anything accepted
by :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
epsg : int, optional if `crs` is specified
EPSG code specifying the projection.
inplace : bool, default False
If True, the CRS of the GeoSeries will be changed in place
(while still returning the result) instead of making a copy of
the GeoSeries.
allow_override : bool, default True
            If the GeoSeries already has a CRS, allow replacing the existing
            CRS, even when the two are not equal. In Sedona, setting this to
            False leads to eager evaluation (the current CRS must be read)
            instead of lazy evaluation. Unlike GeoPandas, True is the default
            in Sedona for performance reasons.
Returns
-------
GeoSeries
Examples
--------
>>> from sedona.spark.geopandas import GeoSeries
>>> from shapely.geometry import Point
>>> s = GeoSeries([Point(1, 1), Point(2, 2), Point(3, 3)])
>>> s
0 POINT (1 1)
1 POINT (2 2)
2 POINT (3 3)
dtype: geometry
Setting CRS to a GeoSeries without one:
>>> s.crs is None
True
>>> s = s.set_crs('epsg:3857')
>>> s.crs # doctest: +SKIP
<Projected CRS: EPSG:3857>
Name: WGS 84 / Pseudo-Mercator
Axis Info [cartesian]:
- X[east]: Easting (metre)
- Y[north]: Northing (metre)
Area of Use:
- name: World - 85°S to 85°N
- bounds: (-180.0, -85.06, 180.0, 85.06)
Coordinate Operation:
- name: Popular Visualisation Pseudo-Mercator
- method: Popular Visualisation Pseudo Mercator
Datum: World Geodetic System 1984
- Ellipsoid: WGS 84
- Prime Meridian: Greenwich
Overriding existing CRS:
>>> s = s.set_crs(4326, allow_override=True)
        With ``allow_override=False``, ``set_crs`` raises a ``ValueError`` if you
        try to override an existing, different CRS.
See Also
--------
GeoSeries.to_crs : re-project to another CRS
"""
from pyproj import CRS
if crs is not None:
crs = CRS.from_user_input(crs)
elif epsg is not None:
crs = CRS.from_epsg(epsg)
# The below block for the not allow_override case is eager due to the self.crs call
# This hurts performance and user experience, hence the default being set to True in Sedona
if not allow_override:
curr_crs = self.crs
if curr_crs is not None and not curr_crs == crs:
raise ValueError(
"The GeoSeries already has a CRS which is not equal to the passed "
"CRS. Specify 'allow_override=True' to allow replacing the existing "
"CRS without doing any transformation. If you actually want to "
"transform the geometries, use 'GeoSeries.to_crs' instead."
)
# 0 indicates no srid in sedona
new_epsg = crs.to_epsg() if crs else 0
spark_col = stf.ST_SetSRID(self.spark.column, new_epsg)
result = self._query_geometry_column(spark_col, keep_name=True)
if inplace:
self._update_inplace(result, invalidate_sindex=False)
return None
return result
# ============================================================================
# INTERNAL HELPER METHODS
# ============================================================================
def _query_geometry_column(
self,
spark_col: PySparkColumn,
df: pyspark.sql.DataFrame = None,
returns_geom: bool = True,
is_aggr: bool = False,
keep_name: bool = False,
) -> Union["GeoSeries", pspd.Series]:
"""
Helper method to query a single geometry column with a specified operation.
Parameters
----------
        spark_col : pyspark.sql.Column
            The column expression to apply to the geometry column.
        df : pyspark.sql.DataFrame, optional
            The dataframe to query. If not provided, the internal dataframe will be used.
        returns_geom : bool, default True
            If True, the result is wrapped in a GeoSeries (geometries kept in EWKB format).
        is_aggr : bool, default False
            If True, the query is an aggregation query and index columns are not retained.
        keep_name : bool, default False
            If True, the result keeps this series' name instead of the default series name.
        Returns
        -------
        GeoSeries or pyspark.pandas.Series
            A GeoSeries if ``returns_geom`` is True, otherwise a pyspark.pandas.Series,
            with the operation applied to the geometry column.
"""
df = self._internal.spark_frame if df is None else df
rename = SPARK_DEFAULT_SERIES_NAME
if keep_name and self.name:
rename = self.name
col_expr = spark_col.alias(rename)
exprs = [col_expr]
index_spark_columns = []
index_fields = []
if not is_aggr:
# We always select NATURAL_ORDER_COLUMN_NAME, to avoid having to regenerate it in the result
# We always select SPARK_DEFAULT_INDEX_NAME, to retain series index info
exprs.append(scol_for(df, SPARK_DEFAULT_INDEX_NAME))
exprs.append(scol_for(df, NATURAL_ORDER_COLUMN_NAME))
index_spark_columns = [scol_for(df, SPARK_DEFAULT_INDEX_NAME)]
index_fields = [self._internal.index_fields[0]]
sdf = df.select(
col_expr,
scol_for(df, SPARK_DEFAULT_INDEX_NAME),
scol_for(df, NATURAL_ORDER_COLUMN_NAME),
)
# else if is_aggr, we don't select the index columns
else:
sdf = df.select(*exprs)
internal = self._internal.copy(
spark_frame=sdf,
index_fields=index_fields,
index_spark_columns=index_spark_columns,
data_spark_columns=[scol_for(sdf, rename)],
data_fields=[self._internal.data_fields[0].copy(name=rename)],
column_label_names=[(rename,)],
)
ps_series = first_series(PandasOnSparkDataFrame(internal))
# Convert spark series default name to pandas series default name (None) if needed
series_name = None if rename == SPARK_DEFAULT_SERIES_NAME else rename
ps_series = ps_series.rename(series_name)
result = GeoSeries(ps_series) if returns_geom else ps_series
return result
# ============================================================================
# CONVERSION AND SERIALIZATION METHODS
# ============================================================================
def to_geopandas(self) -> gpd.GeoSeries:
"""
Convert the GeoSeries to a geopandas GeoSeries.
        Returns
        -------
        geopandas.GeoSeries
            A geopandas GeoSeries with all data collected to the driver.
"""
from pyspark.pandas.utils import log_advice
log_advice(
"`to_geopandas` loads all data into the driver's memory. "
"It should only be used if the resulting geopandas GeoSeries is expected to be small."
)
return self._to_geopandas()
def _to_geopandas(self) -> gpd.GeoSeries:
"""
Same as `to_geopandas()`, without issuing the advice log for internal usage.
"""
pd_series = self._to_internal_pandas()
return gpd.GeoSeries(pd_series, crs=self.crs)
def to_spark_pandas(self) -> pspd.Series:
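        """Return this GeoSeries as a plain ``pyspark.pandas.Series``.
        Note: the returned series wraps the same internal frame, so no data
        is collected to the driver.
        """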
return pspd.Series(pspd.DataFrame(self._psdf._internal))
# ============================================================================
# PROPERTIES AND ATTRIBUTES
# ============================================================================
@property
def geometry(self) -> "GeoSeries":
return self
@property
def sindex(self) -> SpatialIndex:
geometry_column = _get_series_col_name(self)
if geometry_column is None:
raise ValueError("No geometry column found in GeoSeries")
if self._sindex is None:
self._sindex = SpatialIndex(
self._internal.spark_frame, column_name=geometry_column
)
return self._sindex
@property
def has_sindex(self):
return self._sindex is not None
def copy(self, deep=False):
"""Make a copy of this GeoSeries object.
Parameters
----------
deep : bool, default False
If True, a deep copy of the data is made. Otherwise, a shallow
copy is made.
Returns
-------
GeoSeries
A copy of this GeoSeries object.
Examples
--------
>>> from shapely.geometry import Point
>>> from sedona.spark.geopandas import GeoSeries
>>> gs = GeoSeries([Point(1, 1), Point(2, 2)])
>>> gs_copy = gs.copy()
>>> print(gs_copy)
0 POINT (1 1)
1 POINT (2 2)
dtype: geometry
"""
if deep:
return GeoSeries(
self._anchor.copy(), dtype=self.dtype, index=self._col_label
)
else:
return self
@property
def area(self) -> pspd.Series:
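        """Return a Series containing the 2D area of each geometry, expressed
        in the units of the CRS.
        Example (illustrative):
        >>> from shapely.geometry import Polygon
        >>> from sedona.spark.geopandas import GeoSeries
        >>> GeoSeries([Polygon([(0, 0), (1, 0), (1, 1), (0, 1)])]).area
        0    1.0
        dtype: float64
        """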
spark_col = stf.ST_Area(self.spark.column)
return self._query_geometry_column(
spark_col,
returns_geom=False,
)
@property
def geom_type(self) -> pspd.Series:
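        """Return a Series of strings naming the geometry type of each object,
        using GeoPandas-style capitalization (e.g. "Point", "LineString").
        Example (illustrative):
        >>> from shapely.geometry import Point
        >>> from sedona.spark.geopandas import GeoSeries
        >>> GeoSeries([Point(0, 0)]).geom_type
        0    Point
        dtype: object
        """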
spark_col = stf.GeometryType(self.spark.column)
result = self._query_geometry_column(
spark_col,
returns_geom=False,
)
# Sedona returns the string in all caps unlike Geopandas
        sgpd_to_gpd_name_map = {
"POINT": "Point",
"LINESTRING": "LineString",
"POLYGON": "Polygon",
"MULTIPOINT": "MultiPoint",
"MULTILINESTRING": "MultiLineString",
"MULTIPOLYGON": "MultiPolygon",
"GEOMETRYCOLLECTION": "GeometryCollection",
}
        result = result.map(lambda x: sgpd_to_gpd_name_map.get(x, x))
return result
@property
def type(self):
# Implementation of the abstract method
raise NotImplementedError(
_not_implemented_error("type", "Returns numeric geometry type codes.")
)
@property
def length(self) -> pspd.Series:
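        """Return a Series of lengths: the line length for (Multi)LineStrings,
        the perimeter for (Multi)Polygons, 0.0 for points, and the sum of
        length and perimeter for GeometryCollections.
        Example (illustrative):
        >>> from shapely.geometry import LineString
        >>> from sedona.spark.geopandas import GeoSeries
        >>> GeoSeries([LineString([(0, 0), (3, 4)])]).length
        0    5.0
        dtype: float64
        """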
spark_expr = (
F.when(
stf.GeometryType(self.spark.column).isin(
["LINESTRING", "MULTILINESTRING"]
),
stf.ST_Length(self.spark.column),
)
.when(
stf.GeometryType(self.spark.column).isin(["POLYGON", "MULTIPOLYGON"]),
stf.ST_Perimeter(self.spark.column),
)
.when(
stf.GeometryType(self.spark.column).isin(["POINT", "MULTIPOINT"]),
0.0,
)
.when(
stf.GeometryType(self.spark.column).isin(["GEOMETRYCOLLECTION"]),
stf.ST_Length(self.spark.column) + stf.ST_Perimeter(self.spark.column),
)
)
return self._query_geometry_column(
spark_expr,
returns_geom=False,
)
@property
def is_valid(self) -> pspd.Series:
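        """Return a boolean Series indicating which geometries are valid.
        Example (illustrative; a self-intersecting "bowtie" ring is invalid):
        >>> from shapely.geometry import Polygon
        >>> from sedona.spark.geopandas import GeoSeries
        >>> GeoSeries([Polygon([(0, 0), (2, 2), (2, 0), (0, 2)])]).is_valid
        0    False
        dtype: bool
        """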
spark_col = stf.ST_IsValid(self.spark.column)
result = self._query_geometry_column(
spark_col,
returns_geom=False,
)
return _to_bool(result)
def is_valid_reason(self) -> pspd.Series:
spark_col = stf.ST_IsValidReason(self.spark.column)
return self._query_geometry_column(
spark_col,
returns_geom=False,
)
@property
def is_empty(self) -> pspd.Series:
spark_expr = stf.ST_IsEmpty(self.spark.column)
result = self._query_geometry_column(
spark_expr,
returns_geom=False,
)
return _to_bool(result)
def count_coordinates(self):
# Implementation of the abstract method
raise NotImplementedError(
_not_implemented_error(
"count_coordinates",
"Counts the number of coordinate tuples in each geometry.",
)
)
def count_geometries(self):
# Implementation of the abstract method
raise NotImplementedError(
_not_implemented_error(
"count_geometries",
"Counts the number of geometries in each multi-geometry or collection.",
)
)
def count_interior_rings(self):
# Implementation of the abstract method
raise NotImplementedError(
_not_implemented_error(
"count_interior_rings",
"Counts the number of interior rings (holes) in each polygon.",
)
)
def dwithin(self, other, distance, align=None):
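        """Test whether each geometry is within ``distance`` of ``other``.
        Example (illustrative):
        >>> from shapely.geometry import Point
        >>> from sedona.spark.geopandas import GeoSeries
        >>> s = GeoSeries([Point(0, 0), Point(5, 5)])
        >>> s.dwithin(Point(0, 1), 2.0)
        0     True
        1    False
        dtype: bool
        """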
if not isinstance(distance, (float, int)):
raise NotImplementedError(
"Array-like distance for dwithin not implemented yet."
)
other_series, extended = self._make_series_of_val(other)
align = False if extended else align
spark_expr = stp.ST_DWithin(F.col("L"), F.col("R"), F.lit(distance))
return self._row_wise_operation(
spark_expr,
other_series,
align=align,
returns_geom=False,
default_val=False,
)
def difference(self, other, align=None) -> "GeoSeries":
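        """Return the part of each geometry that is not shared with ``other``.
        Example (illustrative; vertex order in the output may vary):
        >>> from shapely.geometry import Polygon
        >>> from sedona.spark.geopandas import GeoSeries
        >>> s = GeoSeries([Polygon([(0, 0), (2, 0), (2, 2), (0, 2)])])
        >>> s.difference(Polygon([(1, 0), (3, 0), (3, 2), (1, 2)]))  # doctest: +SKIP
        0    POLYGON ((0 0, 0 2, 1 2, 1 0, 0 0))
        dtype: geometry
        """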
other_series, extended = self._make_series_of_val(other)
align = False if extended else align
spark_expr = stf.ST_Difference(F.col("L"), F.col("R"))
return self._row_wise_operation(
spark_expr,
other_series,
align=align,
returns_geom=True,
)
@property
def is_simple(self) -> pspd.Series:
spark_expr = stf.ST_IsSimple(self.spark.column)
result = self._query_geometry_column(
spark_expr,
returns_geom=False,
)
return _to_bool(result)
@property
def is_ring(self):
spark_expr = stf.ST_IsRing(self.spark.column)
result = self._query_geometry_column(
spark_expr,
returns_geom=False,
)
return _to_bool(result)
@property
def is_ccw(self):
# Implementation of the abstract method
raise NotImplementedError(
_not_implemented_error(
"is_ccw",
"Tests if LinearRing geometries are oriented counter-clockwise.",
)
)
@property
def is_closed(self):
# Implementation of the abstract method
raise NotImplementedError(
_not_implemented_error(
"is_closed",
"Tests if LineString geometries are closed (start equals end point).",
)
)
@property
def has_z(self) -> pspd.Series:
spark_expr = stf.ST_HasZ(self.spark.column)
return self._query_geometry_column(
spark_expr,
returns_geom=False,
)
def get_precision(self):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def get_geometry(self, index) -> "GeoSeries":
# Sedona errors on negative indexes, so we use a case statement to handle it ourselves
spark_expr = stf.ST_GeometryN(
F.col("L"),
F.when(
stf.ST_NumGeometries(F.col("L")) + F.col("R") < 0,
None,
)
.when(F.col("R") < 0, stf.ST_NumGeometries(F.col("L")) + F.col("R"))
.otherwise(F.col("R")),
)
other, _ = self._make_series_of_val(index)
# align = False either way
align = False
return self._row_wise_operation(
spark_expr,
other,
align=align,
returns_geom=True,
default_val=None,
)
@property
def boundary(self) -> "GeoSeries":
# Geopandas and shapely return NULL for GeometryCollections, so we handle it separately
# https://shapely.readthedocs.io/en/stable/reference/shapely.boundary.html
spark_expr = F.when(
stf.GeometryType(self.spark.column).isin(["GEOMETRYCOLLECTION"]),
None,
).otherwise(stf.ST_Boundary(self.spark.column))
return self._query_geometry_column(
spark_expr,
)
@property
def centroid(self) -> "GeoSeries":
spark_expr = stf.ST_Centroid(self.spark.column)
return self._query_geometry_column(
spark_expr,
returns_geom=True,
)
def concave_hull(self, ratio=0.0, allow_holes=False):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
@property
def convex_hull(self):
# Implementation of the abstract method
raise NotImplementedError(
_not_implemented_error(
"convex_hull", "Computes the convex hull of each geometry."
)
)
def delaunay_triangles(self, tolerance=0.0, only_edges=False):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def voronoi_polygons(self, tolerance=0.0, extend_to=None, only_edges=False):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
@property
def envelope(self) -> "GeoSeries":
spark_expr = stf.ST_Envelope(self.spark.column)
return self._query_geometry_column(
spark_expr,
returns_geom=True,
)
def minimum_rotated_rectangle(self):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
@property
def exterior(self):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def offset_curve(self, distance, quad_segs=8, join_style="round", mitre_limit=5.0):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
@property
def interiors(self):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def remove_repeated_points(self, tolerance=0.0):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def set_precision(self, grid_size, mode="valid_output"):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def representative_point(self):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def minimum_bounding_circle(self):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def minimum_bounding_radius(self):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def minimum_clearance(self):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def normalize(self):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def make_valid(self, *, method="linework", keep_collapsed=True) -> "GeoSeries":
if method != "structure":
raise ValueError(
"Sedona only supports the 'structure' method for make_valid"
)
spark_expr = stf.ST_MakeValid(self.spark.column, keep_collapsed)
return self._query_geometry_column(
spark_expr,
returns_geom=True,
)
def reverse(self):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def segmentize(self, max_segment_length):
other_series, extended = self._make_series_of_val(max_segment_length)
        # max_segment_length is normally broadcast as a scalar (extended=True);
        # fall back to default alignment if an array-like was passed instead.
        # (Referencing an unbound `align` here was a bug.)
        align = False if extended else None
spark_expr = stf.ST_Segmentize(F.col("L"), F.col("R"))
return self._row_wise_operation(
spark_expr,
other_series,
align=align,
returns_geom=True,
)
def force_2d(self):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def force_3d(self, z=0):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def line_merge(self, directed=False):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
# ============================================================================
# GEOMETRIC OPERATIONS
# ============================================================================
@property
def unary_union(self):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def union_all(self, method="unary", grid_size=None) -> BaseGeometry:
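        """Return the union of all geometries in this GeoSeries as a single
        shapely geometry, computed with ST_Union_Aggr.
        Example (illustrative):
        >>> from shapely.geometry import Point
        >>> from sedona.spark.geopandas import GeoSeries
        >>> GeoSeries([Point(0, 0), Point(1, 1)]).union_all()  # doctest: +SKIP
        <MULTIPOINT (0 0, 1 1)>
        """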
if grid_size is not None:
raise NotImplementedError("Sedona does not support the grid_size argument")
if method != "unary":
import warnings
warnings.warn(
f"Sedona does not support manually specifying different union methods. Ignoring non-default method argument of {method}"
)
if len(self) == 0:
# While it's not explicitly defined in geopandas docs, this is what geopandas returns for empty GeoSeries
# If it ever changes for some reason, we'll catch that with the test
from shapely.geometry import GeometryCollection
return GeometryCollection()
spark_expr = sta.ST_Union_Aggr(self.spark.column)
tmp = self._query_geometry_column(spark_expr, returns_geom=False, is_aggr=True)
ps_series = tmp.take([0])
geom = ps_series.iloc[0]
return geom
def crosses(self, other, align=None) -> pspd.Series:
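        """Test whether each geometry crosses ``other``.
        Rows where either side is a GeometryCollection yield null instead of
        erroring (see the note below).
        Example (illustrative):
        >>> from shapely.geometry import LineString
        >>> from sedona.spark.geopandas import GeoSeries
        >>> s = GeoSeries([LineString([(0, 0), (2, 2)])])
        >>> s.crosses(LineString([(0, 2), (2, 0)]))
        0    True
        dtype: bool
        """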
# Sedona does not support GeometryCollection (errors), so we return NULL for now to avoid error
other_series, extended = self._make_series_of_val(other)
align = False if extended else align
spark_expr = F.when(
(stf.GeometryType(F.col("L")) == "GEOMETRYCOLLECTION")
| (stf.GeometryType(F.col("R")) == "GEOMETRYCOLLECTION"),
None,
).otherwise(stp.ST_Crosses(F.col("L"), F.col("R")))
result = self._row_wise_operation(
spark_expr,
other_series,
align,
default_val=False,
)
return _to_bool(result)
def disjoint(self, other, align=None):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
def intersects(
self, other: Union["GeoSeries", BaseGeometry], align: Union[bool, None] = None
) -> pspd.Series:
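        """Test whether each geometry intersects ``other``.
        Example (illustrative):
        >>> from shapely.geometry import LineString, Point
        >>> from sedona.spark.geopandas import GeoSeries
        >>> s = GeoSeries([LineString([(0, 0), (2, 2)]), Point(5, 5)])
        >>> s.intersects(LineString([(0, 2), (2, 0)]))
        0     True
        1    False
        dtype: bool
        """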
other_series, extended = self._make_series_of_val(other)
align = False if extended else align
spark_expr = stp.ST_Intersects(F.col("L"), F.col("R"))
result = self._row_wise_operation(
spark_expr,
other_series,
align,
default_val=False,
)
return _to_bool(result)
def overlaps(self, other, align=None) -> pspd.Series:
# Note: We cannot efficiently match geopandas behavior because Sedona's ST_Overlaps returns True for equal geometries
# ST_Overlaps(`L`, `R`) AND ST_Equals(`L`, `R`) does not work because ST_Equals errors on invalid geometries
other_series, extended = self._make_series_of_val(other)
align = False if extended else align
spark_expr = stp.ST_Overlaps(F.col("L"), F.col("R"))
result = self._row_wise_operation(
spark_expr,
other_series,
align,
default_val=False,
)
return _to_bool(result)
def touches(self, other, align=None) -> pspd.Series:
other_series, extended = self._make_series_of_val(other)
align = False if extended else align
spark_expr = stp.ST_Touches(F.col("L"), F.col("R"))
result = self._row_wise_operation(
spark_expr,
other_series,
align,
default_val=False,
)
return _to_bool(result)
def within(self, other, align=None) -> pspd.Series:
other_series, extended = self._make_series_of_val(other)
align = False if extended else align
spark_expr = stp.ST_Within(F.col("L"), F.col("R"))
result = self._row_wise_operation(
spark_expr,
other_series,
align,
default_val=False,
)
return _to_bool(result)
def covers(self, other, align=None) -> pspd.Series:
other_series, extended = self._make_series_of_val(other)
align = False if extended else align
spark_expr = stp.ST_Covers(F.col("L"), F.col("R"))
result = self._row_wise_operation(
spark_expr,
other_series,
align,
default_val=False,
)
return _to_bool(result)
def covered_by(self, other, align=None) -> pspd.Series:
other_series, extended = self._make_series_of_val(other)
align = False if extended else align
spark_expr = stp.ST_CoveredBy(F.col("L"), F.col("R"))
result = self._row_wise_operation(
spark_expr,
other_series,
align,
default_val=False,
)
return _to_bool(result)
def distance(self, other, align=None) -> pspd.Series:
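        """Return a Series of distances between each geometry and ``other``.
        Example (illustrative):
        >>> from shapely.geometry import Point
        >>> from sedona.spark.geopandas import GeoSeries
        >>> s = GeoSeries([Point(0, 0), Point(3, 4)])
        >>> s.distance(Point(0, 0))
        0    0.0
        1    5.0
        dtype: float64
        """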
other_series, extended = self._make_series_of_val(other)
align = False if extended else align
spark_expr = stf.ST_Distance(F.col("L"), F.col("R"))
result = self._row_wise_operation(
spark_expr,
other_series,
align,
default_val=None,
)
return result
def intersection(
self, other: Union["GeoSeries", BaseGeometry], align: Union[bool, None] = None
) -> "GeoSeries":
other_series, extended = self._make_series_of_val(other)
align = False if extended else align
spark_expr = stf.ST_Intersection(F.col("L"), F.col("R"))
result = self._row_wise_operation(
spark_expr,
other_series,
align,
returns_geom=True,
default_val=None,
)
return result
def snap(self, other, tolerance, align=None) -> "GeoSeries":
if not isinstance(tolerance, (float, int)):
raise NotImplementedError(
"Array-like values for tolerance are not supported yet."
)
# Both sgpd and gpd implementations simply call the snap functions
# in JTS and GEOs, respectively. The results often differ slightly, but these
# must be differences inside of the engines themselves.
other_series, extended = self._make_series_of_val(other)
align = False if extended else align
spark_expr = stf.ST_Snap(F.col("L"), F.col("R"), tolerance)
result = self._row_wise_operation(
spark_expr,
other_series,
align,
returns_geom=True,
)
return result
def _row_wise_operation(
self,
spark_col: PySparkColumn,
other: pspd.Series,
align: Union[bool, None],
returns_geom: bool = False,
default_val: Any = None,
keep_name: bool = False,
):
"""
Helper function to perform a row-wise operation on two GeoSeries.
The self column and other column are aliased to `L` and `R`, respectively.
align : bool or None (default None)
If True, automatically aligns GeoSeries based on their indices. None defaults to True.
If False, the order of elements is preserved.
            Note: align should also be set to False when 'other' is a GeoSeries created from a
            single object (e.g. GeoSeries([Point(0, 0)] * len(self))), so that we align based on
            natural ordering in case the index is not the default range index from 0.
Alternatively, we could create 'other' using the same index as self,
but that would require index=self.index.to_pandas() which is less scalable.
default_val : Any (default None)
The value to use if either L or R is null. If None, nulls are not handled.
"""
from pyspark.sql.functions import col
# Note: this is specifically False. None is valid since it defaults to True similar to geopandas
index_col = (
NATURAL_ORDER_COLUMN_NAME if align is False else SPARK_DEFAULT_INDEX_NAME
)
# This code assumes there is only one index (SPARK_DEFAULT_INDEX_NAME)
# and would need to be updated if Sedona later supports multi-index
sdf = self._internal.spark_frame
other_sdf = other._internal.spark_frame
if index_col == NATURAL_ORDER_COLUMN_NAME:
# NATURAL_ORDER_COLUMN_NAME is not deterministic or sequential, so we instead replace it with a
# new column of row_numbers to perform the alignment. This mimics the following code.
# sdf.withColumn(index_col, F.row_number().over(Window.orderBy(NATURAL_ORDER_COLUMN_NAME)))
sdf = sdf.drop(index_col)
other_sdf = other_sdf.drop(index_col)
sdf = self._internal.attach_distributed_sequence_column(sdf, index_col)
other_sdf = other._internal.attach_distributed_sequence_column(
other_sdf, index_col
)
sdf = sdf.select(
self.spark.column.alias("L"),
# For the left side:
# - We always select NATURAL_ORDER_COLUMN_NAME, to avoid having to regenerate it in the result
# - We always select SPARK_DEFAULT_INDEX_NAME, to retain series index info
col(NATURAL_ORDER_COLUMN_NAME),
col(SPARK_DEFAULT_INDEX_NAME),
)
other_sdf = other_sdf.select(
other.spark.column.alias("R"),
# for the right side, we only need the column that we are joining on
col(index_col),
)
joined_df = sdf.join(other_sdf, on=index_col, how="outer")
if default_val is not None:
# ps.Series.fillna() doesn't always work for the output for some reason
# so we manually handle the nulls here.
spark_col = F.when(
F.col("L").isNull() | F.col("R").isNull(),
default_val,
).otherwise(spark_col)
return self._query_geometry_column(
spark_col,
joined_df,
returns_geom=returns_geom,
keep_name=keep_name,
)
def intersection_all(self):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
# ============================================================================
# SPATIAL PREDICATES
# ============================================================================
def contains(self, other, align=None) -> pspd.Series:
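        """Test whether each geometry contains ``other``.
        Example (illustrative):
        >>> from shapely.geometry import Point, Polygon
        >>> from sedona.spark.geopandas import GeoSeries
        >>> s = GeoSeries([Polygon([(0, 0), (2, 0), (2, 2), (0, 2)])])
        >>> s.contains(Point(1, 1))
        0    True
        dtype: bool
        """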
other, extended = self._make_series_of_val(other)
align = False if extended else align
spark_col = stp.ST_Contains(F.col("L"), F.col("R"))
result = self._row_wise_operation(
spark_col,
other,
align,
returns_geom=False,
default_val=False,
)
return _to_bool(result)
def contains_properly(self, other, align=None):
# Implementation of the abstract method
raise NotImplementedError(
_not_implemented_error(
"contains_properly",
"Tests if geometries properly contain other geometries (no boundary contact).",
)
)
def buffer(
self,
distance,
resolution=16,
cap_style="round",
join_style="round",
mitre_limit=5.0,
single_sided=False,
**kwargs,
) -> "GeoSeries":
if single_sided:
# Reverse the following logic in common/src/main/java/org/apache/sedona/common/Functions.java buffer() to avoid negating the distance
# if (bufferParameters.isSingleSided()
# && (params.toLowerCase().contains("left") && radius < 0
# || params.toLowerCase().contains("right") && radius > 0)) {
# radius = -radius;
# }
side = "left" if distance >= 0 else "right"
else:
side = "both"
        assert side in [
            "left",
            "right",
            "both",
        ], "buffer side must be one of 'left', 'right', or 'both'"
parameters = F.lit(
f"quad_segs={resolution} endcap={cap_style} join={join_style} mitre_limit={mitre_limit} side={side}"
)
spark_col = stf.ST_Buffer(
self.spark.column, distance, useSpheroid=False, parameters=parameters
)
return self._query_geometry_column(
spark_col,
returns_geom=True,
)
def simplify(self, tolerance=None, preserve_topology=True) -> "GeoSeries":
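        """Return simplified geometries, collapsing vertices within ``tolerance``.
        Uses ST_SimplifyPreserveTopology when ``preserve_topology=True``
        (the default), otherwise ST_Simplify.
        Example (illustrative):
        >>> from shapely.geometry import LineString
        >>> from sedona.spark.geopandas import GeoSeries
        >>> s = GeoSeries([LineString([(0, 0), (1, 0.1), (2, 0)])])
        >>> s.simplify(0.5)  # doctest: +SKIP
        0    LINESTRING (0 0, 2 0)
        dtype: geometry
        """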
spark_expr = (
stf.ST_SimplifyPreserveTopology(self.spark.column, tolerance)
if preserve_topology
else stf.ST_Simplify(self.spark.column, tolerance)
)
return self._query_geometry_column(spark_expr)
def plot(self, *args, **kwargs):
"""
Plot a GeoSeries.
Generate a plot of a GeoSeries geometry with matplotlib.
        Note: This method is not scalable and requires collecting all data to the driver.
        Currently Polygon, MultiPolygon, LineString, MultiLineString, Point and
        MultiPoint geometries can be plotted.
Parameters
----------
cmap : str (default None)
The name of a colormap recognized by matplotlib. Any
colormap will work, but categorical colormaps are
generally recommended. Examples of useful discrete
colormaps include:
tab10, tab20, Accent, Dark2, Paired, Pastel1, Set1, Set2
color : str, np.array, pd.Series, List (default None)
If specified, all objects will be colored uniformly.
ax : matplotlib.pyplot.Artist (default None)
axes on which to draw the plot
figsize : pair of floats (default None)
Size of the resulting matplotlib.figure.Figure. If the argument
ax is given explicitly, figsize is ignored.
aspect : 'auto', 'equal', None or float (default 'auto')
Set aspect of axis. If 'auto', the default aspect for map plots is 'equal'; if
however data are not projected (coordinates are long/lat), the aspect is by
default set to 1/cos(s_y * pi/180) with s_y the y coordinate of the middle of
the GeoSeries (the mean of the y range of bounding box) so that a long/lat
square appears square in the middle of the plot. This implies an
Equirectangular projection. If None, the aspect of `ax` won't be changed. It can
also be set manually (float) as the ratio of y-unit to x-unit.
autolim : bool (default True)
Update axes data limits to contain the new geometries.
**style_kwds : dict
Color options to be passed on to the actual plot function, such
as ``edgecolor``, ``facecolor``, ``linewidth``, ``markersize``,
``alpha``.
Returns
-------
ax : matplotlib axes instance
"""
return self.to_geopandas().plot(*args, **kwargs)
# GeoSeries-only (not in GeoDataFrame)
@property
def x(self) -> pspd.Series:
"""Return the x location of point geometries in a GeoSeries
Returns
-------
pandas.Series
Examples
--------
>>> from sedona.spark.geopandas import GeoSeries
>>> from shapely.geometry import Point
>>> s = GeoSeries([Point(1, 1), Point(2, 2), Point(3, 3)])
>>> s.x
0 1.0
1 2.0
2 3.0
dtype: float64
See Also
--------
GeoSeries.y
GeoSeries.z
"""
spark_col = stf.ST_X(self.spark.column)
return self._query_geometry_column(
spark_col,
returns_geom=False,
)
# GeoSeries-only (not in GeoDataFrame)
@property
def y(self) -> pspd.Series:
"""Return the y location of point geometries in a GeoSeries
Returns
-------
pandas.Series
Examples
--------
>>> from sedona.spark.geopandas import GeoSeries
>>> from shapely.geometry import Point
>>> s = GeoSeries([Point(1, 1), Point(2, 2), Point(3, 3)])
>>> s.y
0 1.0
1 2.0
2 3.0
dtype: float64
See Also
--------
GeoSeries.x
GeoSeries.z
GeoSeries.m
"""
spark_col = stf.ST_Y(self.spark.column)
return self._query_geometry_column(
spark_col,
returns_geom=False,
)
# GeoSeries-only (not in GeoDataFrame)
@property
def z(self) -> pspd.Series:
"""Return the z location of point geometries in a GeoSeries
Returns
-------
pandas.Series
Examples
--------
>>> from sedona.spark.geopandas import GeoSeries
>>> from shapely.geometry import Point
>>> s = GeoSeries([Point(1, 1, 1), Point(2, 2, 2), Point(3, 3, 3)])
>>> s.z
0 1.0
1 2.0
2 3.0
dtype: float64
See Also
--------
GeoSeries.x
GeoSeries.y
GeoSeries.m
"""
spark_col = stf.ST_Z(self.spark.column)
return self._query_geometry_column(
spark_col,
returns_geom=False,
)
# GeoSeries-only (not in GeoDataFrame)
@property
def m(self) -> pspd.Series:
raise NotImplementedError("GeoSeries.m() is not implemented yet.")
# ============================================================================
# CONSTRUCTION METHODS
# ============================================================================
# GeoSeries-only (not in GeoDataFrame)
@classmethod
def from_file(
cls, filename: str, format: Union[str, None] = None, **kwargs
) -> "GeoSeries":
"""Alternate constructor to create a ``GeoDataFrame`` from a file.
Parameters
----------
filename : str
File path or file handle to read from. If the path is a directory,
Sedona will read all files in that directory.
format : str, optional
The format of the file to read, by default None. If None, Sedona
infers the format from the file extension. Note that format
inference is not supported for directories. Available formats are
"shapefile", "geojson", "geopackage", and "geoparquet".
table_name : str, optional
The name of the table to read from a GeoPackage file, by default
None. This is required if ``format`` is "geopackage".
See Also
--------
GeoDataFrame.to_file : Write a ``GeoDataFrame`` to a file.
"""
df = sgpd.io.read_file(filename, format, **kwargs)
return GeoSeries(df.geometry, crs=df.crs)
# GeoSeries-only (not in GeoDataFrame)
@classmethod
def from_wkb(
cls,
data,
index=None,
crs: Union[Any, None] = None,
on_invalid="raise",
**kwargs,
) -> "GeoSeries":
r"""
Alternate constructor to create a ``GeoSeries``
from a list or array of WKB objects
Parameters
----------
data : array-like or Series
Series, list or array of WKB objects
index : array-like or Index
The index for the GeoSeries.
crs : value, optional
Coordinate Reference System of the geometry objects. Can be anything
accepted by
:meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
        on_invalid : {"raise", "warn", "ignore", "fix"}, default "raise"
- raise: an exception will be raised if a WKB input geometry is invalid.
- warn: a warning will be raised and invalid WKB geometries will be returned
as None.
- ignore: invalid WKB geometries will be returned as None without a warning.
- fix: an effort is made to fix invalid input geometries (e.g. close
unclosed rings). If this is not possible, they are returned as ``None``
without a warning. Requires GEOS >= 3.11 and shapely >= 2.1.
kwargs
Additional arguments passed to the Series constructor,
e.g. ``name``.
Returns
-------
GeoSeries
See Also
--------
GeoSeries.from_wkt
Examples
--------
>>> wkbs = [
... (
... b"\x01\x01\x00\x00\x00\x00\x00\x00\x00"
... b"\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?"
... ),
... (
... b"\x01\x01\x00\x00\x00\x00\x00\x00\x00"
... b"\x00\x00\x00@\x00\x00\x00\x00\x00\x00\x00@"
... ),
... (
... b"\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00"
... b"\x00\x08@\x00\x00\x00\x00\x00\x00\x08@"
... ),
... ]
>>> s = GeoSeries.from_wkb(wkbs)
>>> s
0 POINT (1 1)
1 POINT (2 2)
2 POINT (3 3)
dtype: geometry
"""
if on_invalid != "raise":
raise NotImplementedError(
"GeoSeries.from_wkb(): only on_invalid='raise' is implemented"
)
from pyspark.sql.types import StructType, StructField, BinaryType
schema = StructType([StructField("data", BinaryType(), True)])
return cls._create_from_select(
stc.ST_GeomFromWKB(F.col("data")),
data,
schema,
index,
crs,
**kwargs,
)
# GeoSeries-only (not in GeoDataFrame)
@classmethod
def from_wkt(
cls,
data,
index=None,
crs: Union[Any, None] = None,
on_invalid="raise",
**kwargs,
) -> "GeoSeries":
"""
Alternate constructor to create a ``GeoSeries``
from a list or array of WKT objects
Parameters
----------
data : array-like, Series
Series, list, or array of WKT objects
index : array-like or Index
The index for the GeoSeries.
crs : value, optional
Coordinate Reference System of the geometry objects. Can be anything
accepted by
:meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
        on_invalid : {"raise", "warn", "ignore", "fix"}, default "raise"
- raise: an exception will be raised if a WKT input geometry is invalid.
- warn: a warning will be raised and invalid WKT geometries will be
returned as ``None``.
- ignore: invalid WKT geometries will be returned as ``None`` without a
warning.
- fix: an effort is made to fix invalid input geometries (e.g. close
unclosed rings). If this is not possible, they are returned as ``None``
without a warning. Requires GEOS >= 3.11 and shapely >= 2.1.
kwargs
Additional arguments passed to the Series constructor,
e.g. ``name``.
Returns
-------
GeoSeries
See Also
--------
GeoSeries.from_wkb
Examples
--------
>>> from sedona.spark.geopandas import GeoSeries
>>> wkts = [
... 'POINT (1 1)',
... 'POINT (2 2)',
... 'POINT (3 3)',
... ]
>>> s = GeoSeries.from_wkt(wkts)
>>> s
0 POINT (1 1)
1 POINT (2 2)
2 POINT (3 3)
dtype: geometry
"""
if on_invalid != "raise":
raise NotImplementedError(
"GeoSeries.from_wkt(): only on_invalid='raise' is implemented"
)
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("data", StringType(), True)])
return cls._create_from_select(
stc.ST_GeomFromText(F.col("data")),
data,
schema,
index,
crs,
**kwargs,
)
# GeoSeries-only (not in GeoDataFrame)
[docs]
@classmethod
def from_xy(cls, x, y, z=None, index=None, crs=None, **kwargs) -> "GeoSeries":
"""
Alternate constructor to create a :class:`~geopandas.GeoSeries` of Point
geometries from lists or arrays of x, y(, z) coordinates
In case of geographic coordinates, it is assumed that longitude is captured
by ``x`` coordinates and latitude by ``y``.
Parameters
----------
x, y, z : iterable
index : array-like or Index, optional
The index for the GeoSeries. If not given and all coordinate inputs
are Series with an equal index, that index is used.
crs : value, optional
Coordinate Reference System of the geometry objects. Can be anything
accepted by
:meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
**kwargs
Additional arguments passed to the Series constructor,
e.g. ``name``.
Returns
-------
GeoSeries
See Also
--------
GeoSeries.from_wkt
points_from_xy
Examples
--------
>>> x = [2.5, 5, -3.0]
>>> y = [0.5, 1, 1.5]
>>> s = GeoSeries.from_xy(x, y, crs="EPSG:4326")
>>> s
0 POINT (2.5 0.5)
1 POINT (5 1)
2 POINT (-3 1.5)
dtype: geometry
"""
from pyspark.sql.types import StructType, StructField, DoubleType
schema = StructType(
[StructField("x", DoubleType(), True), StructField("y", DoubleType(), True)]
)
# Spark doesn't automatically cast ints to floats for us
x = [float(num) for num in x]
y = [float(num) for num in y]
z = [float(num) for num in z] if z else None
if z:
data = list(zip(x, y, z))
select = stc.ST_PointZ(F.col("x"), F.col("y"), F.col("z"))
schema.add(StructField("z", DoubleType(), True))
else:
data = list(zip(x, y))
select = stc.ST_Point(F.col("x"), F.col("y"))
geoseries = cls._create_from_select(
select,
data,
schema,
index,
crs,
**kwargs,
)
if crs:
from pyproj import CRS
geoseries.crs = CRS.from_user_input(crs).to_epsg()
return geoseries
@classmethod
def from_shapely(
cls, data, index=None, crs: Union[Any, None] = None, **kwargs
) -> "GeoSeries":
raise NotImplementedError(
_not_implemented_error(
"from_shapely", "Creates GeoSeries from Shapely geometry objects."
)
)
@classmethod
def from_arrow(cls, arr, **kwargs) -> "GeoSeries":
"""
        Construct a GeoSeries from an Arrow array object with a GeoArrow
extension type.
See https://geoarrow.org/ for details on the GeoArrow specification.
        This function accepts any Arrow array object implementing
the `Arrow PyCapsule Protocol`_ (i.e. having an ``__arrow_c_array__``
method).
.. _Arrow PyCapsule Protocol: https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
Note: Requires geopandas versions >= 1.0.0 to use with Sedona.
Parameters
----------
arr : pyarrow.Array, Arrow array
Any array object implementing the Arrow PyCapsule Protocol
(i.e. has an ``__arrow_c_array__`` or ``__arrow_c_stream__``
method). The type of the array should be one of the
geoarrow geometry types.
**kwargs
Other parameters passed to the GeoSeries constructor.
Returns
-------
GeoSeries
See Also
--------
GeoSeries.to_arrow
GeoDataFrame.from_arrow
Examples
--------
>>> from sedona.spark.geopandas import GeoSeries
>>> import geoarrow.pyarrow as ga
>>> array = ga.as_geoarrow([None, "POLYGON ((0 0, 1 1, 0 1, 0 0))", "LINESTRING (0 0, -1 1, 0 -1)"])
>>> geoseries = GeoSeries.from_arrow(array)
>>> geoseries
0 None
1 POLYGON ((0 0, 1 1, 0 1, 0 0))
2 LINESTRING (0 0, -1 1, 0 -1)
dtype: geometry
"""
gpd_series = gpd.GeoSeries.from_arrow(arr, **kwargs)
return GeoSeries(gpd_series)
@classmethod
def _create_from_select(
cls, spark_col: PySparkColumn, data, schema, index, crs, **kwargs
) -> "GeoSeries":
from pyspark.pandas.utils import default_session
from pyspark.pandas.internal import InternalField
import numpy as np
        if isinstance(data, list) and data and not isinstance(data[0], (tuple, list)):
data = [(obj,) for obj in data]
name = kwargs.get("name", SPARK_DEFAULT_SERIES_NAME)
if isinstance(data, pspd.Series):
spark_df = data._internal.spark_frame
assert len(schema) == 1
spark_df = spark_df.withColumnRenamed(
_get_series_col_name(data), schema[0].name
)
else:
spark_df = default_session().createDataFrame(data, schema=schema)
spark_df = spark_df.select(spark_col.alias(name))
internal = InternalFrame(
spark_frame=spark_df,
index_spark_columns=None,
column_labels=[(name,)],
data_spark_columns=[scol_for(spark_df, name)],
data_fields=[InternalField(np.dtype("object"), spark_df.schema[name])],
column_label_names=[(name,)],
)
ps_series = first_series(PandasOnSparkDataFrame(internal))
name = None if name == SPARK_DEFAULT_SERIES_NAME else name
ps_series.rename(name, inplace=True)
return GeoSeries(ps_series, index, crs=crs)
# ============================================================================
# DATA ACCESS AND MANIPULATION
# ============================================================================
# GeoSeries-only (not in GeoDataFrame)
def isna(self) -> pspd.Series:
"""
Detect missing values.
Returns
-------
A boolean Series of the same size as the GeoSeries,
True where a value is NA.
Examples
--------
>>> from sedona.spark.geopandas import GeoSeries
>>> from shapely.geometry import Polygon
>>> s = GeoSeries(
... [Polygon([(0, 0), (1, 1), (0, 1)]), None, Polygon([])]
... )
>>> s
0 POLYGON ((0 0, 1 1, 0 1, 0 0))
1 None
2 POLYGON EMPTY
dtype: geometry
>>> s.isna()
0 False
1 True
2 False
dtype: bool
See Also
--------
GeoSeries.notna : inverse of isna
GeoSeries.is_empty : detect empty geometries
"""
spark_expr = F.isnull(self.spark.column)
result = self._query_geometry_column(
spark_expr,
returns_geom=False,
)
return _to_bool(result)
# GeoSeries-only (not in GeoDataFrame)
def isnull(self) -> pspd.Series:
"""Alias for `isna` method. See `isna` for more detail."""
return self.isna()
# GeoSeries-only (not in GeoDataFrame)
def notna(self) -> pspd.Series:
"""
Detect non-missing values.
Returns
-------
A boolean pandas Series of the same size as the GeoSeries,
False where a value is NA.
Examples
--------
>>> from sedona.spark.geopandas import GeoSeries
>>> from shapely.geometry import Polygon
>>> s = GeoSeries(
... [Polygon([(0, 0), (1, 1), (0, 1)]), None, Polygon([])]
... )
>>> s
0 POLYGON ((0 0, 1 1, 0 1, 0 0))
1 None
2 POLYGON EMPTY
dtype: geometry
>>> s.notna()
0 True
1 False
2 True
dtype: bool
See Also
--------
GeoSeries.isna : inverse of notna
GeoSeries.is_empty : detect empty geometries
"""
        # Once Sedona's minimum Spark version is 3.5.0, we can use F.isnotnull(self.spark.column) instead
spark_expr = ~F.isnull(self.spark.column)
result = self._query_geometry_column(
spark_expr,
returns_geom=False,
)
return _to_bool(result)
# GeoSeries-only (not in GeoDataFrame)
def notnull(self) -> pspd.Series:
"""Alias for `notna` method. See `notna` for more detail."""
return self.notna()
# GeoSeries-only (not in GeoDataFrame)
def fillna(
self, value=None, inplace: bool = False, limit=None, **kwargs
) -> Union["GeoSeries", None]:
"""
Fill NA values with geometry (or geometries).
Parameters
----------
value : shapely geometry or GeoSeries, default None
If None is passed, NA values will be filled with GEOMETRYCOLLECTION EMPTY.
If a shapely geometry object is passed, it will be
used to fill all missing values. If a ``GeoSeries``
is passed, missing values will be filled based on the corresponding index
locations. If pd.NA or np.nan are passed, values will be filled with
``None`` (not GEOMETRYCOLLECTION EMPTY).
limit : int, default None
This is the maximum number of entries along the entire axis
where NaNs will be filled. Must be greater than 0 if not None.
Returns
-------
GeoSeries
Examples
--------
>>> from sedona.spark.geopandas import GeoSeries
>>> from shapely.geometry import Polygon
>>> s = GeoSeries(
... [
... Polygon([(0, 0), (1, 1), (0, 1)]),
... None,
... Polygon([(0, 0), (-1, 1), (0, -1)]),
... ]
... )
>>> s
0 POLYGON ((0 0, 1 1, 0 1, 0 0))
1 None
2 POLYGON ((0 0, -1 1, 0 -1, 0 0))
dtype: geometry
Filled with an empty polygon.
>>> s.fillna()
0 POLYGON ((0 0, 1 1, 0 1, 0 0))
1 GEOMETRYCOLLECTION EMPTY
2 POLYGON ((0 0, -1 1, 0 -1, 0 0))
dtype: geometry
Filled with a specific polygon.
>>> s.fillna(Polygon([(0, 1), (2, 1), (1, 2)]))
0 POLYGON ((0 0, 1 1, 0 1, 0 0))
1 POLYGON ((0 1, 2 1, 1 2, 0 1))
2 POLYGON ((0 0, -1 1, 0 -1, 0 0))
dtype: geometry
Filled with another GeoSeries.
>>> from shapely.geometry import Point
>>> s_fill = GeoSeries(
... [
... Point(0, 0),
... Point(1, 1),
... Point(2, 2),
... ]
... )
>>> s.fillna(s_fill)
0 POLYGON ((0 0, 1 1, 0 1, 0 0))
1 POINT (1 1)
2 POLYGON ((0 0, -1 1, 0 -1, 0 0))
dtype: geometry
See Also
--------
GeoSeries.isna : detect missing values
"""
from shapely.geometry.base import BaseGeometry
# TODO: Implement limit https://github.com/apache/sedona/issues/2068
if limit:
raise NotImplementedError(
"GeoSeries.fillna() with limit is not implemented yet."
)
align = True
if pd.isna(value) == True or isinstance(value, BaseGeometry):
if (
value is not None and pd.isna(value) == True
): # i.e. value is np.nan or pd.NA
value = None
else:
if value is None:
from shapely.geometry import GeometryCollection
value = GeometryCollection()
other, extended = self._make_series_of_val(value)
align = False if extended else align
elif isinstance(value, (GeoSeries, gpd.GeoSeries)):
if not isinstance(value, GeoSeries):
value = GeoSeries(value)
# Replace all None's with empty geometries (this is a recursive call)
other = value.fillna(None)
else:
raise ValueError(f"Invalid value type: {type(value)}")
# Coalesce: If the value in L is null, use the corresponding value in R for that row
spark_expr = F.coalesce(F.col("L"), F.col("R"))
result = self._row_wise_operation(
spark_expr,
other,
align=align,
returns_geom=True,
default_val=None,
keep_name=True,
)
if inplace:
self._update_inplace(result)
return None
return result
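# Illustrative sketch of the coalesce above (not executable API usage):
# row-wise, the left (self) value wins unless it is null, e.g.
#   L = [poly0, None, poly2], R = [fill0, fill1, fill2]
#   coalesce(L, R) -> [poly0, fill1, poly2]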
def explode(self, ignore_index=False, index_parts=False) -> "GeoSeries":
raise NotImplementedError(
_not_implemented_error(
"explode",
"Explodes multi-part geometries into separate single-part geometries.",
)
)
def to_crs(
self, crs: Union[Any, None] = None, epsg: Union[int, None] = None
) -> "GeoSeries":
"""Returns a ``GeoSeries`` with all geometries transformed to a new
coordinate reference system.
Transform all geometries in a GeoSeries to a different coordinate
reference system. The ``crs`` attribute on the current GeoSeries must
be set. Either ``crs`` or ``epsg`` may be specified for output.
This method will transform all points in all objects. It has no notion
of projecting entire geometries. All segments joining points are
assumed to be lines in the current projection, not geodesics. Objects
crossing the dateline (or other projection boundary) will have
undesirable behavior.
Parameters
----------
crs : pyproj.CRS, optional if `epsg` is specified
The value can be anything accepted
by :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
epsg : int, optional if `crs` is specified
EPSG code specifying output projection.
Returns
-------
GeoSeries
Examples
--------
>>> from shapely.geometry import Point
>>> from sedona.spark.geopandas import GeoSeries
>>> geoseries = GeoSeries([Point(1, 1), Point(2, 2), Point(3, 3)], crs=4326)
>>> geoseries.crs
<Geographic 2D CRS: EPSG:4326>
Name: WGS 84
Axis Info [ellipsoidal]:
- Lat[north]: Geodetic latitude (degree)
- Lon[east]: Geodetic longitude (degree)
Area of Use:
- name: World
- bounds: (-180.0, -90.0, 180.0, 90.0)
Datum: World Geodetic System 1984
- Ellipsoid: WGS 84
- Prime Meridian: Greenwich
>>> geoseries = geoseries.to_crs(3857)
>>> print(geoseries)
0 POINT (111319.491 111325.143)
1 POINT (222638.982 222684.209)
2 POINT (333958.472 334111.171)
dtype: geometry
>>> geoseries.crs
<Projected CRS: EPSG:3857>
Name: WGS 84 / Pseudo-Mercator
Axis Info [cartesian]:
- X[east]: Easting (metre)
- Y[north]: Northing (metre)
Area of Use:
- name: World - 85°S to 85°N
- bounds: (-180.0, -85.06, 180.0, 85.06)
Coordinate Operation:
- name: Popular Visualisation Pseudo-Mercator
- method: Popular Visualisation Pseudo Mercator
Datum: World Geodetic System 1984
- Ellipsoid: WGS 84
- Prime Meridian: Greenwich
"""
from pyproj import CRS
if crs is not None:
crs = CRS.from_user_input(crs)
elif epsg is not None:
crs = CRS.from_epsg(epsg)
else:
raise ValueError("Must pass either crs or epsg.")
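# Note: pyproj's CRS.to_epsg() returns None when the CRS has no exact EPSG
# match; the literal below would then be the malformed string "EPSG:None",
# so an EPSG-resolvable CRS is assumed here.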
spark_expr = stf.ST_Transform(
self.spark.column,
F.lit(f"EPSG:{crs.to_epsg()}"),
)
return self._query_geometry_column(
spark_expr,
keep_name=True,
)
@property
def bounds(self) -> pspd.DataFrame:
"""Return a DataFrame with columns ``minx``, ``miny``, ``maxx``, ``maxy``
containing the bounds for each geometry in the GeoSeries."""
selects = [
stf.ST_XMin(self.spark.column).alias("minx"),
stf.ST_YMin(self.spark.column).alias("miny"),
stf.ST_XMax(self.spark.column).alias("maxx"),
stf.ST_YMax(self.spark.column).alias("maxy"),
]
df = self._internal.spark_frame
sdf = df.select(*selects)
internal = InternalFrame(
spark_frame=sdf,
index_spark_columns=None,
column_labels=[("minx",), ("miny",), ("maxx",), ("maxy",)],
data_spark_columns=[
scol_for(sdf, "minx"),
scol_for(sdf, "miny"),
scol_for(sdf, "maxx"),
scol_for(sdf, "maxy"),
],
column_label_names=None,
)
return pspd.DataFrame(internal)
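# Example (illustrative): GeoSeries([Point(1, 2)]).bounds yields a single
# row with minx=1.0, miny=2.0, maxx=1.0, maxy=2.0.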
@property
def total_bounds(self):
"""Return a numpy array of the form ``[minx, miny, maxx, maxy]`` giving
the total bounds of all geometries in the GeoSeries."""
import numpy as np
import warnings
from pyspark.sql import functions as F
if len(self) == 0:
# numpy 'min' cannot handle empty arrays
# TODO with numpy >= 1.15, the 'initial' argument can be used
return np.array([np.nan, np.nan, np.nan, np.nan])
ps_df = self.bounds
with warnings.catch_warnings():
# if all rows are empty geometry / none, nan is expected
warnings.filterwarnings(
"ignore", r"All-NaN slice encountered", RuntimeWarning
)
total_bounds_df = ps_df.agg(
{
"minx": ["min"],
"miny": ["min"],
"maxx": ["max"],
"maxy": ["max"],
}
)
return np.array(
(
np.nanmin(total_bounds_df["minx"]["min"]), # minx
np.nanmin(total_bounds_df["miny"]["min"]), # miny
np.nanmax(total_bounds_df["maxx"]["max"]), # maxx
np.nanmax(total_bounds_df["maxy"]["max"]), # maxy
)
)
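# Example (illustrative): GeoSeries([Point(0, 0), Point(3, 4)]).total_bounds
# -> array([0., 0., 3., 4.])  (min over minx/miny, max over maxx/maxy)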
# GeoSeries-only (not in GeoDataFrame)
def estimate_utm_crs(self, datum_name: str = "WGS 84") -> "CRS":
"""Returns the estimated UTM CRS based on the bounds of the dataset.
Parameters
----------
datum_name : str, optional
The name of the datum to use in the query. Default is WGS 84.
Returns
-------
pyproj.CRS
Examples
--------
>>> import geodatasets # requires: pip install geodatasets
>>> import geopandas as gpd
>>> df = gpd.read_file(
... geodatasets.get_path("geoda.chicago_commpop")
... )
>>> df.geometry.values.estimate_utm_crs() # doctest: +SKIP
<Derived Projected CRS: EPSG:32616>
Name: WGS 84 / UTM zone 16N
Axis Info [cartesian]:
- E[east]: Easting (metre)
- N[north]: Northing (metre)
Area of Use:
- name: Between 90°W and 84°W, northern hemisphere between equator and 84°N,...
- bounds: (-90.0, 0.0, -84.0, 84.0)
Coordinate Operation:
- name: UTM zone 16N
- method: Transverse Mercator
Datum: World Geodetic System 1984 ensemble
- Ellipsoid: WGS 84
- Prime Meridian: Greenwich
"""
import numpy as np
from pyproj import CRS
from pyproj.aoi import AreaOfInterest
from pyproj.database import query_utm_crs_info
# This replicates geopandas's implementation exactly:
# https://github.com/geopandas/geopandas/blob/main/geopandas/array.py
# The only difference is that we use Sedona's total_bounds property, which is
# more efficient and scalable than the geopandas version. The rest of the
# code always executes on just 4 values (minx, miny, maxx, maxy), so the
# numpy and pyproj calls are reasonable.
if not self.crs:
raise RuntimeError("crs must be set to estimate UTM CRS.")
minx, miny, maxx, maxy = self.total_bounds
if self.crs.is_geographic:
x_center = np.mean([minx, maxx])
y_center = np.mean([miny, maxy])
# ensure using geographic coordinates
else:
from pyproj import Transformer
from functools import lru_cache
TransformerFromCRS = lru_cache(Transformer.from_crs)
transformer = TransformerFromCRS(self.crs, "EPSG:4326", always_xy=True)
minx, miny, maxx, maxy = transformer.transform_bounds(
minx, miny, maxx, maxy
)
y_center = np.mean([miny, maxy])
# crossed the antimeridian
if minx > maxx:
# shift maxx from [-180,180] to [0,360]
# so both numbers are positive for center calculation
# Example: -175 to 185
maxx += 360
x_center = np.mean([minx, maxx])
# shift back to [-180,180]
x_center = ((x_center + 180) % 360) - 180
else:
x_center = np.mean([minx, maxx])
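# Worked example of the antimeridian shift above (illustrative): for
# minx=170, maxx=-170, maxx becomes 190, x_center = 180, and
# ((180 + 180) % 360) - 180 = -180, the same meridian back in range.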
utm_crs_list = query_utm_crs_info(
datum_name=datum_name,
area_of_interest=AreaOfInterest(
west_lon_degree=x_center,
south_lat_degree=y_center,
east_lon_degree=x_center,
north_lat_degree=y_center,
),
)
try:
return CRS.from_epsg(utm_crs_list[0].code)
except IndexError:
raise RuntimeError("Unable to determine UTM CRS")
def to_json(
self,
show_bbox: bool = True,
drop_id: bool = False,
to_wgs84: bool = False,
**kwargs,
) -> str:
"""
Returns a GeoJSON string representation of the GeoSeries.
Parameters
----------
show_bbox : bool, optional, default: True
Include bbox (bounds) in the geojson
drop_id : bool, default: False
Whether to drop the index of the GeoSeries from the ``id`` property
in the generated GeoJSON. By default the index is retained as the
``id``; set to True if the index is just arbitrary row numbers.
to_wgs84: bool, optional, default: False
If the CRS is set on the active geometry column it is exported as
WGS84 (EPSG:4326) to meet the `2016 GeoJSON specification
<https://tools.ietf.org/html/rfc7946>`_.
Set to True to force re-projection and set to False to ignore CRS. False by
default.
**kwargs
Additional keyword args will be passed to :func:`json.dumps`.
Note: Unlike geopandas, Sedona's implementation will replace 'LinearRing'
with 'LineString' in the GeoJSON output.
Returns
-------
JSON string
Examples
--------
>>> from sedona.spark.geopandas import GeoSeries
>>> from shapely.geometry import Point
>>> s = GeoSeries([Point(1, 1), Point(2, 2), Point(3, 3)])
>>> s
0 POINT (1 1)
1 POINT (2 2)
2 POINT (3 3)
dtype: geometry
>>> s.to_json()
'{"type": "FeatureCollection", "features": [{"id": "0", "type": "Feature", "pr\
operties": {}, "geometry": {"type": "Point", "coordinates": [1.0, 1.0]}, "bbox": [1.0,\
1.0, 1.0, 1.0]}, {"id": "1", "type": "Feature", "properties": {}, "geometry": {"type"\
: "Point", "coordinates": [2.0, 2.0]}, "bbox": [2.0, 2.0, 2.0, 2.0]}, {"id": "2", "typ\
e": "Feature", "properties": {}, "geometry": {"type": "Point", "coordinates": [3.0, 3.\
0]}, "bbox": [3.0, 3.0, 3.0, 3.0]}], "bbox": [1.0, 1.0, 3.0, 3.0]}'
See Also
--------
GeoSeries.to_file : write GeoSeries to file
"""
return self.to_geoframe(name="geometry").to_json(
na="null", show_bbox=show_bbox, drop_id=drop_id, to_wgs84=to_wgs84, **kwargs
)
def to_wkb(self, hex: bool = False, **kwargs) -> pspd.Series:
"""
Convert GeoSeries geometries to WKB
Parameters
----------
hex : bool
If true, export the WKB as a hexadecimal string.
The default is to return a binary bytes object.
kwargs
Accepted for compatibility with geopandas's signature
(:func:`shapely.to_wkb`); ignored by this implementation.
Returns
-------
Series
WKB representations of the geometries
See also
--------
GeoSeries.to_wkt
Examples
--------
>>> from shapely.geometry import Point, Polygon
>>> s = GeoSeries(
... [
... Point(0, 0),
... Polygon(),
... Polygon([(0, 0), (1, 1), (1, 0)]),
... None,
... ]
... )
>>> s.to_wkb()
0 b'\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00...
1 b'\x01\x03\x00\x00\x00\x00\x00\x00\x00'
2 b'\x01\x03\x00\x00\x00\x01\x00\x00\x00\x04\x00...
3 None
dtype: object
>>> s.to_wkb(hex=True)
0 010100000000000000000000000000000000000000
1 010300000000000000
2 0103000000010000000400000000000000000000000000...
3 None
dtype: object
"""
spark_expr = stf.ST_AsBinary(self.spark.column)
if hex:
spark_expr = F.hex(spark_expr)
return self._query_geometry_column(
spark_expr,
returns_geom=False,
)
def to_wkt(self, **kwargs) -> pspd.Series:
"""
Convert GeoSeries geometries to WKT
Note: shapely versions < 1.0.0 may produce different WKT for empty geometries.
Parameters
----------
kwargs
Accepted for compatibility with geopandas's signature
(:func:`shapely.to_wkt`); ignored by this implementation.
Returns
-------
Series
WKT representations of the geometries
Examples
--------
>>> from shapely.geometry import Point
>>> s = GeoSeries([Point(1, 1), Point(2, 2), Point(3, 3)])
>>> s
0 POINT (1 1)
1 POINT (2 2)
2 POINT (3 3)
dtype: geometry
>>> s.to_wkt()
0 POINT (1 1)
1 POINT (2 2)
2 POINT (3 3)
dtype: object
See also
--------
GeoSeries.to_wkb
"""
spark_expr = stf.ST_AsText(self.spark.column)
return self._query_geometry_column(
spark_expr,
returns_geom=False,
)
def to_arrow(self, geometry_encoding="WKB", interleaved=True, include_z=None):
"""Encode a GeoSeries to GeoArrow format.
See https://geoarrow.org/ for details on the GeoArrow specification.
This function returns a generic Arrow array object implementing
the `Arrow PyCapsule Protocol`_ (i.e. having an ``__arrow_c_array__``
method). This object can then be consumed by your Arrow implementation
of choice that supports this protocol.
.. _Arrow PyCapsule Protocol: https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
Note: Requires geopandas versions >= 1.0.0 to use with Sedona.
Parameters
----------
geometry_encoding : {'WKB', 'geoarrow' }, default 'WKB'
The GeoArrow encoding to use for the data conversion.
interleaved : bool, default True
Only relevant for 'geoarrow' encoding. If True, the geometries'
coordinates are interleaved in a single fixed size list array.
If False, the coordinates are stored as separate arrays in a
struct type.
include_z : bool, default None
Only relevant for 'geoarrow' encoding (for WKB, the dimensionality
of the individual geometries is preserved).
If False, return 2D geometries. If True, include the third dimension
in the output (if a geometry has no third dimension, the z-coordinates
will be NaN). By default, will infer the dimensionality from the
input geometries. Note that this inference can be unreliable with
empty geometries (for a guaranteed result, it is recommended to
specify the keyword).
Returns
-------
GeoArrowArray
A generic Arrow array object with geometry data encoded to GeoArrow.
Examples
--------
>>> from sedona.spark.geopandas import GeoSeries
>>> from shapely.geometry import Point
>>> gser = GeoSeries([Point(1, 2), Point(2, 1)])
>>> gser
0 POINT (1 2)
1 POINT (2 1)
dtype: geometry
>>> arrow_array = gser.to_arrow()
>>> arrow_array
<geopandas.io._geoarrow.GeoArrowArray object at ...>
The returned array object needs to be consumed by a library implementing
the Arrow PyCapsule Protocol. For example, wrapping the data as a
pyarrow.Array (requires pyarrow >= 14.0):
>>> import pyarrow as pa
>>> array = pa.array(arrow_array)
>>> array
<pyarrow.lib.BinaryArray object at ...>
[
0101000000000000000000F03F0000000000000040,
01010000000000000000000040000000000000F03F
]
"""
# Because this function returns the arrow array in memory, we simply rely on
# geopandas's implementation. It returns a geopandas-specific data type that
# can be converted to an actual pyarrow array, so there is no direct Sedona
# equivalent. This way we also get all of the arguments implemented for free.
return self.to_geopandas().to_arrow(
geometry_encoding=geometry_encoding,
interleaved=interleaved,
include_z=include_z,
)
def clip(self, mask, keep_geom_type: bool = False, sort=False) -> "GeoSeries":
raise NotImplementedError(
_not_implemented_error(
"clip", "Clips geometries to the bounds of a mask geometry."
)
)
def to_file(
self,
path: str,
driver: Union[str, None] = None,
schema: Union[dict, None] = None,
index: Union[bool, None] = None,
**kwargs,
):
"""Write the ``GeoSeries`` to a file.
Parameters
----------
path : str
File path or file handle to write to.
driver : str, optional
The format driver used to write the file, by default None. If not
specified, it's inferred from the file extension. Available formats
are "geojson", "geopackage", and "geoparquet".
schema : dict, optional
Currently unused; accepted for compatibility with the geopandas
signature.
index : bool, optional
If True, writes the index as a column. If False, no index is
written. By default None, the index is written only if it is named,
is a MultiIndex, or has a non-integer data type.
mode : str, default 'w'
The write mode: 'w' to overwrite the existing file or 'a' to append.
crs : pyproj.CRS, optional
The coordinate reference system to write. If None, it is determined
from the ``GeoSeries`` `crs` attribute. The value can be anything
accepted by :meth:`pyproj.CRS.from_user_input()`, such as an
authority string (e.g., "EPSG:4326") or a WKT string.
**kwargs
Additional keyword arguments passed to the underlying writing engine.
Examples
--------
>>> from shapely.geometry import Point, LineString
>>> from sedona.spark.geopandas import GeoSeries
>>> # Note: Examples write to temporary files for demonstration
>>> import tempfile
>>> import os
Create a GeoSeries:
>>> gs = GeoSeries(
... [Point(0, 0), LineString([(1, 1), (2, 2)])],
... index=["a", "b"]
... )
Save to a GeoParquet file:
>>> path_parquet = os.path.join(tempfile.gettempdir(), "data.parquet")
>>> gs.to_file(path_parquet, driver="geoparquet")
Append to a GeoJSON file:
>>> path_json = os.path.join(tempfile.gettempdir(), "data.json")
>>> gs.to_file(path_json, driver="geojson", mode='a')
"""
self.to_geoframe().to_file(path, driver, index=index, **kwargs)
def to_parquet(self, path, **kwargs):
"""Write the GeoSeries to a GeoParquet file.
Parameters
----------
path : str
The file path where the GeoParquet file will be written.
**kwargs
Additional keyword arguments passed to the underlying writing function.
Returns
-------
None
Examples
--------
>>> from shapely.geometry import Point
>>> from sedona.spark.geopandas import GeoSeries
>>> import tempfile
>>> import os
>>> gs = GeoSeries([Point(1, 1), Point(2, 2)])
>>> file_path = os.path.join(tempfile.gettempdir(), "my_geodata.parquet")
>>> gs.to_parquet(file_path)
"""
self.to_geoframe().to_file(path, driver="geoparquet", **kwargs)
# -----------------------------------------------------------------------------
# Utils (instance helpers)
# -----------------------------------------------------------------------------
def _update_inplace(self, result: "GeoSeries", invalidate_sindex: bool = True):
"""Replace this GeoSeries' data with ``result`` in place, optionally
invalidating the cached spatial index."""
self.rename(result.name, inplace=True)
self._update_anchor(result._anchor)
if invalidate_sindex:
self._sindex = None
def _make_series_of_val(self, value: Any):
"""
A helper method to turn single objects into series (ps.Series or GeoSeries when possible)
Returns:
tuple[pspd.Series, bool]:
- The series of the value
- Whether returned value was a single object extended into a series (useful for row-wise 'align' parameter)
"""
if isinstance(value, GeoDataFrame):
return value.geometry, False
elif not isinstance(value, pspd.Series):
# repeat the scalar once per row to back a full-length series
lst = [value for _ in range(len(self))]
if isinstance(value, BaseGeometry):
return GeoSeries(lst), True
else:
# e.g int input
return pspd.Series(lst), True
else:
return value, False
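# Illustrative behavior (hypothetical calls):
#   self._make_series_of_val(Point(0, 0))    -> (GeoSeries of len(self) points, True)
#   self._make_series_of_val(other_pspd_ser) -> (other_pspd_ser, False)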
def to_geoframe(self, name=None):
"""Convert this GeoSeries into a single-column GeoDataFrame, naming the
column ``name`` if given, the series' own name otherwise, or "geometry"
if the series is unnamed."""
if name is not None:
renamed = self.rename(name)
elif self._column_label is None or self._column_label == (None,):
renamed = self.rename("geometry")
else:
renamed = self
index_col = SPARK_DEFAULT_INDEX_NAME
result = GeoDataFrame(
pspd.DataFrame(renamed._internal).to_spark(index_col).pandas_api(index_col)
)
result.index.name = self.index.name
return result
# -----------------------------------------------------------------------------
# Utils (module-level helpers)
# -----------------------------------------------------------------------------
def _get_series_col_name(ps_series: pspd.Series) -> str:
return ps_series.name if ps_series.name else SPARK_DEFAULT_SERIES_NAME
def _to_bool(ps_series: pspd.Series, default: bool = False) -> pspd.Series:
"""
Cast a ps.Series to bool type if it's not one, converting None values to the default value.
"""
if ps_series.dtype.name != "bool":
# fill None values with the default value
ps_series.fillna(default, inplace=True)
return ps_series
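# Illustrative: a boolean column containing nulls surfaces with a non-bool
# dtype, e.g. pspd.Series([True, None]) -> _to_bool(...) fills None with False.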