Spatial SQL Application in Python¶
Introduction¶
GeoSpark provides a Python wrapper for its Spatial SQL / DataFrame interface. The official repository for GeoSpark can be found at https://github.com/DataSystemsLab/GeoSpark.
This package exposes all GeoSparkSQL functions in Python and converts query results to Python Shapely geometry objects. It also allows creating a Spark DataFrame with the GeoSpark UDT from Shapely geometry objects. A Spark DataFrame can easily be converted to a GeoPandas GeoDataFrame; in addition, all fiona drivers (e.g. for shapefiles) can be used to load data from files and convert it to a Spark DataFrame. Please look at the examples.
Installation¶
GeoSpark extends pyspark (https://pypi.org/project/pyspark/) with functions that depend on additional Python packages and Scala libraries. To see all dependencies, please look at the Dependencies section.
The package needs 3 jar files to work properly:
- geospark.jar
- geospark-sql.jar
- geo_wrapper.jar
Note
Since GeoSpark 1.3.0 it is also possible to use Maven jars for GeoSparkSQL instead of the geospark/jars/../geospark-sql jar files.
This package automatically copies the newest GeoSpark jar files using the upload_jars() function; please follow the example below.
from pyspark.sql import SparkSession
from geospark.register import upload_jars
from geospark.register import GeoSparkRegistrator
upload_jars()
spark = SparkSession.builder.\
    getOrCreate()
GeoSparkRegistrator.registerAll(spark)
The upload_jars() function uses the findspark Python package to upload the jar files to the Spark executor and nodes. To avoid copying them every time, the jar files can be placed in the SPARK_HOME/jars directory or any other path specified in the Spark config files (see the sketch below).
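For example, a minimal sketch of pointing Spark at pre-copied jar files via the spark.jars option instead of calling upload_jars() on every run (the /opt/spark-jars paths below are assumptions; adjust them to wherever the jars were copied):

from pyspark.sql import SparkSession
from geospark.register import GeoSparkRegistrator

# Assumed location of the three GeoSpark jar files listed above.
jars = ",".join([
    "/opt/spark-jars/geospark.jar",
    "/opt/spark-jars/geospark-sql.jar",
    "/opt/spark-jars/geo_wrapper.jar",
])

spark = SparkSession.builder.\
    config("spark.jars", jars).\
    getOrCreate()

GeoSparkRegistrator.registerAll(spark)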
Installing from PyPi repositories¶
Please use the command below:
pip install geospark
Installing from wheel file¶
pipenv run python -m pip install dist/geospark-1.3.1-py3-none-any.whl
or
pip install dist/geospark-1.3.1-py3-none-any.whl
Installing from source¶
python3 setup.py install
Core Classes and Methods¶
GeoSparkRegistrator.registerAll(spark: pyspark.sql.SparkSession) -> bool
This is the core of the whole package. This class method registers all GeoSparkSQL functions (those available in the GeoSparkSQL version in use). To check the available functions, please look at the GeoSparkSQL section. :param spark: pyspark.sql.SparkSession, Spark session instance
upload_jars() -> NoReturn
Function uses the findspark Python module to upload the newest GeoSpark jars to the Spark executor and nodes.
GeometryType()
Class which handles serialization and deserialization between GeoSpark geometries and Shapely BaseGeometry types.
KryoSerializer.getName -> str
Class property which returns the org.apache.spark.serializer.KryoSerializer string, which simplifies using the GeoSpark serializers.
GeoSparkKryoRegistrator.getName -> str
Class property which returns the org.datasyslab.geospark.serde.GeoSparkKryoRegistrator string, which simplifies using the GeoSpark serializers.
Writing Application¶
Use the KryoSerializer.getName and GeoSparkKryoRegistrator.getName class properties to reduce the memory impact, as recommended in the GeoSpark docs. To do that, use the Spark config as follows:
.config("spark.serializer", KryoSerializer.getName)
.config("spark.kryo.registrator", GeoSparkKryoRegistrator.getName)
If the jars were not uploaded manually, please use the upload_jars() function.
To turn on GeoSparkSQL functions inside pyspark code, use the GeoSparkRegistrator.registerAll method on an existing pyspark.sql.SparkSession instance, e.g.
GeoSparkRegistrator.registerAll(spark)
After that, all GeoSparkSQL functions will be available; moreover, using the collect or toPandas methods on a Spark DataFrame will return Shapely BaseGeometry objects. A Spark DataFrame can be created from a GeoPandas GeoDataFrame, a Pandas DataFrame with shapely objects, or a sequence of shapely objects using the spark.createDataFrame method. To specify a schema with a geometry column, please use a GeometryType() instance (look at the examples section to see that in practice).
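Putting these pieces together, a minimal application skeleton (the geospark.utils import path for KryoSerializer and GeoSparkKryoRegistrator is an assumption here; adjust it to your geospark version):

from pyspark.sql import SparkSession
from geospark.register import GeoSparkRegistrator, upload_jars
from geospark.utils import KryoSerializer, GeoSparkKryoRegistrator

# Copy the GeoSpark jars; skip if they are already on the Spark classpath.
upload_jars()

spark = SparkSession.builder.\
    appName("geospark-example").\
    config("spark.serializer", KryoSerializer.getName).\
    config("spark.kryo.registrator", GeoSparkKryoRegistrator.getName).\
    getOrCreate()

# Register the GeoSparkSQL functions on the session.
GeoSparkRegistrator.registerAll(spark)

df = spark.sql("SELECT ST_GeomFromWKT('POINT (21.0 52.0)') AS geom")
df.show(1, False)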
Examples¶
GeoSparkSQL¶
All GeoSparkSQL functions (the list depends on the GeoSparkSQL version) are available in the Python API. For documentation, please look at the GeoSpark website.
For example, use GeoSparkSQL for a spatial join:
counties = spark.\
    read.\
    option("delimiter", "|").\
    option("header", "true").\
    csv("counties.csv")
counties.createOrReplaceTempView("county")
counties_geom = spark.sql(
    "SELECT county_code, st_geomFromWKT(geom) as geometry from county"
)
counties_geom.show(5)
+-----------+--------------------+
|county_code| geometry|
+-----------+--------------------+
| 1815|POLYGON ((21.6942...|
| 1410|POLYGON ((22.7238...|
| 1418|POLYGON ((21.1100...|
| 1425|POLYGON ((20.9891...|
| 1427|POLYGON ((19.5087...|
+-----------+--------------------+
import geopandas as gpd
points = gpd.read_file("gis_osm_pois_free_1.shp")
points_geom = spark.createDataFrame(
    points[["fclass", "geometry"]]
)
points_geom.show(5, False)
+---------+-----------------------------+
|fclass |geometry |
+---------+-----------------------------+
|camp_site|POINT (15.3393145 52.3504247)|
|chalet |POINT (14.8709625 52.691693) |
|motel |POINT (15.0946636 52.3130396)|
|atm |POINT (15.0732014 52.3141083)|
|hotel |POINT (15.0696777 52.3143013)|
+---------+-----------------------------+
points_geom.createOrReplaceTempView("pois")
counties_geom.createOrReplaceTempView("counties")
spatial_join_result = spark.sql(
    """
    SELECT c.county_code, p.fclass
    FROM pois AS p, counties AS c
    WHERE ST_Intersects(p.geometry, c.geometry)
    """
)
spatial_join_result.explain()
== Physical Plan ==
*(2) Project [county_code#230, fclass#239]
+- RangeJoin geometry#240: geometry, geometry#236: geometry, true
:- Scan ExistingRDD[fclass#239,geometry#240]
+- Project [county_code#230, st_geomfromwkt(geom#232) AS geometry#236]
+- *(1) FileScan csv [county_code#230,geom#232] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/projects/geospark/counties.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<county_code:string,geom:string>
pois_per_county = spatial_join_result.groupBy("county_code", "fclass").\
    count()
pois_per_county.show(5, False)
+-----------+---------+-----+
|county_code|fclass |count|
+-----------+---------+-----+
|0805 |atm |6 |
|0805 |bench |75 |
|0803 |museum |9 |
|0802 |fast_food|5 |
|0862 |atm |20 |
+-----------+---------+-----+
Integration with GeoPandas and Shapely¶
geospark implements serializers and deserializers which allow converting GeoSpark Geometry objects into Shapely BaseGeometry objects. Based on that, it is possible to load data from a file with geopandas (look at the possible fiona drivers) and create a Spark DataFrame from the resulting GeoDataFrame object.
For example, loading data from a shapefile using the geopandas read_file method and creating a Spark DataFrame from the GeoDataFrame:
import geopandas as gpd
from pyspark.sql import SparkSession
from geospark.register import GeoSparkRegistrator
spark = SparkSession.builder.\
    getOrCreate()
GeoSparkRegistrator.registerAll(spark)
gdf = gpd.read_file("gis_osm_pois_free_1.shp")
spark.createDataFrame(
    gdf
).show()
+---------+----+-----------+--------------------+--------------------+
| osm_id|code| fclass| name| geometry|
+---------+----+-----------+--------------------+--------------------+
| 26860257|2422| camp_site| de Kroon|POINT (15.3393145...|
| 26860294|2406| chalet| Leśne Ustronie|POINT (14.8709625...|
| 29947493|2402| motel| null|POINT (15.0946636...|
| 29947498|2602| atm| null|POINT (15.0732014...|
| 29947499|2401| hotel| null|POINT (15.0696777...|
| 29947505|2401| hotel| null|POINT (15.0155749...|
+---------+----+-----------+--------------------+--------------------+
Reading data with Spark and converting it to GeoPandas:
import geopandas as gpd
from pyspark.sql import SparkSession
from geospark.register import GeoSparkRegistrator
spark = SparkSession.builder.\
    getOrCreate()
GeoSparkRegistrator.registerAll(spark)
counties = spark.\
    read.\
    option("delimiter", "|").\
    option("header", "true").\
    csv("counties.csv")
counties.createOrReplaceTempView("county")
counties_geom = spark.sql(
    "SELECT *, st_geomFromWKT(geom) as geometry from county"
)
df = counties_geom.toPandas()
gdf = gpd.GeoDataFrame(df, geometry="geometry")
gdf.plot(
    figsize=(10, 8),
    column="value",
    legend=True,
    cmap='YlOrBr',
    scheme='quantiles',
    edgecolor='lightgray'
)
Creating Spark DataFrame based on shapely objects¶
Supported Shapely objects¶
| shapely object  | Available |
|-----------------|-----------|
| Point           | ✔         |
| MultiPoint      | ✔         |
| LineString      | ✔         |
| MultiLineString | ✔         |
| Polygon         | ✔         |
| MultiPolygon    | ✔         |
To create a Spark DataFrame from the Geometry types listed above, please use GeometryType from the geospark.sql.types module. The conversion works for a list or tuple of shapely objects.
A schema for a target table with an integer id and a geometry column can be defined as follows:
from pyspark.sql.types import IntegerType, StructField, StructType
from geospark.sql.types import GeometryType
schema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("geom", GeometryType(), False)
    ]
)
A Spark DataFrame with a geometry column can also be converted back to a list of shapely objects with the collect method, as shown below.
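A minimal sketch (assuming gdf is a Spark DataFrame with a geometry column named geom, such as those created in the examples below):

# Collect the DataFrame; each Row holds a Shapely object in its geometry field.
rows = gdf.collect()
geometries = [row.geom for row in rows]
print(geometries[0].wkt)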
Example usage for Shapely objects¶
Point¶
from shapely.geometry import Point
data = [
    [1, Point(21.0, 52.0)],
    [1, Point(23.0, 42.0)],
    [1, Point(26.0, 32.0)]
]
gdf = spark.createDataFrame(
    data,
    schema
)
gdf.show()
+---+-------------+
| id| geom|
+---+-------------+
| 1|POINT (21 52)|
| 1|POINT (23 42)|
| 1|POINT (26 32)|
+---+-------------+
gdf.printSchema()
root
|-- id: integer (nullable = false)
|-- geom: geometry (nullable = false)
MultiPoint¶
from shapely.geometry import MultiPoint
data = [
    [1, MultiPoint([[19.511463, 51.765158], [19.446408, 51.779752]])]
]
gdf = spark.createDataFrame(
    data,
    schema
)
gdf.show(1, False)
+---+---------------------------------------------------------+
|id |geom |
+---+---------------------------------------------------------+
|1 |MULTIPOINT ((19.511463 51.765158), (19.446408 51.779752))|
+---+---------------------------------------------------------+
LineString¶
from shapely.geometry import LineString
line = [(10, 10), (20, 20), (10, 40)]
data = [
    [1, LineString(line)]
]
gdf = spark.createDataFrame(
    data,
    schema
)
gdf.show(1, False)
+---+--------------------------------+
|id |geom |
+---+--------------------------------+
|1 |LINESTRING (10 10, 20 20, 10 40)|
+---+--------------------------------+
MultiLineString¶
from shapely.geometry import MultiLineString
line1 = [(10, 10), (20, 20), (10, 40)]
line2 = [(40, 40), (30, 30), (40, 20), (30, 10)]
data = [
    [1, MultiLineString([line1, line2])]
]
gdf = spark.createDataFrame(
    data,
    schema
)
gdf.show(1, False)
+---+---------------------------------------------------------------------+
|id |geom |
+---+---------------------------------------------------------------------+
|1 |MULTILINESTRING ((10 10, 20 20, 10 40), (40 40, 30 30, 40 20, 30 10))|
+---+---------------------------------------------------------------------+
Polygon¶
from shapely.geometry import Polygon
polygon = Polygon(
    [
        [19.51121, 51.76426],
        [19.51056, 51.76583],
        [19.51216, 51.76599],
        [19.51280, 51.76448],
        [19.51121, 51.76426]
    ]
)
data = [
    [1, polygon]
]
gdf = spark.createDataFrame(
    data,
    schema
)
gdf.show(1, False)
+---+--------------------------------------------------------------------------------------------------------+
|id |geom |
+---+--------------------------------------------------------------------------------------------------------+
|1 |POLYGON ((19.51121 51.76426, 19.51056 51.76583, 19.51216 51.76599, 19.5128 51.76448, 19.51121 51.76426))|
+---+--------------------------------------------------------------------------------------------------------+
MultiPolygon¶
from shapely.geometry import MultiPolygon, Polygon
exterior_p1 = [(0, 0), (0, 2), (2, 2), (2, 0), (0, 0)]
interior_p1 = [(1, 1), (1, 1.5), (1.5, 1.5), (1.5, 1), (1, 1)]
exterior_p2 = [(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)]
polygons = [
    Polygon(exterior_p1, [interior_p1]),
    Polygon(exterior_p2)
]
data = [
    [1, MultiPolygon(polygons)]
]
gdf = spark.createDataFrame(
    data,
    schema
)
gdf.show(1, False)
+---+----------------------------------------------------------------------------------------------------------+
|id |geom |
+---+----------------------------------------------------------------------------------------------------------+
|1 |MULTIPOLYGON (((0 0, 0 2, 2 2, 2 0, 0 0), (1 1, 1.5 1, 1.5 1.5, 1 1.5, 1 1)), ((0 0, 0 1, 1 1, 1 0, 0 0)))|
+---+----------------------------------------------------------------------------------------------------------+
Supported versions¶
Currently this Python wrapper supports the following Spark, GeoSpark and Python versions: