# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import warnings
from typing import Union

import pyspark.pandas as ps
from pandas.api.types import is_integer_dtype
from pyspark.pandas.frame import InternalFrame
from pyspark.pandas.internal import NATURAL_ORDER_COLUMN_NAME, SPARK_DEFAULT_INDEX_NAME
from pyspark.pandas.utils import default_session, log_advice, scol_for, validate_mode

from sedona.spark.geopandas import GeoDataFrame


def _to_file(
df: GeoDataFrame,
path: str,
driver: Union[str, None] = None,
    index: Union[bool, None] = None,
**kwargs,
):
"""
    Write the ``GeoDataFrame`` to a file.

Parameters
----------
path : string
File path or file handle to write to.
driver : string, default None
The format driver used to write the file.
        If not specified, Sedona attempts to infer the driver from the
        file extension; if the extension is missing or unrecognized, a
        ``ValueError`` is raised.
        Options:

        - "geojson"
        - "geoparquet"
schema : dict, default None
Not applicable to Sedona's implementation
index : bool, default None
If True, write index into one or more columns (for MultiIndex).
Default None writes the index into one or more columns only if
the index is named, is a MultiIndex, or has a non-integer data
type. If False, no index is written.
mode : string, default 'w'
The write mode, 'w' to overwrite the existing file and 'a' to append.
'overwrite' and 'append' are equivalent to 'w' and 'a' respectively.
crs : pyproj.CRS, default None
        If specified, the CRS is parsed with pyproj to control how the
        file is written. If None, Sedona determines the CRS from the
        dataframe's ``crs`` attribute.
The value can be anything accepted
by :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
engine : str
Not applicable to Sedona's implementation
metadata : dict[str, str], default None
Optional metadata to be stored in the file. Keys and values must be
        strings. Supported only for the "GPKG" driver; not supported by Sedona.
**kwargs :
Keyword args to be passed to the engine, and can be used to write
to multi-layer data, store data within archives (zip files), etc.
In case of the "pyogrio" engine, the keyword arguments are passed to
`pyogrio.write_dataframe`. In case of the "fiona" engine, the keyword
        arguments are passed to ``fiona.open``. For more information on possible
        keywords, type: ``import pyogrio; help(pyogrio.write_dataframe)``.

    Examples
    --------
    >>> gdf = GeoDataFrame(
    ...     {"geometry": [Point(0, 0), LineString([(0, 0), (1, 1)])], "int": [1, 2]}
    ... )
    >>> gdf.to_file(filepath, driver="geoparquet")
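
    The driver can also be inferred from the file extension; the path below is
    hypothetical:

    >>> gdf.to_file("data.geojson")  # doctest: +SKIP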

    With selected drivers you can also append to a file with ``mode="a"``:

    >>> gdf.to_file(filepath, driver="geojson", mode="a")

    When the index is of non-integer dtype, ``index=None`` (default) is treated
    as True, writing the index to the file:

    >>> gdf = GeoDataFrame({"geometry": [Point(0, 0), Point(1, 1)]}, index=["a", "b"])
    >>> gdf.to_file(filepath, driver="geoparquet")
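
    To omit the index explicitly, pass ``index=False``:

    >>> gdf.to_file(filepath, driver="geoparquet", index=False)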
"""
    ext_to_driver = {
        ".parquet": "GeoParquet",
        ".json": "GeoJSON",
        ".geojson": "GeoJSON",
    }
# auto detect driver from filename if not provided
if driver is None:
_, extension = os.path.splitext(path)
if extension not in ext_to_driver:
raise ValueError(f"Unsupported file extension: {extension}")
driver = ext_to_driver[extension]
spark_fmt = driver.lower()
    crs = kwargs.pop("crs", None)
    if crs:
        from pyproj import CRS

        # Normalize the user-provided CRS through pyproj
        crs = CRS.from_user_input(crs)
spark_df = df._internal.spark_frame.drop(NATURAL_ORDER_COLUMN_NAME)
if index is None:
# Determine if index attribute(s) should be saved to file
# (only if they are named or are non-integer)
index = list(df.index.names) != [None] or not is_integer_dtype(df.index.dtype)
if not index:
        log_advice(
            "If `index` is not set to True for `to_file`, "
            "the existing index is lost when writing to a file."
        )
spark_df = spark_df.drop(SPARK_DEFAULT_INDEX_NAME)
    if spark_fmt not in ("geoparquet", "geojson"):
        raise ValueError(f"Unsupported spark format: {spark_fmt}")
    writer = spark_df.write.format(spark_fmt)
default_mode = "overwrite"
mode = validate_mode(kwargs.pop("mode", default_mode))
writer.mode(mode).save(path, **kwargs)


def read_file(filename: str, format: Union[str, None] = None, **kwargs):
"""
    Alternate constructor to create a ``GeoDataFrame`` from a file.

Parameters
----------
filename : str
File path or file handle to read from. If the path is a directory,
Sedona will read all files in the directory into a dataframe.
format : str, default None
The format of the file to read. If None, Sedona will infer the format
from the file extension. Note, inferring the format from the file extension
is not supported for directories.
Options:
- "shapefile"
- "geojson"
- "geopackage"
- "geoparquet"

    See Also
--------
GeoDataFrame.to_file : write GeoDataFrame to file
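
    Examples
    --------
    The file paths below are hypothetical; outputs are skipped.

    >>> from sedona.spark.geopandas import read_file
    >>> gdf = read_file("counties.shp")  # doctest: +SKIP
    >>> gdf = read_file("data.parquet", format="geoparquet")  # doctest: +SKIP

    Reading a GeoPackage requires the layer's ``table_name``:

    >>> gdf = read_file("data.gpkg", format="geopackage", table_name="layer1")  # doctest: +SKIP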
"""
    # Warn about arguments that geopandas supports but Sedona does not;
    # ``table_name`` is consumed by the geopackage reader below.
    unsupported_kwargs = {k: v for k, v in kwargs.items() if k != "table_name"}
    if unsupported_kwargs:
        warnings.warn(
            f"The given arguments are not supported in Sedona: {unsupported_kwargs}"
        )
spark = default_session()
# If format is not specified, infer it from the file extension
if format is None:
if os.path.isdir(filename):
raise ValueError(
f"Inferring the format from the file extension is not supported for directories: {filename}"
)
if filename.lower().endswith(".shp"):
format = "shapefile"
elif filename.lower().endswith(".json"):
format = "geojson"
elif filename.lower().endswith(".parquet"):
format = "geoparquet"
elif filename.lower().endswith(".gpkg"):
format = "geopackage"
else:
raise ValueError(f"Unsupported file type: {filename}")
else:
format = format.lower()
if format == "shapefile":
sdf = spark.read.format("shapefile").load(filename)
return GeoDataFrame(sdf)
elif format == "geojson":
sdf = (
spark.read.format("geojson")
.option("multiLine", "true")
.load(filename)
            .select(
                "geometry", "properties.*"
            )  # select all non-geometry columns (which are nested under 'properties')
)
# geojson also has a 'type' field, but we ignore it
elif format == "geopackage":
table_name = kwargs.get("table_name", None)
if not table_name:
raise ValueError("table_name is required for geopackage")
sdf = (
spark.read.format("geopackage")
.option("tableName", table_name)
.load(filename)
)
elif format == "geoparquet":
sdf = spark.read.format("geoparquet").load(filename)
else:
        raise NotImplementedError(f"Unsupported format: {format}")
index_spark_columns = []
# If index was retained, we sort by it so the dataframe has the same order as the original one
if SPARK_DEFAULT_INDEX_NAME in sdf.columns:
sdf = sdf.orderBy(SPARK_DEFAULT_INDEX_NAME)
index_spark_columns = [scol_for(sdf, SPARK_DEFAULT_INDEX_NAME)]
internal = InternalFrame(spark_frame=sdf, index_spark_columns=index_spark_columns)
return GeoDataFrame(ps.DataFrame(internal))


def read_parquet(
path,
columns=None,
storage_options=None,
bbox=None,
to_pandas_kwargs=None,
**kwargs,
):
"""
    Load a Parquet object from the file path, returning a GeoDataFrame.

    If no geometry columns are read, this will raise a ``ValueError``; you
    should use the pandas ``read_parquet`` method instead.

    If the 'crs' key is not present in the GeoParquet metadata associated with
    the Parquet object, it defaults to "OGC:CRS84" according to the
    specification.

    Parameters
    ----------
path : str, path object
columns : list-like of strings, default=None
Not currently supported in Sedona
storage_options : dict, optional
Not currently supported in Sedona
bbox : tuple, optional
Not currently supported in Sedona
to_pandas_kwargs : dict, optional
Not currently supported in Sedona

    Returns
    -------
    GeoDataFrame

    Examples
    --------
    >>> from sedona.spark.geopandas import read_parquet
    >>> df = read_parquet("data.parquet")  # doctest: +SKIP
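
    Since Sedona reads through Spark, the path may also point to a directory of
    Parquet files; the directory name here is hypothetical:

    >>> df = read_parquet("data/")  # doctest: +SKIP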
"""
    if kwargs:
        warnings.warn(f"The given arguments are not supported in Sedona: {kwargs}")
    # columns, storage_options, bbox and to_pandas_kwargs are accepted for
    # geopandas API compatibility but are currently ignored by Sedona.
    return read_file(path, format="geoparquet")