
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import typing
from typing import Any, Union, Literal, List

import numpy as np
import geopandas as gpd
import sedona.spark.geopandas as sgpd
import pandas as pd
import pyspark.pandas as pspd
import pyspark
from pyspark.pandas import Series as PandasOnSparkSeries
from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
from pyspark.pandas.internal import InternalFrame
from pyspark.pandas.series import first_series
from pyspark.pandas.utils import scol_for
from pyspark.sql.types import NullType
from sedona.spark.sql.types import GeometryType

from sedona.spark.sql import st_aggregates as sta
from sedona.spark.sql import st_constructors as stc
from sedona.spark.sql import st_functions as stf
from sedona.spark.sql import st_predicates as stp

from pyspark.sql import Column as PySparkColumn
from pyspark.sql import functions as F

import shapely
from shapely.geometry.base import BaseGeometry

from sedona.spark.geopandas._typing import Label
from sedona.spark.geopandas.base import GeoFrame
from sedona.spark.geopandas.geodataframe import GeoDataFrame
from sedona.spark.geopandas.sindex import SpatialIndex
from packaging.version import parse as parse_version

from pyspark.pandas.internal import (
    SPARK_DEFAULT_INDEX_NAME,  # __index_level_0__
    NATURAL_ORDER_COLUMN_NAME,
    SPARK_DEFAULT_SERIES_NAME,  # '0'
)


# ============================================================================
# IMPLEMENTATION STATUS TRACKING
# ============================================================================

IMPLEMENTATION_STATUS = {
    "IMPLEMENTED": [
        "area",
        "buffer",
        "bounds",
        "centroid",
        "contains",
        "crs",
        "distance",
        "envelope",
        "geometry",
        "intersection",
        "intersects",
        "is_empty",
        "is_simple",
        "is_valid",
        "is_valid_reason",
        "length",
        "make_valid",
        "set_crs",
        "to_crs",
        "to_geopandas",
        "to_wkb",
        "to_wkt",
        "x",
        "y",
        "z",
        "has_z",
        "get_geometry",
        "boundary",
        "total_bounds",
        "estimate_utm_crs",
        "isna",
        "isnull",
        "notna",
        "notnull",
        "from_xy",
        "copy",
        "geom_type",
        "sindex",
    ],
    "NOT_IMPLEMENTED": [
        "clip",
        "contains_properly",
        "convex_hull",
        "count_coordinates",
        "count_geometries",
        "count_interior_rings",
        "explode",
        "force_2d",
        "force_3d",
        "from_file",
        "from_shapely",
        "from_arrow",
        "line_merge",
        "reverse",
        "segmentize",
        "to_json",
        "to_arrow",
        "to_file",
        "transform",
        "unary_union",
        "union_all",
        "intersection_all",
        "type",
        "is_ring",
        "is_ccw",
        "is_closed",
        "get_precision",
        "concave_hull",
        "delaunay_triangles",
        "voronoi_polygons",
        "minimum_rotated_rectangle",
        "exterior",
        "extract_unique_points",
        "offset_curve",
        "interiors",
        "remove_repeated_points",
        "set_precision",
        "representative_point",
        "minimum_bounding_circle",
        "minimum_bounding_radius",
        "minimum_clearance",
        "normalize",
        "m",
    ],
    "PARTIALLY_IMPLEMENTED": [
        "fillna",  # Limited parameter support (no 'limit' parameter)
        "from_wkb",
        "from_wkt",  # Limited error handling options (only 'raise' supported)
    ],
}

IMPLEMENTATION_PRIORITY = {
    "HIGH": [
        "contains",
        "contains_properly",
        "convex_hull",
        "explode",
        "clip",
        "from_shapely",
        "count_coordinates",
        "count_geometries",
        "is_ring",
        "is_closed",
        "reverse",
    ],
    "MEDIUM": [
        "force_2d",
        "force_3d",
        "transform",
        "segmentize",
        "line_merge",
        "unary_union",
        "union_all",
        "to_json",
        "from_file",
        "count_interior_rings",
    ],
    "LOW": [
        "delaunay_triangles",
        "voronoi_polygons",
        "minimum_bounding_circle",
        "representative_point",
        "extract_unique_points",
        "from_arrow",
        "to_arrow",
    ],
}
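

# A minimal sketch (hypothetical helper, not part of the module's public API)
# showing how the tracking dicts above can be queried; it assumes only the
# IMPLEMENTATION_STATUS dict defined above.
def _implementation_status(method_name: str) -> str:
    """Return the IMPLEMENTATION_STATUS bucket for ``method_name``, or 'UNKNOWN'."""
    for status, methods in IMPLEMENTATION_STATUS.items():
        if method_name in methods:
            return status
    return "UNKNOWN"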


def _not_implemented_error(method_name: str, additional_info: str = "") -> str:
    """
    Generate a standardized NotImplementedError message.

    Parameters
    ----------
    method_name : str
        The name of the method that is not implemented.
    additional_info : str, optional
        Additional information about the method or workarounds.

    Returns
    -------
    str
        Formatted error message.
    """
    base_message = (
        f"GeoSeries.{method_name}() is not implemented yet.\n"
        f"This method will be added in a future release."
    )

    if additional_info:
        base_message += f"\n\n{additional_info}"

    workaround = (
        "\n\nTemporary workaround - use GeoPandas:\n"
        "  gpd_series = sedona_series.to_geopandas()\n"
        f"  result = gpd_series.{method_name}(...)\n"
        "  # Note: This will collect all data to the driver."
    )

    return base_message + workaround
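

# Illustrative example (uses only the helper above): the composed message for
# a missing method reads roughly as follows.
#
#   >>> print(_not_implemented_error("reverse"))
#   GeoSeries.reverse() is not implemented yet.
#   This method will be added in a future release.
#   ...
#   Temporary workaround - use GeoPandas:
#     gpd_series = sedona_series.to_geopandas()
#     result = gpd_series.reverse(...)
#     # Note: This will collect all data to the driver.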


class GeoSeries(GeoFrame, pspd.Series):
    """
    A pandas-on-Spark Series for geometric/spatial operations.

    GeoSeries extends pyspark.pandas.Series to provide spatial operations
    using Apache Sedona's spatial functions. It maintains compatibility with
    GeoPandas GeoSeries while operating on distributed datasets.

    Parameters
    ----------
    data : array-like, Iterable, dict, or scalar value
        Contains the data for the GeoSeries. Can be geometries, WKB bytes,
        or other GeoSeries/GeoDataFrame objects.
    index : array-like or Index (1d), optional
        Values must be hashable and have the same length as `data`.
    crs : pyproj.CRS, optional
        Coordinate Reference System for the geometries.
    dtype : dtype, optional
        Data type for the GeoSeries.
    name : str, optional
        Name of the GeoSeries.
    copy : bool, default False
        Whether to copy the input data.

    Examples
    --------
    >>> from shapely.geometry import Point, Polygon
    >>> from sedona.spark.geopandas import GeoSeries
    >>>
    >>> # Create from geometries
    >>> s = GeoSeries([Point(0, 0), Point(1, 1)], crs='EPSG:4326')
    >>> s
    0    POINT (0 0)
    1    POINT (1 1)
    dtype: geometry
    >>>
    >>> # Spatial operations
    >>> s.buffer(0.1).area
    0    0.031416
    1    0.031416
    dtype: float64
    >>>
    >>> # CRS operations
    >>> s_utm = s.to_crs('EPSG:32633')
    >>> s_utm.crs
    <Projected CRS: EPSG:32633>
    Name: WGS 84 / UTM zone 33N
    ...

    Notes
    -----
    This implementation differs from GeoPandas in several ways:

    - Uses Spark for distributed processing
    - Geometries are stored in WKB (Well-Known Binary) format internally
    - Some methods may have different performance characteristics
    - Not all GeoPandas methods are implemented yet (see IMPLEMENTATION_STATUS)

    Performance Considerations:

    - Operations are distributed across the Spark cluster
    - Avoid calling .to_geopandas() on large datasets
    - Use .sample() for testing with large datasets

    See Also
    --------
    geopandas.GeoSeries : The GeoPandas equivalent
    sedona.spark.geopandas.GeoDataFrame : DataFrame with geometry column
    """

    def __getitem__(self, key: Any) -> Any:
        return pspd.Series.__getitem__(self, key)

    def __init__(
        self,
        data=None,
        index=None,
        dtype=None,
        name=None,
        copy=False,
        fastpath=False,
        crs=None,
        **kwargs,
    ):
        """
        Initialize a GeoSeries object.

        Parameters:
        - data: The input data for the GeoSeries. It can be a GeoDataFrame,
          GeoSeries, or pandas Series.
        - index: The index for the GeoSeries.
        - crs: Coordinate Reference System for the GeoSeries.
        - dtype: Data type for the GeoSeries.
        - name: Name of the GeoSeries.
        - copy: Whether to copy the input data.
        - fastpath: Internal parameter for fast initialization.

        Examples
        --------
        >>> from shapely.geometry import Point
        >>> import geopandas as gpd
        >>> import pandas as pd
        >>> from sedona.spark.geopandas import GeoSeries

        # Example 1: Initialize with GeoDataFrame
        >>> gdf = gpd.GeoDataFrame({'geometry': [Point(1, 1), Point(2, 2)]})
        >>> gs = GeoSeries(data=gdf)
        >>> print(gs)
        0    POINT (1 1)
        1    POINT (2 2)
        Name: geometry, dtype: geometry

        # Example 2: Initialize with GeoSeries
        >>> gseries = gpd.GeoSeries([Point(1, 1), Point(2, 2)])
        >>> gs = GeoSeries(data=gseries)
        >>> print(gs)
        0    POINT (1 1)
        1    POINT (2 2)
        dtype: geometry

        # Example 3: Initialize with pandas Series
        >>> pseries = pd.Series([Point(1, 1), Point(2, 2)])
        >>> gs = GeoSeries(data=pseries)
        >>> print(gs)
        0    POINT (1 1)
        1    POINT (2 2)
        dtype: geometry
        """
        assert data is not None

        self._anchor: GeoDataFrame
        self._col_label: Label
        self._sindex: SpatialIndex = None

        if isinstance(
            data, (GeoDataFrame, GeoSeries, PandasOnSparkSeries, PandasOnSparkDataFrame)
        ):
            assert dtype is None
            assert name is None
            assert not copy
            assert not fastpath

            # We don't check crs validity to keep the operation lazy.
            # Keep the original code for now
            # data_crs = None
            # if hasattr(data, "crs"):
            #     data_crs = data.crs
            # if data_crs is not None and crs is not None and data_crs != crs:
            #     raise ValueError(
            #         "CRS mismatch between CRS of the passed geometries "
            #         "and 'crs'. Use 'GeoSeries.set_crs(crs, "
            #         "allow_override=True)' to overwrite CRS or "
            #         "'GeoSeries.to_crs(crs)' to reproject geometries. "
            #     )

            # PySpark Pandas' ps.Series.__init__() does not support construction
            # from a ps.Series input. For now, we manually implement the logic.
            index = data._col_label if index is None else index
            ps_df = pspd.DataFrame(data._anchor)
            super().__init__(
                data=ps_df,
                index=index,
                dtype=dtype,
                name=name,
                copy=copy,
                fastpath=fastpath,
            )
        else:
            if isinstance(data, pd.Series):
                assert index is None
                assert dtype is None
                assert name is None
                assert not copy
                assert not fastpath
                pd_series = data
            else:
                pd_series = pd.Series(
                    data=data,
                    index=index,
                    dtype=dtype,
                    name=name,
                    copy=copy,
                    fastpath=fastpath,
                )

            pd_series = pd_series.astype(object)

            # initialize the parent class pyspark Series with the pandas Series
            super().__init__(data=pd_series)

        # Ensure we're storing geometry types
        if (
            self.spark.data_type != GeometryType()
            and self.spark.data_type != NullType()
        ):
            raise TypeError(
                "Non geometry data passed to GeoSeries constructor, "
                f"received data of dtype '{self.spark.data_type.typeName()}'"
            )

        if crs:
            self.set_crs(crs, inplace=True)

    # ============================================================================
    # COORDINATE REFERENCE SYSTEM (CRS) OPERATIONS
    # ============================================================================

    @property
    def crs(self) -> Union["CRS", None]:
        """The Coordinate Reference System (CRS) as a ``pyproj.CRS`` object.

        Returns None if the CRS is not set. When setting, the value can be
        anything accepted by
        :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
        such as an authority string (eg "EPSG:4326") or a WKT string.

        Note: all records in the GeoSeries are assumed to have the same CRS.

        Examples
        --------
        >>> from shapely.geometry import Point
        >>> from sedona.spark.geopandas import GeoSeries
        >>> s = GeoSeries([Point(1, 1), Point(2, 2)], crs='EPSG:4326')
        >>> s.crs  # doctest: +SKIP
        <Geographic 2D CRS: EPSG:4326>
        Name: WGS 84
        Axis Info [ellipsoidal]:
        - Lat[north]: Geodetic latitude (degree)
        - Lon[east]: Geodetic longitude (degree)
        Area of Use:
        - name: World
        - bounds: (-180.0, -90.0, 180.0, 90.0)
        Datum: World Geodetic System 1984
        - Ellipsoid: WGS 84
        - Prime Meridian: Greenwich

        See Also
        --------
        GeoSeries.set_crs : assign CRS
        GeoSeries.to_crs : re-project to another CRS
        """
        from pyproj import CRS

        if len(self) == 0:
            return None

        # F.first is non-deterministic, but it doesn't matter because all
        # non-null values should be the same
        spark_col = stf.ST_SRID(F.first(self.spark.column, ignorenulls=True))

        # Set this to avoid error complaining that we don't have a groupby column
        tmp_series = self._query_geometry_column(
            spark_col,
            returns_geom=False,
            is_aggr=True,
        )

        # All geometries should have the same srid,
        # so we just take the srid of the first non-null element
        srid = tmp_series.item()

        # Turn np.nan into 0 to avoid error
        srid = 0 if np.isnan(srid) else srid

        # Sedona returns 0 if the srid doesn't exist
        return CRS.from_user_input(srid) if srid != 0 else None

    @crs.setter
    def crs(self, value: Union["CRS", None]):
        self.set_crs(value, inplace=True)

    @typing.overload
    def set_crs(
        self,
        crs: Union[Any, None] = None,
        epsg: Union[int, None] = None,
        inplace: Literal[True] = True,
        allow_override: bool = False,
    ) -> None: ...

    @typing.overload
    def set_crs(
        self,
        crs: Union[Any, None] = None,
        epsg: Union[int, None] = None,
        inplace: Literal[False] = False,
        allow_override: bool = False,
    ) -> "GeoSeries": ...

    def set_crs(
        self,
        crs: Union[Any, None] = None,
        epsg: Union[int, None] = None,
        inplace: bool = False,
        allow_override: bool = True,
    ) -> Union["GeoSeries", None]:
        """
        Set the Coordinate Reference System (CRS) of a ``GeoSeries``.

        Pass ``None`` to remove CRS from the ``GeoSeries``.

        Notes
        -----
        The underlying geometries are not transformed to this CRS. To
        transform the geometries to a new CRS, use the ``to_crs`` method.

        Parameters
        ----------
        crs : pyproj.CRS | None, optional
            The value can be anything accepted by
            :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
            such as an authority string (eg "EPSG:4326") or a WKT string.
        epsg : int, optional if `crs` is specified
            EPSG code specifying the projection.
        inplace : bool, default False
            If True, the CRS of the GeoSeries will be changed in place
            (while still returning the result) instead of making a copy of
            the GeoSeries.
        allow_override : bool, default True
            If the GeoSeries already has a CRS, allow replacing the existing
            CRS, even when both are not equal. In Sedona, setting this to
            False leads to eager evaluation instead of lazy evaluation.
            Unlike Geopandas, True is the default value in Sedona for
            performance reasons.

        Returns
        -------
        GeoSeries

        Examples
        --------
        >>> from sedona.spark.geopandas import GeoSeries
        >>> from shapely.geometry import Point
        >>> s = GeoSeries([Point(1, 1), Point(2, 2), Point(3, 3)])
        >>> s
        0    POINT (1 1)
        1    POINT (2 2)
        2    POINT (3 3)
        dtype: geometry

        Setting CRS to a GeoSeries without one:

        >>> s.crs is None
        True
        >>> s = s.set_crs('epsg:3857')
        >>> s.crs  # doctest: +SKIP
        <Projected CRS: EPSG:3857>
        Name: WGS 84 / Pseudo-Mercator
        Axis Info [cartesian]:
        - X[east]: Easting (metre)
        - Y[north]: Northing (metre)
        Area of Use:
        - name: World - 85°S to 85°N
        - bounds: (-180.0, -85.06, 180.0, 85.06)
        Coordinate Operation:
        - name: Popular Visualisation Pseudo-Mercator
        - method: Popular Visualisation Pseudo Mercator
        Datum: World Geodetic System 1984
        - Ellipsoid: WGS 84
        - Prime Meridian: Greenwich

        Overriding existing CRS:

        >>> s = s.set_crs(4326, allow_override=True)

        Without ``allow_override=True``, ``set_crs`` raises an error if you
        try to override an existing CRS.

        See Also
        --------
        GeoSeries.to_crs : re-project to another CRS
        """
        from pyproj import CRS

        if crs is not None:
            crs = CRS.from_user_input(crs)
        elif epsg is not None:
            crs = CRS.from_epsg(epsg)

        # The below block for the not allow_override case is eager due to the
        # self.crs call. This hurts performance and user experience, hence the
        # default being set to True in Sedona.
        if not allow_override:
            curr_crs = self.crs
            if curr_crs is not None and not curr_crs == crs:
                raise ValueError(
                    "The GeoSeries already has a CRS which is not equal to the passed "
                    "CRS. Specify 'allow_override=True' to allow replacing the existing "
                    "CRS without doing any transformation. If you actually want to "
                    "transform the geometries, use 'GeoSeries.to_crs' instead."
                )

        # 0 indicates no srid in sedona
        new_epsg = crs.to_epsg() if crs else 0

        spark_col = stf.ST_SetSRID(self.spark.column, new_epsg)
        result = self._query_geometry_column(spark_col, keep_name=True)

        if inplace:
            self._update_inplace(result, invalidate_sindex=False)
            return None

        return result

    # ============================================================================
    # INTERNAL HELPER METHODS
    # ============================================================================

    def _query_geometry_column(
        self,
        spark_col: PySparkColumn,
        df: pyspark.sql.DataFrame = None,
        returns_geom: bool = True,
        is_aggr: bool = False,
        keep_name: bool = False,
    ) -> Union["GeoSeries", pspd.Series]:
        """
        Helper method to query a single geometry column with a specified operation.

        Parameters
        ----------
        spark_col : Column
            The expression to apply to the geometry column.
        df : pyspark.sql.DataFrame, optional
            The dataframe to query. If not provided, the internal dataframe
            will be used.
        returns_geom : bool, default True
            If True, the result is wrapped back into a GeoSeries.
        is_aggr : bool, default False
            If True, the query is an aggregation query.
        keep_name : bool, default False
            If True, the resulting series keeps this series' name instead of
            the Spark default series name.

        Returns
        -------
        GeoSeries or pspd.Series
            A series with the operation applied to the geometry column.
        """
        df = self._internal.spark_frame if df is None else df

        rename = SPARK_DEFAULT_SERIES_NAME
        if keep_name and self.name:
            rename = self.name

        col_expr = spark_col.alias(rename)
        exprs = [col_expr]
        index_spark_columns = []
        index_fields = []

        if not is_aggr:
            # We always select NATURAL_ORDER_COLUMN_NAME, to avoid having to
            # regenerate it in the result.
            # We always select SPARK_DEFAULT_INDEX_NAME, to retain series index info.
            exprs.append(scol_for(df, SPARK_DEFAULT_INDEX_NAME))
            exprs.append(scol_for(df, NATURAL_ORDER_COLUMN_NAME))
            index_spark_columns = [scol_for(df, SPARK_DEFAULT_INDEX_NAME)]
            index_fields = [self._internal.index_fields[0]]
            sdf = df.select(
                col_expr,
                scol_for(df, SPARK_DEFAULT_INDEX_NAME),
                scol_for(df, NATURAL_ORDER_COLUMN_NAME),
            )
        # else if is_aggr, we don't select the index columns
        else:
            sdf = df.select(*exprs)

        internal = self._internal.copy(
            spark_frame=sdf,
            index_fields=index_fields,
            index_spark_columns=index_spark_columns,
            data_spark_columns=[scol_for(sdf, rename)],
            data_fields=[self._internal.data_fields[0].copy(name=rename)],
            column_label_names=[(rename,)],
        )
        ps_series = first_series(PandasOnSparkDataFrame(internal))

        # Convert the Spark series default name to the pandas series default
        # name (None) if needed
        series_name = None if rename == SPARK_DEFAULT_SERIES_NAME else rename
        ps_series = ps_series.rename(series_name)

        result = GeoSeries(ps_series) if returns_geom else ps_series
        return result

    # ============================================================================
    # CONVERSION AND SERIALIZATION METHODS
    # ============================================================================

    def to_geopandas(self) -> gpd.GeoSeries:
        """
        Convert the GeoSeries to a geopandas GeoSeries.

        Returns:
        - geopandas.GeoSeries: A geopandas GeoSeries.
        """
        from pyspark.pandas.utils import log_advice

        log_advice(
            "`to_geopandas` loads all data into the driver's memory. "
            "It should only be used if the resulting geopandas GeoSeries is "
            "expected to be small."
        )
        return self._to_geopandas()

    def _to_geopandas(self) -> gpd.GeoSeries:
        """
        Same as `to_geopandas()`, without issuing the advice log for internal usage.
        """
        pd_series = self._to_internal_pandas()
        return gpd.GeoSeries(pd_series, crs=self.crs)

    def to_spark_pandas(self) -> pspd.Series:
        return pspd.Series(pspd.DataFrame(self._psdf._internal))

    # ============================================================================
    # PROPERTIES AND ATTRIBUTES
    # ============================================================================

    @property
    def geometry(self) -> "GeoSeries":
        return self

    @property
    def sindex(self) -> SpatialIndex:
        geometry_column = _get_series_col_name(self)
        if geometry_column is None:
            raise ValueError("No geometry column found in GeoSeries")
        if self._sindex is None:
            self._sindex = SpatialIndex(
                self._internal.spark_frame, column_name=geometry_column
            )
        return self._sindex

    @property
    def has_sindex(self):
        return self._sindex is not None

    def copy(self, deep=False):
        """Make a copy of this GeoSeries object.

        Parameters
        ----------
        deep : bool, default False
            If True, a deep copy of the data is made. Otherwise, a shallow
            copy is made.

        Returns
        -------
        GeoSeries
            A copy of this GeoSeries object.

        Examples
        --------
        >>> from shapely.geometry import Point
        >>> from sedona.spark.geopandas import GeoSeries
        >>> gs = GeoSeries([Point(1, 1), Point(2, 2)])
        >>> gs_copy = gs.copy()
        >>> print(gs_copy)
        0    POINT (1 1)
        1    POINT (2 2)
        dtype: geometry
        """
        if deep:
            return GeoSeries(
                self._anchor.copy(), dtype=self.dtype, index=self._col_label
            )
        else:
            return self

    @property
    def area(self) -> pspd.Series:
        spark_col = stf.ST_Area(self.spark.column)
        return self._query_geometry_column(
            spark_col,
            returns_geom=False,
        )

    @property
    def geom_type(self) -> pspd.Series:
        spark_col = stf.GeometryType(self.spark.column)
        result = self._query_geometry_column(
            spark_col,
            returns_geom=False,
        )
        # Sedona returns the string in all caps, unlike Geopandas
        sgpd_to_gpd_name_map = {
            "POINT": "Point",
            "LINESTRING": "LineString",
            "POLYGON": "Polygon",
            "MULTIPOINT": "MultiPoint",
            "MULTILINESTRING": "MultiLineString",
            "MULTIPOLYGON": "MultiPolygon",
            "GEOMETRYCOLLECTION": "GeometryCollection",
        }
        result = result.map(lambda x: sgpd_to_gpd_name_map.get(x, x))
        return result

    @property
    def type(self):
        # Implementation of the abstract method
        raise NotImplementedError(
            _not_implemented_error("type", "Returns numeric geometry type codes.")
        )

    @property
    def length(self) -> pspd.Series:
        spark_expr = (
            F.when(
                stf.GeometryType(self.spark.column).isin(
                    ["LINESTRING", "MULTILINESTRING"]
                ),
                stf.ST_Length(self.spark.column),
            )
            .when(
                stf.GeometryType(self.spark.column).isin(["POLYGON", "MULTIPOLYGON"]),
                stf.ST_Perimeter(self.spark.column),
            )
            .when(
                stf.GeometryType(self.spark.column).isin(["POINT", "MULTIPOINT"]),
                0.0,
            )
            .when(
                stf.GeometryType(self.spark.column).isin(["GEOMETRYCOLLECTION"]),
                stf.ST_Length(self.spark.column) + stf.ST_Perimeter(self.spark.column),
            )
        )
        return self._query_geometry_column(
            spark_expr,
            returns_geom=False,
        )

    @property
    def is_valid(self) -> pspd.Series:
        spark_col = stf.ST_IsValid(self.spark.column)
        result = self._query_geometry_column(
            spark_col,
            returns_geom=False,
        )
        return _to_bool(result)

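    # Illustrative expectations for the length property above, mirroring
    # geopandas semantics (points contribute 0, polygons their perimeter):
    #
    #   >>> from shapely.geometry import Point, LineString, Polygon
    #   >>> s = GeoSeries([Point(0, 0), LineString([(0, 0), (3, 0)]),
    #   ...                Polygon([(0, 0), (1, 0), (1, 1), (0, 1)])])
    #   >>> s.length
    #   0    0.0
    #   1    3.0
    #   2    4.0
    #   dtype: float64
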
    def is_valid_reason(self) -> pspd.Series:
        spark_col = stf.ST_IsValidReason(self.spark.column)
        return self._query_geometry_column(
            spark_col,
            returns_geom=False,
        )

    @property
    def is_empty(self) -> pspd.Series:
        spark_expr = stf.ST_IsEmpty(self.spark.column)
        result = self._query_geometry_column(
            spark_expr,
            returns_geom=False,
        )
        return _to_bool(result)

    def count_coordinates(self):
        # Implementation of the abstract method
        raise NotImplementedError(
            _not_implemented_error(
                "count_coordinates",
                "Counts the number of coordinate tuples in each geometry.",
            )
        )

    def count_geometries(self):
        # Implementation of the abstract method
        raise NotImplementedError(
            _not_implemented_error(
                "count_geometries",
                "Counts the number of geometries in each multi-geometry or collection.",
            )
        )

    def count_interior_rings(self):
        # Implementation of the abstract method
        raise NotImplementedError(
            _not_implemented_error(
                "count_interior_rings",
                "Counts the number of interior rings (holes) in each polygon.",
            )
        )

    def dwithin(self, other, distance, align=None):
        if not isinstance(distance, (float, int)):
            raise NotImplementedError(
                "Array-like distance for dwithin not implemented yet."
            )
        other_series, extended = self._make_series_of_val(other)
        align = False if extended else align
        spark_expr = stp.ST_DWithin(F.col("L"), F.col("R"), F.lit(distance))
        return self._row_wise_operation(
            spark_expr,
            other_series,
            align=align,
            returns_geom=False,
            default_val=False,
        )

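    # Usage sketch for dwithin (illustrative; scalar distance only):
    #
    #   >>> from shapely.geometry import Point
    #   >>> s = GeoSeries([Point(0, 0), Point(3, 3)])
    #   >>> s.dwithin(Point(1, 1), distance=2.0)
    #   0     True
    #   1    False
    #   dtype: bool
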
    def difference(self, other, align=None) -> "GeoSeries":
        other_series, extended = self._make_series_of_val(other)
        align = False if extended else align
        spark_expr = stf.ST_Difference(F.col("L"), F.col("R"))
        return self._row_wise_operation(
            spark_expr,
            other_series,
            align=align,
            returns_geom=True,
        )

    @property
    def is_simple(self) -> pspd.Series:
        spark_expr = stf.ST_IsSimple(self.spark.column)
        result = self._query_geometry_column(
            spark_expr,
            returns_geom=False,
        )
        return _to_bool(result)

    @property
    def is_ring(self):
        spark_expr = stf.ST_IsRing(self.spark.column)
        result = self._query_geometry_column(
            spark_expr,
            returns_geom=False,
        )
        return _to_bool(result)

    @property
    def is_ccw(self):
        # Implementation of the abstract method
        raise NotImplementedError(
            _not_implemented_error(
                "is_ccw",
                "Tests if LinearRing geometries are oriented counter-clockwise.",
            )
        )

    @property
    def is_closed(self):
        # Implementation of the abstract method
        raise NotImplementedError(
            _not_implemented_error(
                "is_closed",
                "Tests if LineString geometries are closed (start equals end point).",
            )
        )

    @property
    def has_z(self) -> pspd.Series:
        spark_expr = stf.ST_HasZ(self.spark.column)
        return self._query_geometry_column(
            spark_expr,
            returns_geom=False,
        )

    def get_precision(self):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def get_geometry(self, index) -> "GeoSeries":
        # Sedona errors on negative indexes, so we use a case statement to
        # handle it ourselves
        spark_expr = stf.ST_GeometryN(
            F.col("L"),
            F.when(
                stf.ST_NumGeometries(F.col("L")) + F.col("R") < 0,
                None,
            )
            .when(F.col("R") < 0, stf.ST_NumGeometries(F.col("L")) + F.col("R"))
            .otherwise(F.col("R")),
        )
        other, _ = self._make_series_of_val(index)
        # align = False either way
        align = False
        return self._row_wise_operation(
            spark_expr,
            other,
            align=align,
            returns_geom=True,
            default_val=None,
        )

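    # Illustrative example of the negative-index handling above (counting
    # from the end, with out-of-range indexes yielding None):
    #
    #   >>> from shapely.geometry import MultiPoint
    #   >>> s = GeoSeries([MultiPoint([(0, 0), (1, 1)])])
    #   >>> s.get_geometry(-1)
    #   0    POINT (1 1)
    #   dtype: geometry
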
    @property
    def boundary(self) -> "GeoSeries":
        # Geopandas and shapely return NULL for GeometryCollections, so we
        # handle it separately
        # https://shapely.readthedocs.io/en/stable/reference/shapely.boundary.html
        spark_expr = F.when(
            stf.GeometryType(self.spark.column).isin(["GEOMETRYCOLLECTION"]),
            None,
        ).otherwise(stf.ST_Boundary(self.spark.column))
        return self._query_geometry_column(
            spark_expr,
        )

    @property
    def centroid(self) -> "GeoSeries":
        spark_expr = stf.ST_Centroid(self.spark.column)
        return self._query_geometry_column(
            spark_expr,
            returns_geom=True,
        )

    def concave_hull(self, ratio=0.0, allow_holes=False):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def convex_hull(self):
        # Implementation of the abstract method
        raise NotImplementedError(
            _not_implemented_error(
                "convex_hull", "Computes the convex hull of each geometry."
            )
        )

    def delaunay_triangles(self, tolerance=0.0, only_edges=False):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def voronoi_polygons(self, tolerance=0.0, extend_to=None, only_edges=False):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def envelope(self) -> "GeoSeries":
        spark_expr = stf.ST_Envelope(self.spark.column)
        return self._query_geometry_column(
            spark_expr,
            returns_geom=True,
        )

    def minimum_rotated_rectangle(self):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def exterior(self):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def extract_unique_points(self):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def offset_curve(self, distance, quad_segs=8, join_style="round", mitre_limit=5.0):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    @property
    def interiors(self):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def remove_repeated_points(self, tolerance=0.0):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def set_precision(self, grid_size, mode="valid_output"):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def representative_point(self):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def minimum_bounding_circle(self):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def minimum_bounding_radius(self):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def minimum_clearance(self):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def normalize(self):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def make_valid(self, *, method="structure", keep_collapsed=True) -> "GeoSeries":
        # Geopandas defaults to method="linework", which Sedona does not
        # support, so "structure" is the default (and only accepted) value here.
        if method != "structure":
            raise ValueError(
                "Sedona only supports the 'structure' method for make_valid"
            )
        spark_expr = stf.ST_MakeValid(self.spark.column, keep_collapsed)
        return self._query_geometry_column(
            spark_expr,
            returns_geom=True,
        )

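    # Usage sketch (illustrative): repairing a self-intersecting "bowtie"
    # polygon with the structure-based algorithm; the exact output geometry
    # depends on the underlying engine.
    #
    #   >>> from shapely.geometry import Polygon
    #   >>> bowtie = Polygon([(0, 0), (2, 2), (2, 0), (0, 2), (0, 0)])
    #   >>> GeoSeries([bowtie]).make_valid(method="structure").is_valid
    #   0    True
    #   dtype: bool
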
    def reverse(self):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def segmentize(self, max_segment_length):
        other_series, extended = self._make_series_of_val(max_segment_length)
        align = False if extended else None
        spark_expr = stf.ST_Segmentize(F.col("L"), F.col("R"))
        return self._row_wise_operation(
            spark_expr,
            other_series,
            align=align,
            returns_geom=True,
        )

    def transform(self, transformation, include_z=False):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def force_2d(self):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def force_3d(self, z=0):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def line_merge(self, directed=False):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    # ============================================================================
    # GEOMETRIC OPERATIONS
    # ============================================================================

    @property
    def unary_union(self):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def union_all(self, method="unary", grid_size=None) -> BaseGeometry:
        if grid_size is not None:
            raise NotImplementedError("Sedona does not support the grid_size argument")
        if method != "unary":
            import warnings

            warnings.warn(
                f"Sedona does not support manually specifying different union "
                f"methods. Ignoring non-default method argument of {method}"
            )
        if len(self) == 0:
            # While it's not explicitly defined in the geopandas docs, this is
            # what geopandas returns for an empty GeoSeries. If it ever changes
            # for some reason, we'll catch that with the test.
            from shapely.geometry import GeometryCollection

            return GeometryCollection()

        spark_expr = sta.ST_Union_Aggr(self.spark.column)
        tmp = self._query_geometry_column(spark_expr, returns_geom=False, is_aggr=True)
        ps_series = tmp.take([0])
        geom = ps_series.iloc[0]
        return geom

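    # Usage sketch (illustrative; the exact shapely repr may vary by version):
    #
    #   >>> from shapely.geometry import Point
    #   >>> GeoSeries([Point(0, 0), Point(1, 1)]).union_all()
    #   <MULTIPOINT (0 0, 1 1)>
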
    def crosses(self, other, align=None) -> pspd.Series:
        # Sedona does not support GeometryCollection (errors), so we return
        # NULL for now to avoid the error
        other_series, extended = self._make_series_of_val(other)
        align = False if extended else align
        spark_expr = F.when(
            (stf.GeometryType(F.col("L")) == "GEOMETRYCOLLECTION")
            | (stf.GeometryType(F.col("R")) == "GEOMETRYCOLLECTION"),
            None,
        ).otherwise(stp.ST_Crosses(F.col("L"), F.col("R")))
        result = self._row_wise_operation(
            spark_expr,
            other_series,
            align,
            default_val=False,
        )
        return _to_bool(result)

    def disjoint(self, other, align=None):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    def intersects(
        self, other: Union["GeoSeries", BaseGeometry], align: Union[bool, None] = None
    ) -> pspd.Series:
        other_series, extended = self._make_series_of_val(other)
        align = False if extended else align
        spark_expr = stp.ST_Intersects(F.col("L"), F.col("R"))
        result = self._row_wise_operation(
            spark_expr,
            other_series,
            align,
            default_val=False,
        )
        return _to_bool(result)

    def overlaps(self, other, align=None) -> pspd.Series:
        # Note: We cannot efficiently match geopandas behavior because Sedona's
        # ST_Overlaps returns True for equal geometries.
        # ST_Overlaps(`L`, `R`) AND NOT ST_Equals(`L`, `R`) does not work
        # because ST_Equals errors on invalid geometries.
        other_series, extended = self._make_series_of_val(other)
        align = False if extended else align
        spark_expr = stp.ST_Overlaps(F.col("L"), F.col("R"))
        result = self._row_wise_operation(
            spark_expr,
            other_series,
            align,
            default_val=False,
        )
        return _to_bool(result)

    def touches(self, other, align=None) -> pspd.Series:
        other_series, extended = self._make_series_of_val(other)
        align = False if extended else align
        spark_expr = stp.ST_Touches(F.col("L"), F.col("R"))
        result = self._row_wise_operation(
            spark_expr,
            other_series,
            align,
            default_val=False,
        )
        return _to_bool(result)

    def within(self, other, align=None) -> pspd.Series:
        other_series, extended = self._make_series_of_val(other)
        align = False if extended else align
        spark_expr = stp.ST_Within(F.col("L"), F.col("R"))
        result = self._row_wise_operation(
            spark_expr,
            other_series,
            align,
            default_val=False,
        )
        return _to_bool(result)

    def covers(self, other, align=None) -> pspd.Series:
        other_series, extended = self._make_series_of_val(other)
        align = False if extended else align
        spark_expr = stp.ST_Covers(F.col("L"), F.col("R"))
        result = self._row_wise_operation(
            spark_expr,
            other_series,
            align,
            default_val=False,
        )
        return _to_bool(result)

    def covered_by(self, other, align=None) -> pspd.Series:
        other_series, extended = self._make_series_of_val(other)
        align = False if extended else align
        spark_expr = stp.ST_CoveredBy(F.col("L"), F.col("R"))
        result = self._row_wise_operation(
            spark_expr,
            other_series,
            align,
            default_val=False,
        )
        return _to_bool(result)

    def distance(self, other, align=None) -> pspd.Series:
        other_series, extended = self._make_series_of_val(other)
        align = False if extended else align
        spark_expr = stf.ST_Distance(F.col("L"), F.col("R"))
        result = self._row_wise_operation(
            spark_expr,
            other_series,
            align,
            default_val=None,
        )
        return result

    def intersection(
        self, other: Union["GeoSeries", BaseGeometry], align: Union[bool, None] = None
    ) -> "GeoSeries":
        other_series, extended = self._make_series_of_val(other)
        align = False if extended else align
        spark_expr = stf.ST_Intersection(F.col("L"), F.col("R"))
        result = self._row_wise_operation(
            spark_expr,
            other_series,
            align,
            returns_geom=True,
            default_val=None,
        )
        return result

    def snap(self, other, tolerance, align=None) -> "GeoSeries":
        if not isinstance(tolerance, (float, int)):
            raise NotImplementedError(
                "Array-like values for tolerance are not supported yet."
            )
        # Both sgpd and gpd implementations simply call the snap functions
        # in JTS and GEOS, respectively. The results often differ slightly,
        # but these must be differences inside of the engines themselves.
        other_series, extended = self._make_series_of_val(other)
        align = False if extended else align
        spark_expr = stf.ST_Snap(F.col("L"), F.col("R"), tolerance)
        result = self._row_wise_operation(
            spark_expr,
            other_series,
            align,
            returns_geom=True,
        )
        return result

    def _row_wise_operation(
        self,
        spark_col: PySparkColumn,
        other: pspd.Series,
        align: Union[bool, None],
        returns_geom: bool = False,
        default_val: Any = None,
        keep_name: bool = False,
    ):
        """
        Helper function to perform a row-wise operation on two GeoSeries.
        The self column and other column are aliased to `L` and `R`, respectively.

        align : bool or None (default None)
            If True, automatically aligns GeoSeries based on their indices.
            None defaults to True. If False, the order of elements is preserved.

            Note: align should also be set to False when 'other' is a GeoSeries
            created from a single object (e.g. GeoSeries([Point(0, 0)] * len(self))),
            so that we align based on natural ordering in case the index is not
            the default range index from 0. Alternatively, we could create
            'other' using the same index as self, but that would require
            index=self.index.to_pandas() which is less scalable.

        default_val : Any (default None)
            The value to use if either L or R is null. If None, nulls are not
            handled.
        """
        from pyspark.sql.functions import col

        # Note: this is specifically False. None is valid since it defaults to
        # True, similar to geopandas.
        index_col = (
            NATURAL_ORDER_COLUMN_NAME if align is False else SPARK_DEFAULT_INDEX_NAME
        )

        # This code assumes there is only one index (SPARK_DEFAULT_INDEX_NAME)
        # and would need to be updated if Sedona later supports multi-index
        sdf = self._internal.spark_frame
        other_sdf = other._internal.spark_frame

        if index_col == NATURAL_ORDER_COLUMN_NAME:
            # NATURAL_ORDER_COLUMN_NAME is not deterministic or sequential, so
            # we instead replace it with a new column of row numbers to perform
            # the alignment. This mimics the following code:
            # sdf.withColumn(index_col, F.row_number().over(Window.orderBy(NATURAL_ORDER_COLUMN_NAME)))
            sdf = sdf.drop(index_col)
            other_sdf = other_sdf.drop(index_col)
            sdf = self._internal.attach_distributed_sequence_column(sdf, index_col)
            other_sdf = other._internal.attach_distributed_sequence_column(
                other_sdf, index_col
            )

        sdf = sdf.select(
            self.spark.column.alias("L"),
            # For the left side:
            # - We always select NATURAL_ORDER_COLUMN_NAME, to avoid having to
            #   regenerate it in the result
            # - We always select SPARK_DEFAULT_INDEX_NAME, to retain series
            #   index info
            col(NATURAL_ORDER_COLUMN_NAME),
            col(SPARK_DEFAULT_INDEX_NAME),
        )
        other_sdf = other_sdf.select(
            other.spark.column.alias("R"),
            # for the right side, we only need the column that we are joining on
            col(index_col),
        )

        joined_df = sdf.join(other_sdf, on=index_col, how="outer")

        if default_val is not None:
            # ps.Series.fillna() doesn't always work for the output for some
            # reason, so we manually handle the nulls here.
            spark_col = F.when(
                F.col("L").isNull() | F.col("R").isNull(),
                default_val,
            ).otherwise(spark_col)

        return self._query_geometry_column(
            spark_col,
            joined_df,
            returns_geom=returns_geom,
            keep_name=keep_name,
        )

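    # Alignment sketch (illustrative): with align=True or None, rows are
    # paired by index label; with align=False they are paired positionally.
    #
    #   >>> from shapely.geometry import Point
    #   >>> a = GeoSeries([Point(0, 0), Point(1, 1)], index=[0, 1])
    #   >>> b = GeoSeries([Point(1, 1), Point(0, 0)], index=[1, 0])
    #   >>> a.intersects(b).tolist()               # label-aligned pairs match
    #   [True, True]
    #   >>> a.intersects(b, align=False).tolist()  # positional pairs differ
    #   [False, False]
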
    def intersection_all(self):
        # Implementation of the abstract method
        raise NotImplementedError("This method is not implemented yet.")

    # ============================================================================
    # SPATIAL PREDICATES
    # ============================================================================

    def contains(self, other, align=None) -> pspd.Series:
        other, extended = self._make_series_of_val(other)
        align = False if extended else align
        spark_col = stp.ST_Contains(F.col("L"), F.col("R"))
        result = self._row_wise_operation(
            spark_col,
            other,
            align,
            returns_geom=False,
            default_val=False,
        )
        return _to_bool(result)

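    # Usage sketch for contains (illustrative):
    #
    #   >>> from shapely.geometry import Point, Polygon
    #   >>> s = GeoSeries([Polygon([(0, 0), (2, 0), (2, 2), (0, 2)]), Point(5, 5)])
    #   >>> s.contains(Point(1, 1))
    #   0     True
    #   1    False
    #   dtype: bool
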
    def contains_properly(self, other, align=None):
        # Implementation of the abstract method
        raise NotImplementedError(
            _not_implemented_error(
                "contains_properly",
                "Tests if geometries properly contain other geometries (no boundary contact).",
            )
        )

    def buffer(
        self,
        distance,
        resolution=16,
        cap_style="round",
        join_style="round",
        mitre_limit=5.0,
        single_sided=False,
        **kwargs,
    ) -> "GeoSeries":
        if single_sided:
            # Reverse the following logic in
            # common/src/main/java/org/apache/sedona/common/Functions.java
            # buffer() to avoid negating the distance:
            #   if (bufferParameters.isSingleSided()
            #       && (params.toLowerCase().contains("left") && radius < 0
            #           || params.toLowerCase().contains("right") && radius > 0)) {
            #     radius = -radius;
            #   }
            side = "left" if distance >= 0 else "right"
        else:
            side = "both"

        assert side in [
            "left",
            "right",
            "both",
        ], "side must be one of 'left', 'right', or 'both'"

        parameters = F.lit(
            f"quad_segs={resolution} endcap={cap_style} join={join_style} "
            f"mitre_limit={mitre_limit} side={side}"
        )

        spark_col = stf.ST_Buffer(
            self.spark.column, distance, useSpheroid=False, parameters=parameters
        )
        return self._query_geometry_column(
            spark_col,
            returns_geom=True,
        )

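    # Illustrative single-sided buffering, matching the side resolution above:
    # a positive distance buffers the left side of the line, a negative
    # distance the right side.
    #
    #   >>> from shapely.geometry import LineString
    #   >>> line = GeoSeries([LineString([(0, 0), (0, 2)])])
    #   >>> left = line.buffer(0.5, single_sided=True)    # side="left"
    #   >>> right = line.buffer(-0.5, single_sided=True)  # side="right"
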
    def simplify(self, tolerance=None, preserve_topology=True) -> "GeoSeries":
        spark_expr = (
            stf.ST_SimplifyPreserveTopology(self.spark.column, tolerance)
            if preserve_topology
            else stf.ST_Simplify(self.spark.column, tolerance)
        )
        return self._query_geometry_column(spark_expr)

    def plot(self, *args, **kwargs):
        """
        Plot a GeoSeries.

        Generate a plot of a GeoSeries geometry with matplotlib.

        Note: This method is not scalable and requires collecting all data to
        the driver.

        Parameters
        ----------
        s : Series
            The GeoSeries to be plotted. Currently Polygon, MultiPolygon,
            LineString, MultiLineString, Point and MultiPoint geometries can
            be plotted.
        cmap : str (default None)
            The name of a colormap recognized by matplotlib. Any colormap will
            work, but categorical colormaps are generally recommended.
            Examples of useful discrete colormaps include:
            tab10, tab20, Accent, Dark2, Paired, Pastel1, Set1, Set2
        color : str, np.array, pd.Series, List (default None)
            If specified, all objects will be colored uniformly.
        ax : matplotlib.pyplot.Artist (default None)
            axes on which to draw the plot
        figsize : pair of floats (default None)
            Size of the resulting matplotlib.figure.Figure. If the argument
            ax is given explicitly, figsize is ignored.
        aspect : 'auto', 'equal', None or float (default 'auto')
            Set aspect of axis. If 'auto', the default aspect for map plots is
            'equal'; if however data are not projected (coordinates are
            long/lat), the aspect is by default set to 1/cos(s_y * pi/180)
            with s_y the y coordinate of the middle of the GeoSeries (the mean
            of the y range of bounding box) so that a long/lat square appears
            square in the middle of the plot. This implies an Equirectangular
            projection. If None, the aspect of `ax` won't be changed. It can
            also be set manually (float) as the ratio of y-unit to x-unit.
        autolim : bool (default True)
            Update axes data limits to contain the new geometries.
        **style_kwds : dict
            Color options to be passed on to the actual plot function, such as
            ``edgecolor``, ``facecolor``, ``linewidth``, ``markersize``,
            ``alpha``.

        Returns
        -------
        ax : matplotlib axes instance
        """
        return self.to_geopandas().plot(*args, **kwargs)

    # GeoSeries-only (not in GeoDataFrame)
    @property
    def x(self) -> pspd.Series:
        """Return the x location of point geometries in a GeoSeries

        Returns
        -------
        pandas.Series

        Examples
        --------
        >>> from sedona.spark.geopandas import GeoSeries
        >>> from shapely.geometry import Point
        >>> s = GeoSeries([Point(1, 1), Point(2, 2), Point(3, 3)])
        >>> s.x
        0    1.0
        1    2.0
        2    3.0
        dtype: float64

        See Also
        --------
        GeoSeries.y
        GeoSeries.z
        """
        spark_col = stf.ST_X(self.spark.column)
        return self._query_geometry_column(
            spark_col,
            returns_geom=False,
        )

    # GeoSeries-only (not in GeoDataFrame)
    @property
    def y(self) -> pspd.Series:
        """Return the y location of point geometries in a GeoSeries

        Returns
        -------
        pandas.Series

        Examples
        --------
        >>> from sedona.spark.geopandas import GeoSeries
        >>> from shapely.geometry import Point
        >>> s = GeoSeries([Point(1, 1), Point(2, 2), Point(3, 3)])
        >>> s.y
        0    1.0
        1    2.0
        2    3.0
        dtype: float64

        See Also
        --------
        GeoSeries.x
        GeoSeries.z
        GeoSeries.m
        """
        spark_col = stf.ST_Y(self.spark.column)
        return self._query_geometry_column(
            spark_col,
            returns_geom=False,
        )

    # GeoSeries-only (not in GeoDataFrame)
    @property
    def z(self) -> pspd.Series:
        """Return the z location of point geometries in a GeoSeries

        Returns
        -------
        pandas.Series

        Examples
        --------
        >>> from sedona.spark.geopandas import GeoSeries
        >>> from shapely.geometry import Point
        >>> s = GeoSeries([Point(1, 1, 1), Point(2, 2, 2), Point(3, 3, 3)])
        >>> s.z
        0    1.0
        1    2.0
        2    3.0
        dtype: float64

        See Also
        --------
        GeoSeries.x
        GeoSeries.y
        GeoSeries.m
        """
        spark_col = stf.ST_Z(self.spark.column)
        return self._query_geometry_column(
            spark_col,
            returns_geom=False,
        )

    # GeoSeries-only (not in GeoDataFrame)
    @property
    def m(self) -> pspd.Series:
        raise NotImplementedError("GeoSeries.m() is not implemented yet.")

    # ============================================================================
    # CONSTRUCTION METHODS
    # ============================================================================

    # GeoSeries-only (not in GeoDataFrame)
    @classmethod
    def from_file(
        cls, filename: str, format: Union[str, None] = None, **kwargs
    ) -> "GeoSeries":
        """Alternate constructor to create a ``GeoSeries`` from a file.

        Parameters
        ----------
        filename : str
            File path or file handle to read from. If the path is a directory,
            Sedona will read all files in that directory.
        format : str, optional
            The format of the file to read, by default None. If None, Sedona
            infers the format from the file extension. Note that format
            inference is not supported for directories. Available formats are
            "shapefile", "geojson", "geopackage", and "geoparquet".
        table_name : str, optional
            The name of the table to read from a GeoPackage file, by default
            None. This is required if ``format`` is "geopackage".

        See Also
        --------
        GeoDataFrame.to_file : Write a ``GeoDataFrame`` to a file.
        """
        df = sgpd.io.read_file(filename, format, **kwargs)
        return GeoSeries(df.geometry, crs=df.crs)

    # GeoSeries-only (not in GeoDataFrame)
    @classmethod
    def from_wkb(
        cls,
        data,
        index=None,
        crs: Union[Any, None] = None,
        on_invalid="raise",
        **kwargs,
    ) -> "GeoSeries":
        r"""
        Alternate constructor to create a ``GeoSeries`` from a list or array
        of WKB objects.

        Parameters
        ----------
        data : array-like or Series
            Series, list or array of WKB objects
        index : array-like or Index
            The index for the GeoSeries.
        crs : value, optional
            Coordinate Reference System of the geometry objects. Can be
            anything accepted by
            :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
            such as an authority string (eg "EPSG:4326") or a WKT string.
        on_invalid : {"raise", "warn", "ignore"}, default "raise"
            - raise: an exception will be raised if a WKB input geometry is
              invalid.
            - warn: a warning will be raised and invalid WKB geometries will
              be returned as None.
            - ignore: invalid WKB geometries will be returned as None without
              a warning.
            - fix: an effort is made to fix invalid input geometries (e.g.
              close unclosed rings). If this is not possible, they are
              returned as ``None`` without a warning. Requires GEOS >= 3.11
              and shapely >= 2.1.
        kwargs
            Additional arguments passed to the Series constructor, e.g. ``name``.

        Returns
        -------
        GeoSeries

        See Also
        --------
        GeoSeries.from_wkt

        Examples
        --------
        >>> wkbs = [
        ... (
        ...     b"\x01\x01\x00\x00\x00\x00\x00\x00\x00"
        ...     b"\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?"
        ... ),
        ... (
        ...     b"\x01\x01\x00\x00\x00\x00\x00\x00\x00"
        ...     b"\x00\x00\x00@\x00\x00\x00\x00\x00\x00\x00@"
        ... ),
        ... (
        ...     b"\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00"
        ...     b"\x00\x08@\x00\x00\x00\x00\x00\x00\x08@"
        ... ),
        ... ]
        >>> s = GeoSeries.from_wkb(wkbs)
        >>> s
        0    POINT (1 1)
        1    POINT (2 2)
        2    POINT (3 3)
        dtype: geometry
        """
        if on_invalid != "raise":
            raise NotImplementedError(
                "GeoSeries.from_wkb(): only on_invalid='raise' is implemented"
            )

        from pyspark.sql.types import StructType, StructField, BinaryType

        schema = StructType([StructField("data", BinaryType(), True)])
        return cls._create_from_select(
            stc.ST_GeomFromWKB(F.col("data")),
            data,
            schema,
            index,
            crs,
            **kwargs,
        )

    # GeoSeries-only (not in GeoDataFrame)
    @classmethod
    def from_wkt(
        cls,
        data,
        index=None,
        crs: Union[Any, None] = None,
        on_invalid="raise",
        **kwargs,
    ) -> "GeoSeries":
        """
        Alternate constructor to create a ``GeoSeries`` from a list or array
        of WKT objects.

        Parameters
        ----------
        data : array-like, Series
            Series, list, or array of WKT objects
        index : array-like or Index
            The index for the GeoSeries.
        crs : value, optional
            Coordinate Reference System of the geometry objects. Can be
            anything accepted by
            :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
            such as an authority string (eg "EPSG:4326") or a WKT string.
        on_invalid : {"raise", "warn", "ignore"}, default "raise"
            - raise: an exception will be raised if a WKT input geometry is
              invalid.
            - warn: a warning will be raised and invalid WKT geometries will
              be returned as ``None``.
            - ignore: invalid WKT geometries will be returned as ``None``
              without a warning.
            - fix: an effort is made to fix invalid input geometries (e.g.
              close unclosed rings). If this is not possible, they are
              returned as ``None`` without a warning. Requires GEOS >= 3.11
              and shapely >= 2.1.
        kwargs
            Additional arguments passed to the Series constructor, e.g. ``name``.

        Returns
        -------
        GeoSeries

        See Also
        --------
        GeoSeries.from_wkb

        Examples
        --------
        >>> from sedona.spark.geopandas import GeoSeries
        >>> wkts = [
        ... 'POINT (1 1)',
        ... 'POINT (2 2)',
        ... 'POINT (3 3)',
        ... ]
        >>> s = GeoSeries.from_wkt(wkts)
        >>> s
        0    POINT (1 1)
        1    POINT (2 2)
        2    POINT (3 3)
        dtype: geometry
        """
        if on_invalid != "raise":
            raise NotImplementedError(
                "GeoSeries.from_wkt(): only on_invalid='raise' is implemented"
            )

        from pyspark.sql.types import StructType, StructField, StringType

        schema = StructType([StructField("data", StringType(), True)])
        return cls._create_from_select(
            stc.ST_GeomFromText(F.col("data")),
            data,
            schema,
            index,
            crs,
            **kwargs,
        )

    # GeoSeries-only (not in GeoDataFrame)
    @classmethod
    def from_xy(cls, x, y, z=None, index=None, crs=None, **kwargs) -> "GeoSeries":
        """
        Alternate constructor to create a :class:`~geopandas.GeoSeries` of
        Point geometries from lists or arrays of x, y(, z) coordinates.

        In case of geographic coordinates, it is assumed that longitude is
        captured by ``x`` coordinates and latitude by ``y``.

        Parameters
        ----------
        x, y, z : iterable
        index : array-like or Index, optional
            The index for the GeoSeries. If not given and all coordinate
            inputs are Series with an equal index, that index is used.
        crs : value, optional
            Coordinate Reference System of the geometry objects. Can be
            anything accepted by
            :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
            such as an authority string (eg "EPSG:4326") or a WKT string.
        **kwargs
            Additional arguments passed to the Series constructor, e.g. ``name``.

        Returns
        -------
        GeoSeries

        See Also
        --------
        GeoSeries.from_wkt
        points_from_xy

        Examples
        --------
        >>> x = [2.5, 5, -3.0]
        >>> y = [0.5, 1, 1.5]
        >>> s = GeoSeries.from_xy(x, y, crs="EPSG:4326")
        >>> s
        0    POINT (2.5 0.5)
        1        POINT (5 1)
        2     POINT (-3 1.5)
        dtype: geometry
        """
        from pyspark.sql.types import StructType, StructField, DoubleType

        schema = StructType(
            [StructField("x", DoubleType(), True), StructField("y", DoubleType(), True)]
        )

        # Spark doesn't automatically cast ints to floats for us
        x = [float(num) for num in x]
        y = [float(num) for num in y]
        z = [float(num) for num in z] if z else None

        if z:
            data = list(zip(x, y, z))
            select = stc.ST_PointZ(F.col("x"), F.col("y"), F.col("z"))
            schema.add(StructField("z", DoubleType(), True))
        else:
            data = list(zip(x, y))
            select = stc.ST_Point(F.col("x"), F.col("y"))

        geoseries = cls._create_from_select(
            select,
            data,
            schema,
            index,
            crs,
            **kwargs,
        )

        if crs:
            from pyproj import CRS

            geoseries.crs = CRS.from_user_input(crs).to_epsg()

        return geoseries

    @classmethod
    def from_shapely(
        cls, data, index=None, crs: Union[Any, None] = None, **kwargs
    ) -> "GeoSeries":
        raise NotImplementedError(
            _not_implemented_error(
                "from_shapely", "Creates GeoSeries from Shapely geometry objects."
            )
        )

    @classmethod
    def from_arrow(cls, arr, **kwargs) -> "GeoSeries":
        """
        Construct a GeoSeries from an Arrow array object with a GeoArrow
        extension type.

        See https://geoarrow.org/ for details on the GeoArrow specification.

        This function accepts any Arrow array object implementing
        the `Arrow PyCapsule Protocol`_ (i.e. having an ``__arrow_c_array__``
        method).

        .. _Arrow PyCapsule Protocol: https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

        Note: Requires geopandas versions >= 1.0.0 to use with Sedona.

        Parameters
        ----------
        arr : pyarrow.Array, Arrow array
            Any array object implementing the Arrow PyCapsule Protocol (i.e.
            has an ``__arrow_c_array__`` or ``__arrow_c_stream__`` method).
            The type of the array should be one of the geoarrow geometry types.
        **kwargs
            Other parameters passed to the GeoSeries constructor.

        Returns
        -------
        GeoSeries

        See Also
        --------
        GeoSeries.to_arrow
        GeoDataFrame.from_arrow

        Examples
        --------
        >>> from sedona.spark.geopandas import GeoSeries
        >>> import geoarrow.pyarrow as ga
        >>> array = ga.as_geoarrow([None, "POLYGON ((0 0, 1 1, 0 1, 0 0))", "LINESTRING (0 0, -1 1, 0 -1)"])
        >>> geoseries = GeoSeries.from_arrow(array)
        >>> geoseries
        0                              None
        1    POLYGON ((0 0, 1 1, 0 1, 0 0))
        2      LINESTRING (0 0, -1 1, 0 -1)
        dtype: geometry
        """
        gpd_series = gpd.GeoSeries.from_arrow(arr, **kwargs)
        return GeoSeries(gpd_series)

    @classmethod
    def _create_from_select(
        cls, spark_col: PySparkColumn, data, schema, index, crs, **kwargs
    ) -> "GeoSeries":
        from pyspark.pandas.utils import default_session
        from pyspark.pandas.internal import InternalField
        import numpy as np

        if isinstance(data, list) and not isinstance(data[0], (tuple, list)):
            data = [(obj,) for obj in data]

        name = kwargs.get("name", SPARK_DEFAULT_SERIES_NAME)

        if isinstance(data, pspd.Series):
            spark_df = data._internal.spark_frame
            assert len(schema) == 1
            spark_df = spark_df.withColumnRenamed(
                _get_series_col_name(data), schema[0].name
            )
        else:
            spark_df = default_session().createDataFrame(data, schema=schema)

        spark_df = spark_df.select(spark_col.alias(name))

        internal = InternalFrame(
            spark_frame=spark_df,
            index_spark_columns=None,
            column_labels=[(name,)],
            data_spark_columns=[scol_for(spark_df, name)],
            data_fields=[InternalField(np.dtype("object"), spark_df.schema[name])],
            column_label_names=[(name,)],
        )
        ps_series = first_series(PandasOnSparkDataFrame(internal))

        name = None if name == SPARK_DEFAULT_SERIES_NAME else name
        ps_series.rename(name, inplace=True)

        return GeoSeries(ps_series, index, crs=crs)

    # ============================================================================
    # DATA ACCESS AND MANIPULATION
    # ============================================================================

    def isna(self) -> pspd.Series:
        """
        Detect missing values.

        Returns
        -------
        A boolean Series of the same size as the GeoSeries, True where a value
        is NA.

        Examples
        --------
        >>> from sedona.spark.geopandas import GeoSeries
        >>> from shapely.geometry import Polygon
        >>> s = GeoSeries(
        ...     [Polygon([(0, 0), (1, 1), (0, 1)]), None, Polygon([])]
        ... )
        >>> s
        0    POLYGON ((0 0, 1 1, 0 1, 0 0))
        1                              None
        2                     POLYGON EMPTY
        dtype: geometry
        >>> s.isna()
        0    False
        1     True
        2    False
        dtype: bool

        See Also
        --------
        GeoSeries.notna : inverse of isna
        GeoSeries.is_empty : detect empty geometries
        """
        spark_expr = F.isnull(self.spark.column)
        result = self._query_geometry_column(
            spark_expr,
            returns_geom=False,
        )
        return _to_bool(result)

    # GeoSeries-only (not in GeoDataFrame)
    def isnull(self) -> pspd.Series:
        """Alias for `isna` method. See `isna` for more detail."""
        return self.isna()

    # GeoSeries-only (not in GeoDataFrame)
    def notna(self) -> pspd.Series:
        """
        Detect non-missing values.

        Returns
        -------
        A boolean pandas Series of the same size as the GeoSeries, False where
        a value is NA.

        Examples
        --------
        >>> from sedona.spark.geopandas import GeoSeries
        >>> from shapely.geometry import Polygon
        >>> s = GeoSeries(
        ...     [Polygon([(0, 0), (1, 1), (0, 1)]), None, Polygon([])]
        ... )
        >>> s
        0    POLYGON ((0 0, 1 1, 0 1, 0 0))
        1                              None
        2                     POLYGON EMPTY
        dtype: geometry
        >>> s.notna()
        0     True
        1    False
        2     True
        dtype: bool

        See Also
        --------
        GeoSeries.isna : inverse of notna
        GeoSeries.is_empty : detect empty geometries
        """
        # After Sedona's minimum Spark version is 3.5.0, we can use
        # F.isnotnull(self.spark.column) instead
        spark_expr = ~F.isnull(self.spark.column)
        result = self._query_geometry_column(
            spark_expr,
            returns_geom=False,
        )
        return _to_bool(result)

    # GeoSeries-only (not in GeoDataFrame)
    def notnull(self) -> pspd.Series:
        """Alias for `notna` method. See `notna` for more detail."""
        return self.notna()

    # GeoSeries-only (not in GeoDataFrame)
    def fillna(
        self, value=None, inplace: bool = False, limit=None, **kwargs
    ) -> Union["GeoSeries", None]:
        """
        Fill NA values with geometry (or geometries).

        Parameters
        ----------
        value : shapely geometry or GeoSeries, default None
            If None is passed, NA values will be filled with
            GEOMETRYCOLLECTION EMPTY. If a shapely geometry object is passed,
            it will be used to fill all missing values. If a ``GeoSeries`` is
            passed, missing values will be filled based on the corresponding
            index locations. If pd.NA or np.nan are passed, values will be
            filled with ``None`` (not GEOMETRYCOLLECTION EMPTY).
        limit : int, default None
            This is the maximum number of entries along the entire axis where
            NaNs will be filled. Must be greater than 0 if not None.

        Returns
        -------
        GeoSeries

        Examples
        --------
        >>> from sedona.spark.geopandas import GeoSeries
        >>> from shapely.geometry import Polygon
        >>> s = GeoSeries(
        ...     [
        ...         Polygon([(0, 0), (1, 1), (0, 1)]),
        ...         None,
        ...         Polygon([(0, 0), (-1, 1), (0, -1)]),
        ...     ]
        ... )
        >>> s
        0      POLYGON ((0 0, 1 1, 0 1, 0 0))
        1                                None
        2    POLYGON ((0 0, -1 1, 0 -1, 0 0))
        dtype: geometry

        Filled with an empty polygon.

        >>> s.fillna()
        0      POLYGON ((0 0, 1 1, 0 1, 0 0))
        1            GEOMETRYCOLLECTION EMPTY
        2    POLYGON ((0 0, -1 1, 0 -1, 0 0))
        dtype: geometry

        Filled with a specific polygon.

        >>> s.fillna(Polygon([(0, 1), (2, 1), (1, 2)]))
        0      POLYGON ((0 0, 1 1, 0 1, 0 0))
        1      POLYGON ((0 1, 2 1, 1 2, 0 1))
        2    POLYGON ((0 0, -1 1, 0 -1, 0 0))
        dtype: geometry

        Filled with another GeoSeries.

        >>> from shapely.geometry import Point
        >>> s_fill = GeoSeries(
        ...     [
        ...         Point(0, 0),
        ...         Point(1, 1),
        ...         Point(2, 2),
        ...     ]
        ... )
        >>> s.fillna(s_fill)
        0      POLYGON ((0 0, 1 1, 0 1, 0 0))
        1                         POINT (1 1)
        2    POLYGON ((0 0, -1 1, 0 -1, 0 0))
        dtype: geometry

        See Also
        --------
        GeoSeries.isna : detect missing values
        """
        from shapely.geometry.base import BaseGeometry

        # TODO: Implement limit https://github.com/apache/sedona/issues/2068
        if limit:
            raise NotImplementedError(
                "GeoSeries.fillna() with limit is not implemented yet."
            )

        align = True

        if pd.isna(value) == True or isinstance(value, BaseGeometry):
            if value is not None and pd.isna(value) == True:
                # i.e. value is np.nan or pd.NA
                value = None
            else:
                if value is None:
                    from shapely.geometry import GeometryCollection

                    value = GeometryCollection()

            other, extended = self._make_series_of_val(value)
            align = False if extended else align
        elif isinstance(value, (GeoSeries, gpd.GeoSeries)):
            if not isinstance(value, GeoSeries):
                value = GeoSeries(value)

            # Replace all None's with empty geometries (this is a recursive call)
            other = value.fillna(None)
        else:
            raise ValueError(f"Invalid value type: {type(value)}")

        # Coalesce: if the value in L is null, use the corresponding value in
        # R for that row
        spark_expr = F.coalesce(F.col("L"), F.col("R"))
        result = self._row_wise_operation(
            spark_expr,
            other,
            align=align,
            returns_geom=True,
            default_val=None,
            keep_name=True,
        )
        if inplace:
            self._update_inplace(result)
            return None
        return result


    def explode(self, ignore_index=False, index_parts=False) -> "GeoSeries":
        raise NotImplementedError(
            _not_implemented_error(
                "explode",
                "Explodes multi-part geometries into separate single-part geometries.",
            )
        )

    def to_crs(
        self, crs: Union[Any, None] = None, epsg: Union[int, None] = None
    ) -> "GeoSeries":
        """Returns a ``GeoSeries`` with all geometries transformed to a new
        coordinate reference system.

        Transform all geometries in a GeoSeries to a different coordinate
        reference system. The ``crs`` attribute on the current GeoSeries must
        be set. Either ``crs`` or ``epsg`` may be specified for output.

        This method will transform all points in all objects. It has no notion
        of projecting entire geometries. All segments joining points are assumed
        to be lines in the current projection, not geodesics. Objects crossing
        the dateline (or other projection boundary) will have undesirable behavior.

        Parameters
        ----------
        crs : pyproj.CRS, optional if `epsg` is specified
            The value can be anything accepted by
            :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
            such as an authority string (eg "EPSG:4326") or a WKT string.
        epsg : int, optional if `crs` is specified
            EPSG code specifying output projection.

        Returns
        -------
        GeoSeries

        Examples
        --------
        >>> from shapely.geometry import Point
        >>> from sedona.spark.geopandas import GeoSeries
        >>> geoseries = GeoSeries([Point(1, 1), Point(2, 2), Point(3, 3)], crs=4326)
        >>> geoseries.crs
        <Geographic 2D CRS: EPSG:4326>
        Name: WGS 84
        Axis Info [ellipsoidal]:
        - Lat[north]: Geodetic latitude (degree)
        - Lon[east]: Geodetic longitude (degree)
        Area of Use:
        - name: World
        - bounds: (-180.0, -90.0, 180.0, 90.0)
        Datum: World Geodetic System 1984
        - Ellipsoid: WGS 84
        - Prime Meridian: Greenwich

        >>> geoseries = geoseries.to_crs(3857)
        >>> print(geoseries)
        0    POINT (111319.491 111325.143)
        1    POINT (222638.982 222684.209)
        2    POINT (333958.472 334111.171)
        dtype: geometry
        >>> geoseries.crs
        <Projected CRS: EPSG:3857>
        Name: WGS 84 / Pseudo-Mercator
        Axis Info [cartesian]:
        - X[east]: Easting (metre)
        - Y[north]: Northing (metre)
        Area of Use:
        - name: World - 85°S to 85°N
        - bounds: (-180.0, -85.06, 180.0, 85.06)
        Coordinate Operation:
        - name: Popular Visualisation Pseudo-Mercator
        - method: Popular Visualisation Pseudo Mercator
        Datum: World Geodetic System 1984
        - Ellipsoid: WGS 84
        - Prime Meridian: Greenwich
        """
        from pyproj import CRS

        if crs is not None:
            crs = CRS.from_user_input(crs)
        elif epsg is not None:
            crs = CRS.from_epsg(epsg)
        else:
            raise ValueError("Must pass either crs or epsg.")

        spark_expr = stf.ST_Transform(
            self.spark.column,
            F.lit(f"EPSG:{crs.to_epsg()}"),
        )
        return self._query_geometry_column(
            spark_expr,
            keep_name=True,
        )

    @property
    def bounds(self) -> pspd.DataFrame:
        selects = [
            stf.ST_XMin(self.spark.column).alias("minx"),
            stf.ST_YMin(self.spark.column).alias("miny"),
            stf.ST_XMax(self.spark.column).alias("maxx"),
            stf.ST_YMax(self.spark.column).alias("maxy"),
        ]
        df = self._internal.spark_frame
        sdf = df.select(*selects)
        internal = InternalFrame(
            spark_frame=sdf,
            index_spark_columns=None,
            column_labels=[("minx",), ("miny",), ("maxx",), ("maxy",)],
            data_spark_columns=[
                scol_for(sdf, "minx"),
                scol_for(sdf, "miny"),
                scol_for(sdf, "maxx"),
                scol_for(sdf, "maxy"),
            ],
            column_label_names=None,
        )
        return pspd.DataFrame(internal)

    @property
    def total_bounds(self):
        import numpy as np
        import warnings
        from pyspark.sql import functions as F

        if len(self) == 0:
            # numpy 'min' cannot handle empty arrays
            # TODO with numpy >= 1.15, the 'initial' argument can be used
            return np.array([np.nan, np.nan, np.nan, np.nan])

        ps_df = self.bounds
        with warnings.catch_warnings():
            # if all rows are empty geometry / none, nan is expected
            warnings.filterwarnings(
                "ignore", r"All-NaN slice encountered", RuntimeWarning
            )
            total_bounds_df = ps_df.agg(
                {
                    "minx": ["min"],
                    "miny": ["min"],
                    "maxx": ["max"],
                    "maxy": ["max"],
                }
            )
            return np.array(
                (
                    np.nanmin(total_bounds_df["minx"]["min"]),  # minx
                    np.nanmin(total_bounds_df["miny"]["min"]),  # miny
                    np.nanmax(total_bounds_df["maxx"]["max"]),  # maxx
                    np.nanmax(total_bounds_df["maxy"]["max"]),  # maxy
                )
            )
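
    # Usage sketch for the two properties above (illustrative only; assumes a
    # Spark session with Sedona registered is active):
    #
    #   from shapely.geometry import Point
    #   from sedona.spark.geopandas import GeoSeries
    #
    #   s = GeoSeries([Point(0, 0), Point(2, 3)])
    #   s.bounds         # per-row DataFrame with minx, miny, maxx, maxy columns
    #   s.total_bounds   # array([0., 0., 2., 3.]), aggregated over all rows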

    # GeoSeries-only (not in GeoDataFrame)
    def estimate_utm_crs(self, datum_name: str = "WGS 84") -> "CRS":
        """Returns the estimated UTM CRS based on the bounds of the dataset.

        Parameters
        ----------
        datum_name : str, optional
            The name of the datum to use in the query. Default is WGS 84.

        Returns
        -------
        pyproj.CRS

        Examples
        --------
        >>> import geodatasets  # requires: pip install geodatasets
        >>> import geopandas as gpd
        >>> df = gpd.read_file(
        ...     geodatasets.get_path("geoda.chicago_commpop")
        ... )
        >>> df.geometry.values.estimate_utm_crs()  # doctest: +SKIP
        <Derived Projected CRS: EPSG:32616>
        Name: WGS 84 / UTM zone 16N
        Axis Info [cartesian]:
        - E[east]: Easting (metre)
        - N[north]: Northing (metre)
        Area of Use:
        - name: Between 90°W and 84°W, northern hemisphere between equator and 84°N,...
        - bounds: (-90.0, 0.0, -84.0, 84.0)
        Coordinate Operation:
        - name: UTM zone 16N
        - method: Transverse Mercator
        Datum: World Geodetic System 1984 ensemble
        - Ellipsoid: WGS 84
        - Prime Meridian: Greenwich
        """
        import numpy as np
        from pyproj import CRS
        from pyproj.aoi import AreaOfInterest
        from pyproj.database import query_utm_crs_info

        # This replicates geopandas's implementation exactly:
        # https://github.com/geopandas/geopandas/blob/main/geopandas/array.py
        # The only difference is that we use Sedona's total_bounds property, which
        # is more efficient and scalable than the geopandas implementation. The rest
        # of the logic always executes on 4 points (minx, miny, maxx, maxy), so the
        # numpy and pyproj implementations are reasonable.
        if not self.crs:
            raise RuntimeError("crs must be set to estimate UTM CRS.")

        minx, miny, maxx, maxy = self.total_bounds
        if self.crs.is_geographic:
            x_center = np.mean([minx, maxx])
            y_center = np.mean([miny, maxy])
        else:  # ensure using geographic coordinates
            from pyproj import Transformer
            from functools import lru_cache

            TransformerFromCRS = lru_cache(Transformer.from_crs)

            transformer = TransformerFromCRS(self.crs, "EPSG:4326", always_xy=True)
            minx, miny, maxx, maxy = transformer.transform_bounds(
                minx, miny, maxx, maxy
            )
            y_center = np.mean([miny, maxy])
            # crossed the antimeridian
            if minx > maxx:
                # shift maxx from [-180,180] to [0,360]
                # so both numbers are positive for center calculation
                # Example: -175 to 185
                maxx += 360
                x_center = np.mean([minx, maxx])
                # shift back to [-180,180]
                x_center = ((x_center + 180) % 360) - 180
            else:
                x_center = np.mean([minx, maxx])

        utm_crs_list = query_utm_crs_info(
            datum_name=datum_name,
            area_of_interest=AreaOfInterest(
                west_lon_degree=x_center,
                south_lat_degree=y_center,
                east_lon_degree=x_center,
                north_lat_degree=y_center,
            ),
        )
        try:
            return CRS.from_epsg(utm_crs_list[0].code)
        except IndexError:
            raise RuntimeError("Unable to determine UTM CRS")
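
    # Worked example of the antimeridian branch above (illustrative): if the
    # transformed bounds give minx = 170 and maxx = -170, the data straddles the
    # antimeridian, so the shift yields maxx = -170 + 360 = 190 and
    # x_center = (170 + 190) / 2 = 180, which maps back into [-180, 180] as
    # ((180 + 180) % 360) - 180 = -180.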

    def to_json(
        self,
        show_bbox: bool = True,
        drop_id: bool = False,
        to_wgs84: bool = False,
        **kwargs,
    ) -> str:
        """
        Returns a GeoJSON string representation of the GeoSeries.

        Parameters
        ----------
        show_bbox : bool, optional, default: True
            Include bbox (bounds) in the geojson.
        drop_id : bool, default: False
            Whether to retain the index of the GeoSeries as the id property
            in the generated GeoJSON. Default is False, but you may want True
            if the index is just arbitrary row numbers.
        to_wgs84 : bool, optional, default: False
            If the CRS is set on the active geometry column, it is exported as
            WGS84 (EPSG:4326) to meet the `2016 GeoJSON specification
            <https://tools.ietf.org/html/rfc7946>`_.
            Set to True to force re-projection and set to False to ignore CRS.
            False by default.
        **kwargs
            Additional keyword args will be passed to json.dumps().

        Note: Unlike geopandas, Sedona's implementation will replace 'LinearRing'
        with 'LineString' in the GeoJSON output.

        Returns
        -------
        JSON string

        Examples
        --------
        >>> from sedona.spark.geopandas import GeoSeries
        >>> from shapely.geometry import Point
        >>> s = GeoSeries([Point(1, 1), Point(2, 2), Point(3, 3)])
        >>> s
        0    POINT (1 1)
        1    POINT (2 2)
        2    POINT (3 3)
        dtype: geometry

        >>> s.to_json()
        '{"type": "FeatureCollection", "features": [{"id": "0", "type": "Feature", "pr\
operties": {}, "geometry": {"type": "Point", "coordinates": [1.0, 1.0]}, "bbox": [1.0,\
 1.0, 1.0, 1.0]}, {"id": "1", "type": "Feature", "properties": {}, "geometry": {"type"\
: "Point", "coordinates": [2.0, 2.0]}, "bbox": [2.0, 2.0, 2.0, 2.0]}, {"id": "2", "typ\
e": "Feature", "properties": {}, "geometry": {"type": "Point", "coordinates": [3.0, 3.\
0]}, "bbox": [3.0, 3.0, 3.0, 3.0]}], "bbox": [1.0, 1.0, 3.0, 3.0]}'

        See Also
        --------
        GeoSeries.to_file : write GeoSeries to file
        """
        return self.to_geoframe(name="geometry").to_json(
            na="null", show_bbox=show_bbox, drop_id=drop_id, to_wgs84=to_wgs84, **kwargs
        )

    def to_wkb(self, hex: bool = False, **kwargs) -> pspd.Series:
        """
        Convert GeoSeries geometries to WKB

        Parameters
        ----------
        hex : bool
            If true, export the WKB as a hexadecimal string. The default is to
            return a binary bytes object.
        kwargs
            Additional keyword args will be passed to :func:`shapely.to_wkb`.

        Returns
        -------
        Series
            WKB representations of the geometries

        See also
        --------
        GeoSeries.to_wkt

        Examples
        --------
        >>> from shapely.geometry import Point, Polygon
        >>> s = GeoSeries(
        ...     [
        ...         Point(0, 0),
        ...         Polygon(),
        ...         Polygon([(0, 0), (1, 1), (1, 0)]),
        ...         None,
        ...     ]
        ... )

        >>> s.to_wkb()
        0    b'\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00...
        1              b'\x01\x03\x00\x00\x00\x00\x00\x00\x00'
        2    b'\x01\x03\x00\x00\x00\x01\x00\x00\x00\x04\x00...
        3                                                 None
        dtype: object

        >>> s.to_wkb(hex=True)
        0       010100000000000000000000000000000000000000
        1                               010300000000000000
        2    0103000000010000000400000000000000000000000000...
        3                                             None
        dtype: object
        """
        spark_expr = stf.ST_AsBinary(self.spark.column)
        if hex:
            spark_expr = F.hex(spark_expr)

        return self._query_geometry_column(
            spark_expr,
            returns_geom=False,
        )

    def to_wkt(self, **kwargs) -> pspd.Series:
        """
        Convert GeoSeries geometries to WKT

        Note: Using shapely < 1.0.0 may return different geometries for empty
        geometries.

        Parameters
        ----------
        kwargs
            Keyword args will be passed to :func:`shapely.to_wkt`.

        Returns
        -------
        Series
            WKT representations of the geometries

        Examples
        --------
        >>> from shapely.geometry import Point
        >>> s = GeoSeries([Point(1, 1), Point(2, 2), Point(3, 3)])
        >>> s
        0    POINT (1 1)
        1    POINT (2 2)
        2    POINT (3 3)
        dtype: geometry

        >>> s.to_wkt()
        0    POINT (1 1)
        1    POINT (2 2)
        2    POINT (3 3)
        dtype: object

        See also
        --------
        GeoSeries.to_wkb
        """
        spark_expr = stf.ST_AsText(self.spark.column)
        return self._query_geometry_column(
            spark_expr,
            returns_geom=False,
        )

    def to_arrow(self, geometry_encoding="WKB", interleaved=True, include_z=None):
        """Encode a GeoSeries to GeoArrow format.

        See https://geoarrow.org/ for details on the GeoArrow specification.

        This function returns a generic Arrow array object implementing
        the `Arrow PyCapsule Protocol`_ (i.e. having an ``__arrow_c_array__``
        method). This object can then be consumed by your Arrow implementation
        of choice that supports this protocol.

        .. _Arrow PyCapsule Protocol: https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

        Note: Requires geopandas versions >= 1.0.0 to use with Sedona.

        Parameters
        ----------
        geometry_encoding : {'WKB', 'geoarrow'}, default 'WKB'
            The GeoArrow encoding to use for the data conversion.
        interleaved : bool, default True
            Only relevant for 'geoarrow' encoding. If True, the geometries'
            coordinates are interleaved in a single fixed size list array.
            If False, the coordinates are stored as separate arrays in a
            struct type.
        include_z : bool, default None
            Only relevant for 'geoarrow' encoding (for WKB, the dimensionality
            of the individual geometries is preserved). If False, return 2D
            geometries. If True, include the third dimension in the output
            (if a geometry has no third dimension, the z-coordinates will be
            NaN). By default, will infer the dimensionality from the input
            geometries. Note that this inference can be unreliable with empty
            geometries (for a guaranteed result, it is recommended to specify
            the keyword).

        Returns
        -------
        GeoArrowArray
            A generic Arrow array object with geometry data encoded to GeoArrow.

        Examples
        --------
        >>> from sedona.spark.geopandas import GeoSeries
        >>> from shapely.geometry import Point
        >>> gser = GeoSeries([Point(1, 2), Point(2, 1)])
        >>> gser
        0    POINT (1 2)
        1    POINT (2 1)
        dtype: geometry

        >>> arrow_array = gser.to_arrow()
        >>> arrow_array
        <geopandas.io._geoarrow.GeoArrowArray object at ...>

        The returned array object needs to be consumed by a library implementing
        the Arrow PyCapsule Protocol. For example, wrapping the data as a
        pyarrow.Array (requires pyarrow >= 14.0):

        >>> import pyarrow as pa
        >>> array = pa.array(arrow_array)
        >>> array
        <pyarrow.lib.BinaryArray object at ...>
        [
          0101000000000000000000F03F0000000000000040,
          01010000000000000000000040000000000000F03F
        ]
        """
        # Because this function returns the arrow array in memory, we simply rely
        # on geopandas's implementation. It also returns a geopandas-specific data
        # type, which can be converted to an actual pyarrow array, so there is no
        # direct Sedona equivalent. This way we also get all of the arguments
        # implemented for free.
        return self.to_geopandas().to_arrow(
            geometry_encoding=geometry_encoding,
            interleaved=interleaved,
            include_z=include_z,
        )

    def clip(self, mask, keep_geom_type: bool = False, sort=False) -> "GeoSeries":
        raise NotImplementedError(
            _not_implemented_error(
                "clip", "Clips geometries to the bounds of a mask geometry."
            )
        )

    def to_file(
        self,
        path: str,
        driver: Union[str, None] = None,
        schema: Union[dict, None] = None,
        index: Union[bool, None] = None,
        **kwargs,
    ):
        """Write the ``GeoSeries`` to a file.

        Parameters
        ----------
        path : str
            File path or file handle to write to.
        driver : str, optional
            The format driver used to write the file, by default None.
            If not specified, it's inferred from the file extension.
            Available formats are "geojson", "geopackage", and "geoparquet".
        index : bool, optional
            If True, writes the index as a column. If False, no index is written.
            By default None, the index is written only if it is named,
            is a MultiIndex, or has a non-integer data type.
        mode : str, default 'w'
            The write mode: 'w' to overwrite the existing file or 'a' to append.
            Passed through ``**kwargs``.
        crs : pyproj.CRS, optional
            The coordinate reference system to write. If None, it is determined
            from the ``GeoSeries`` `crs` attribute. The value can be anything
            accepted by :meth:`pyproj.CRS.from_user_input()`, such as an
            authority string (e.g., "EPSG:4326") or a WKT string.
            Passed through ``**kwargs``.
        **kwargs
            Additional keyword arguments passed to the underlying writing engine.

        Examples
        --------
        >>> from shapely.geometry import Point, LineString
        >>> from sedona.spark.geopandas import GeoSeries
        >>> # Note: Examples write to temporary files for demonstration
        >>> import tempfile
        >>> import os

        Create a GeoSeries:

        >>> gs = GeoSeries(
        ...     [Point(0, 0), LineString([(1, 1), (2, 2)])],
        ...     index=["a", "b"]
        ... )

        Save to a GeoParquet file:

        >>> path_parquet = os.path.join(tempfile.gettempdir(), "data.parquet")
        >>> gs.to_file(path_parquet, driver="geoparquet")

        Append to a GeoJSON file:

        >>> path_json = os.path.join(tempfile.gettempdir(), "data.json")
        >>> gs.to_file(path_json, driver="geojson", mode='a')
        """
        self.to_geoframe().to_file(path, driver, index=index, **kwargs)

    def to_parquet(self, path, **kwargs):
        """Write the GeoSeries to a GeoParquet file.

        Parameters
        ----------
        path : str
            The file path where the GeoParquet file will be written.
        **kwargs
            Additional keyword arguments passed to the underlying writing function.

        Returns
        -------
        None

        Examples
        --------
        >>> from shapely.geometry import Point
        >>> from sedona.spark.geopandas import GeoSeries
        >>> import tempfile
        >>> import os
        >>> gs = GeoSeries([Point(1, 1), Point(2, 2)])
        >>> file_path = os.path.join(tempfile.gettempdir(), "my_geodata.parquet")
        >>> gs.to_parquet(file_path)
        """
        self.to_geoframe().to_file(path, driver="geoparquet", **kwargs)

    # -----------------------------------------------------------------------------
    # Utils
    # -----------------------------------------------------------------------------

    def _update_inplace(self, result: "GeoSeries", invalidate_sindex: bool = True):
        self.rename(result.name, inplace=True)
        self._update_anchor(result._anchor)
        if invalidate_sindex:
            self._sindex = None

    def _make_series_of_val(self, value: Any):
        """
        A helper method to turn single objects into series (ps.Series or GeoSeries
        when possible).

        Returns:
            tuple[pspd.Series, bool]:
                - The series of the value
                - Whether the returned value was a single object extended into a
                  series (useful for the row-wise 'align' parameter)
        """
        if isinstance(value, GeoDataFrame):
            return value.geometry, False
        elif not isinstance(value, pspd.Series):
            # materialize the scalar into an in-memory list matching len(self)
            lst = [value for _ in range(len(self))]
            if isinstance(value, BaseGeometry):
                return GeoSeries(lst), True
            else:
                # e.g. int input
                return pspd.Series(lst), True
        else:
            return value, False
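
    # Illustrative behavior sketch for _make_series_of_val (not a doctest):
    #
    #   s = GeoSeries([Point(0, 0), Point(1, 1)])
    #   s._make_series_of_val(Point(9, 9))  # -> (GeoSeries of two POINT (9 9), True)
    #   s._make_series_of_val(s)            # -> (s unchanged, False)
    #
    # The boolean tells row-wise operations whether index alignment can be skipped,
    # since a broadcast scalar has no meaningful index of its own.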

    def to_geoframe(self, name=None):
        if name is not None:
            renamed = self.rename(name)
        elif self._column_label is None or self._column_label == (None,):
            renamed = self.rename("geometry")
        else:
            renamed = self

        index_col = SPARK_DEFAULT_INDEX_NAME
        result = GeoDataFrame(
            pspd.DataFrame(renamed._internal).to_spark(index_col).pandas_api(index_col)
        )
        result.index.name = self.index.name
        return result
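
    # Usage sketch (illustrative only): to_geoframe promotes the series to a
    # single-column GeoDataFrame, defaulting the column label to "geometry"
    # when the series is unnamed:
    #
    #   gs = GeoSeries([Point(0, 0)])
    #   gdf = gs.to_geoframe()        # column label: "geometry"
    #   gdf = gs.to_geoframe("geom")  # column label: "geom"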

# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------


def _get_series_col_name(ps_series: pspd.Series) -> str:
    return ps_series.name if ps_series.name else SPARK_DEFAULT_SERIES_NAME


def _to_bool(ps_series: pspd.Series, default: bool = False) -> pspd.Series:
    """
    Cast a ps.Series to bool type if it's not one, converting None values to the
    default value.
    """
    if ps_series.dtype.name != "bool":
        # fill None values with the default value
        ps_series.fillna(default, inplace=True)
    return ps_series
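
# Illustrative behavior sketch for _to_bool (not a doctest): spatial predicates
# such as notna() can yield nullable results, e.g. a Series holding
# [True, None, True]. With the default of False, the None is filled so callers
# always see [True, False, True]. Note that fillna is applied in place, so the
# input series itself is mutated.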