Spatial RDD Applications in Python¶
Introduction¶
Apache Sedona core provides five special SpatialRDDs:
All of them can be imported from sedona.core.SpatialRDD module sedona has written serializers which convert Sedona SpatialRDD to Python objects. Converting will produce GeoData objects which have 2 attributes:
geom attribute holds geometry representation as shapely objects. userData is string representation of other attributes separated by "\t"
GeoData has one method to get user data.
Note
This tutorial is based on Sedona Core Jupyter Notebook example. You can interact with Sedona Python Jupyter notebook immediately on Binder. Click and wait for a few minutes. Then select a notebook and enjoy!
Installation¶
Please read Quick start to install Sedona Python.
Apache Sedona Serializers¶
Sedona has a suite of well-written geometry and index serializers. Forgetting to enable these serializers will lead to high memory consumption.
conf.set("spark.serializer", KryoSerializer.getName)
conf.set("spark.kryo.registrator", SedonaKryoRegistrator.getName)
sc = SparkContext(conf=conf)
Create a SpatialRDD¶
Create a typed SpatialRDD¶
Apache Sedona core provides three special SpatialRDDs:
They can be loaded from CSV, TSV, WKT, WKB, Shapefiles, GeoJSON formats. To pass the format to SpatialRDD constructor please use FileDataSplitter enumeration.
sedona SpatialRDDs (and other classes when it was necessary) have implemented meta classes which allow to use overloaded functions how Scala/Java Apache Sedona API allows. ex.
from pyspark import StorageLevel
from sedona.core.SpatialRDD import PointRDD
from sedona.core.enums import FileDataSplitter
input_location = "checkin.csv"
offset = 0 # The point long/lat starts from Column 0
splitter = FileDataSplitter.CSV # FileDataSplitter enumeration
carry_other_attributes = True # Carry Column 2 (hotel, gas, bar...)
level = StorageLevel.MEMORY_ONLY # Storage level from pyspark
s_epsg = "epsg:4326" # Source epsg code
t_epsg = "epsg:5070" # target epsg code
point_rdd = PointRDD(sc, input_location, offset, splitter, carry_other_attributes)
point_rdd = PointRDD(sc, input_location, splitter, carry_other_attributes, level, s_epsg, t_epsg)
point_rdd = PointRDD(
sparkContext=sc,
InputLocation=input_location,
Offset=offset,
splitter=splitter,
carryInputData=carry_other_attributes
)
From SparkSQL DataFrame
To create spatialRDD from other formats you can use adapter between Spark DataFrame and SpatialRDD
csv_point_input_location= "/tests/resources/county_small.tsv"
df = spark.read.\
format("csv").\
option("delimiter", "\t").\
option("header", "false").\
load(csv_point_input_location)
df.createOrReplaceTempView("counties")
spatial_df = spark.sql(
"""
SELECT ST_GeomFromWKT(_c0) as geom, _c6 as county_name
FROM counties
"""
)
spatial_df.printSchema()
root
|-- geom: geometry (nullable = false)
|-- county_name: string (nullable = true)
Note that, you have to name your column geometry
from sedona.utils.adapter import Adapter
spatial_rdd = Adapter.toSpatialRdd(spatial_df)
spatial_rdd.analyze()
spatial_rdd.boundaryEnvelope
<sedona.core.geom_types.Envelope object at 0x7f1e5f29fe10>
or pass Geometry column name as a second argument
spatial_rdd = Adapter.toSpatialRdd(spatial_df, "geom")
For WKT/WKB/GeoJSON data, please use ST_GeomFromWKT / ST_GeomFromWKB / ST_GeomFromGeoJSON instead.
Read other attributes in an SpatialRDD¶
Each SpatialRDD can carry non-spatial attributes such as price, age and name as long as the user sets carryOtherAttributes as TRUE.
The other attributes are combined together to a string and stored in UserData field of each geometry.
To retrieve the UserData field, use the following code:
rdd_with_other_attributes = object_rdd.rawSpatialRDD.map(lambda x: x.getUserData())
Write a Spatial Range Query¶
from sedona.core.geom.envelope import Envelope
from sedona.core.spatialOperator import RangeQuery
range_query_window = Envelope(-90.01, -80.01, 30.01, 40.01)
consider_boundary_intersection = False ## Only return gemeotries fully covered by the window
using_index = False
query_result = RangeQuery.SpatialRangeQuery(spatial_rdd, range_query_window, consider_boundary_intersection, using_index)
Note
Please use RangeQueryRaw from the same module if you want to avoid jvm python serde while converting to Spatial DataFrame It takes the same parameters as RangeQuery but returns reference to jvm rdd which can be converted to dataframe without python - jvm serde using Adapter.
Example:
from sedona.core.geom.envelope import Envelope
from sedona.core.spatialOperator import RangeQueryRaw
from sedona.utils.adapter import Adapter
range_query_window = Envelope(-90.01, -80.01, 30.01, 40.01)
consider_boundary_intersection = False ## Only return gemeotries fully covered by the window
using_index = False
query_result = RangeQueryRaw.SpatialRangeQuery(spatial_rdd, range_query_window, consider_boundary_intersection, using_index)
gdf = Adapter.toDf(query_result, spark, ["col1", ..., "coln"])
Range query window¶
Besides the rectangle (Envelope) type range query window, Apache Sedona range query window can be
To create shapely geometries please follow Shapely official docs
Use spatial indexes¶
Sedona provides two types of spatial indexes,
To utilize a spatial index in a spatial range query, use the following code:
from sedona.core.geom.envelope import Envelope
from sedona.core.enums import IndexType
from sedona.core.spatialOperator import RangeQuery
range_query_window = Envelope(-90.01, -80.01, 30.01, 40.01)
consider_boundary_intersection = False ## Only return gemeotries fully covered by the window
build_on_spatial_partitioned_rdd = False ## Set to TRUE only if run join query
spatial_rdd.buildIndex(IndexType.QUADTREE, build_on_spatial_partitioned_rdd)
using_index = True
query_result = RangeQuery.SpatialRangeQuery(
spatial_rdd,
range_query_window,
consider_boundary_intersection,
using_index
)
Output format¶
The output format of the spatial range query is another RDD which consists of GeoData objects.
SpatialRangeQuery result can be used as RDD with map or other spark RDD funtions. Also it can be used as Python objects when using collect method. Example:
query_result.map(lambda x: x.geom.length).collect()
[
1.5900840000000045,
1.5906639999999896,
1.1110299999999995,
1.1096700000000084,
1.1415619999999933,
1.1386399999999952,
1.1415619999999933,
1.1418860000000137,
1.1392780000000045,
...
]
Or transformed to GeoPandas GeoDataFrame
import geopandas as gpd
gpd.GeoDataFrame(
query_result.map(lambda x: [x.geom, x.userData]).collect(),
columns=["geom", "user_data"],
geometry="geom"
)
Write a Spatial KNN Query¶
A spatial K Nearnest Neighbor query takes as input a K, a query point and an SpatialRDD and finds the K geometries in the RDD which are the closest to he query point.
Assume you now have an SpatialRDD (typed or generic). You can use the following code to issue an Spatial KNN Query on it.
from sedona.core.spatialOperator import KNNQuery
from shapely.geometry import Point
point = Point(-84.01, 34.01)
k = 1000 ## K Nearest Neighbors
using_index = False
result = KNNQuery.SpatialKnnQuery(object_rdd, point, k, using_index)
Query center geometry¶
Besides the Point type, Apache Sedona KNN query center can be
To create Polygon or Linestring object please follow Shapely official docs
Use spatial indexes¶
To utilize a spatial index in a spatial KNN query, use the following code:
from sedona.core.spatialOperator import KNNQuery
from sedona.core.enums import IndexType
from shapely.geometry import Point
point = Point(-84.01, 34.01)
k = 5 ## K Nearest Neighbors
build_on_spatial_partitioned_rdd = False ## Set to TRUE only if run join query
spatial_rdd.buildIndex(IndexType.RTREE, build_on_spatial_partitioned_rdd)
using_index = True
result = KNNQuery.SpatialKnnQuery(spatial_rdd, point, k, using_index)
Warning
Only R-Tree index supports Spatial KNN query
Output format¶
The output format of the spatial KNN query is a list of GeoData objects. The list has K GeoData objects.
Example:
>> result
[GeoData, GeoData, GeoData, GeoData, GeoData]
Write a Spatial Join Query¶
A spatial join query takes as input two Spatial RDD A and B. For each geometry in A, finds the geometries (from B) covered/intersected by it. A and B can be any geometry type and are not necessary to have the same geometry type.
Assume you now have two SpatialRDDs (typed or generic). You can use the following code to issue an Spatial Join Query on them.
from sedona.core.enums import GridType
from sedona.core.spatialOperator import JoinQuery
consider_boundary_intersection = False ## Only return geometries fully covered by each query window in queryWindowRDD
using_index = False
object_rdd.analyze()
object_rdd.spatialPartitioning(GridType.KDBTREE)
query_window_rdd.spatialPartitioning(object_rdd.getPartitioner())
result = JoinQuery.SpatialJoinQuery(object_rdd, query_window_rdd, using_index, consider_boundary_intersection)
Result of SpatialJoinQuery is RDD which consists of GeoData instance and list of GeoData instances which spatially intersects or are covered by GeoData.
result.collect())
[
[GeoData, [GeoData, GeoData, GeoData, GeoData]],
[GeoData, [GeoData, GeoData, GeoData]],
[GeoData, [GeoData]],
[GeoData, [GeoData, GeoData]],
...
[GeoData, [GeoData, GeoData]]
]
Use spatial partitioning¶
Apache Sedona spatial partitioning method can significantly speed up the join query. Three spatial partitioning methods are available: KDB-Tree, Quad-Tree and R-Tree. Two SpatialRDD must be partitioned by the same way.
If you first partition SpatialRDD A, then you must use the partitioner of A to partition B.
object_rdd.spatialPartitioning(GridType.KDBTREE)
query_window_rdd.spatialPartitioning(object_rdd.getPartitioner())
Or
query_window_rdd.spatialPartitioning(GridType.KDBTREE)
object_rdd.spatialPartitioning(query_window_rdd.getPartitioner())
Use spatial indexes¶
To utilize a spatial index in a spatial join query, use the following code:
from sedona.core.enums import GridType
from sedona.core.enums import IndexType
from sedona.core.spatialOperator import JoinQuery
object_rdd.spatialPartitioning(GridType.KDBTREE)
query_window_rdd.spatialPartitioning(object_rdd.getPartitioner())
build_on_spatial_partitioned_rdd = True ## Set to TRUE only if run join query
using_index = True
query_window_rdd.buildIndex(IndexType.QUADTREE, build_on_spatial_partitioned_rdd)
result = JoinQuery.SpatialJoinQueryFlat(object_rdd, query_window_rdd, using_index, True)
The index should be built on either one of two SpatialRDDs. In general, you should build it on the larger SpatialRDD.
Output format¶
The output format of the spatial join query is a PairRDD. In this PairRDD, each object is a pair of two GeoData objects. The left one is the GeoData from object_rdd and the right one is the GeoData from the query_window_rdd.
Point,Polygon
Point,Polygon
Point,Polygon
Polygon,Polygon
LineString,LineString
Polygon,LineString
...
example
result.collect()
[
[GeoData, GeoData],
[GeoData, GeoData],
[GeoData, GeoData],
[GeoData, GeoData],
...
[GeoData, GeoData],
[GeoData, GeoData]
]
Each object on the left is covered/intersected by the object on the right.
Write a Distance Join Query¶
!!!warning RDD distance joins are only reliable for points. For other geometry types, please use Spatial SQL.
A distance join query takes two spatial RDD assuming that we have two SpatialRDD's:
And finds the geometries (from spatial_rdd) are within given distance to it. spatial_rdd and object_rdd can be any geometry type (point, line, polygon) and are not necessary to have the same geometry type
You can use the following code to issue an Distance Join Query on them.
from sedona.core.SpatialRDD import CircleRDD
from sedona.core.enums import GridType
from sedona.core.spatialOperator import JoinQuery
object_rdd.analyze()
circle_rdd = CircleRDD(object_rdd, 0.1) ## Create a CircleRDD using the given distance
circle_rdd.analyze()
circle_rdd.spatialPartitioning(GridType.KDBTREE)
spatial_rdd.spatialPartitioning(circle_rdd.getPartitioner())
consider_boundary_intersection = False ## Only return gemeotries fully covered by each query window in queryWindowRDD
using_index = False
result = JoinQuery.DistanceJoinQueryFlat(spatial_rdd, circle_rdd, using_index, consider_boundary_intersection)
Note
Please use JoinQueryRaw from the same module for methods
-
spatialJoin
-
DistanceJoinQueryFlat
-
SpatialJoinQueryFlat
For better performance while converting to dataframe with adapter. That approach allows to avoid costly serialization between Python and jvm and in result operating on python object instead of native geometries.
Example:
from sedona.core.SpatialRDD import CircleRDD
from sedona.core.enums import GridType
from sedona.core.spatialOperator import JoinQueryRaw
object_rdd.analyze()
circle_rdd = CircleRDD(object_rdd, 0.1) ## Create a CircleRDD using the given distance
circle_rdd.analyze()
circle_rdd.spatialPartitioning(GridType.KDBTREE)
spatial_rdd.spatialPartitioning(circle_rdd.getPartitioner())
consider_boundary_intersection = False ## Only return gemeotries fully covered by each query window in queryWindowRDD
using_index = False
result = JoinQueryRaw.DistanceJoinQueryFlat(spatial_rdd, circle_rdd, using_index, consider_boundary_intersection)
gdf = Adapter.toDf(result, ["left_col1", ..., "lefcoln"], ["rightcol1", ..., "rightcol2"], spark)
Output format¶
Result for this query is RDD which holds two GeoData objects within list of lists. Example:
result.collect()
[[GeoData, GeoData], [GeoData, GeoData] ...]
It is possible to do some RDD operation on result data ex. Getting polygon centroid.
result.map(lambda x: x[0].geom.centroid).collect()
[
<shapely.geometry.point.Point at 0x7efee2d28128>,
<shapely.geometry.point.Point at 0x7efee2d280b8>,
<shapely.geometry.point.Point at 0x7efee2d28fd0>,
<shapely.geometry.point.Point at 0x7efee2d28080>,
...
]
Save to permanent storage¶
You can always save an SpatialRDD back to some permanent storage such as HDFS and Amazon S3. You can save distributed SpatialRDD to WKT, GeoJSON and object files.
Note
Non-spatial attributes such as price, age and name will also be stored to permanent storage.
Save an SpatialRDD (not indexed)¶
Typed SpatialRDD and generic SpatialRDD can be saved to permanent storage.
Save to distributed WKT text file
Use the following code to save an SpatialRDD as a distributed WKT text file:
object_rdd.rawSpatialRDD.saveAsTextFile("hdfs://PATH")
object_rdd.saveAsWKT("hdfs://PATH")
Save to distributed WKB text file
Use the following code to save an SpatialRDD as a distributed WKB text file:
object_rdd.saveAsWKB("hdfs://PATH")
Save to distributed GeoJSON text file
Use the following code to save an SpatialRDD as a distributed GeoJSON text file:
object_rdd.saveAsGeoJSON("hdfs://PATH")
Save to distributed object file
Use the following code to save an SpatialRDD as a distributed object file:
object_rdd.rawJvmSpatialRDD.saveAsObjectFile("hdfs://PATH")
Note
Each object in a distributed object file is a byte array (not human-readable). This byte array is the serialized format of a Geometry or a SpatialIndex.
Save an SpatialRDD (indexed)¶
Indexed typed SpatialRDD and generic SpatialRDD can be saved to permanent storage. However, the indexed SpatialRDD has to be stored as a distributed object file.
Save to distributed object file
Use the following code to save an SpatialRDD as a distributed object file:
object_rdd.indexedRawRDD.saveAsObjectFile("hdfs://PATH")
Save an SpatialRDD (spatialPartitioned W/O indexed)¶
A spatial partitioned RDD can be saved to permanent storage but Spark is not able to maintain the same RDD partition Id of the original RDD. This will lead to wrong join query results. We are working on some solutions. Stay tuned!
Reload a saved SpatialRDD¶
You can easily reload an SpatialRDD that has been saved to a distributed object file.
Load to a typed SpatialRDD
Use the following code to reload the PointRDD/PolygonRDD/LineStringRDD:
from sedona.core.formatMapper.disc_utils import load_spatial_rdd_from_disc, GeoType
polygon_rdd = load_spatial_rdd_from_disc(sc, "hdfs://PATH", GeoType.POLYGON)
point_rdd = load_spatial_rdd_from_disc(sc, "hdfs://PATH", GeoType.POINT)
linestring_rdd = load_spatial_rdd_from_disc(sc, "hdfs://PATH", GeoType.LINESTRING)
Load to a generic SpatialRDD
Use the following code to reload the SpatialRDD:
saved_rdd = load_spatial_rdd_from_disc(sc, "hdfs://PATH", GeoType.GEOMETRY)
Use the following code to reload the indexed SpatialRDD:
saved_rdd = SpatialRDD()
saved_rdd.indexedRawRDD = load_spatial_index_rdd_from_disc(sc, "hdfs://PATH")
Read from other Geometry files¶
All below methods will return SpatialRDD object which can be used with Spatial functions such as Spatial Join etc.
Read from WKT file¶
from sedona.core.formatMapper import WktReader
WktReader.readToGeometryRDD(sc, wkt_geometries_location, 0, True, False)
<sedona.core.SpatialRDD.spatial_rdd.SpatialRDD at 0x7f8fd2fbf250>
Read from WKB file¶
from sedona.core.formatMapper import WkbReader
WkbReader.readToGeometryRDD(sc, wkb_geometries_location, 0, True, False)
<sedona.core.SpatialRDD.spatial_rdd.SpatialRDD at 0x7f8fd2eece50>
Read from GeoJson file¶
from sedona.core.formatMapper import GeoJsonReader
GeoJsonReader.readToGeometryRDD(sc, geo_json_file_location)
<sedona.core.SpatialRDD.spatial_rdd.SpatialRDD at 0x7f8fd2eecb90>
Read from Shapefile¶
from sedona.core.formatMapper.shapefileParser import ShapefileReader
ShapefileReader.readToGeometryRDD(sc, shape_file_location)
<sedona.core.SpatialRDD.spatial_rdd.SpatialRDD at 0x7f8fd2ee0710>
Tips¶
When you use Sedona functions such as
-
JoinQuery.spatialJoin
-
JoinQuery.DistanceJoinQueryFlat
-
JoinQuery.SpatialJoinQueryFlat
-
RangeQuery.SpatialRangeQuery
For better performance when converting to dataframe you can use JoinQueryRaw and RangeQueryRaw from the same module and adapter to convert to Spatial DataFrame.
Example, JoinQueryRaw:
from sedona.core.SpatialRDD import CircleRDD
from sedona.core.enums import GridType
from sedona.core.spatialOperator import JoinQueryRaw
object_rdd.analyze()
circle_rdd = CircleRDD(object_rdd, 0.1) ## Create a CircleRDD using the given distance
circle_rdd.analyze()
circle_rdd.spatialPartitioning(GridType.KDBTREE)
spatial_rdd.spatialPartitioning(circle_rdd.getPartitioner())
consider_boundary_intersection = False ## Only return gemeotries fully covered by each query window in queryWindowRDD
using_index = False
result = JoinQueryRaw.DistanceJoinQueryFlat(spatial_rdd, circle_rdd, using_index, consider_boundary_intersection)
gdf = Adapter.toDf(result, ["left_col1", ..., "lefcoln"], ["rightcol1", ..., "rightcol2"], spark)
and RangeQueryRaw
from sedona.core.geom.envelope import Envelope
from sedona.core.spatialOperator import RangeQueryRaw
from sedona.utils.adapter import Adapter
range_query_window = Envelope(-90.01, -80.01, 30.01, 40.01)
consider_boundary_intersection = False ## Only return gemeotries fully covered by the window
using_index = False
query_result = RangeQueryRaw.SpatialRangeQuery(spatial_rdd, range_query_window, consider_boundary_intersection, using_index)
gdf = Adapter.toDf(query_result, spark, ["col1", ..., "coln"])