
Spatial SQL Application in Python

Introduction

GeoSpark provides a Python wrapper for its Spatial SQL / DataFrame interface. The official repository for GeoSpark can be found at https://github.com/DataSystemsLab/GeoSpark.

This package exposes all GeoSparkSQL functions in Python and converts query results to Shapely geometry objects. It also allows creating a Spark DataFrame with the GeoSpark UDT from Shapely geometry objects. A Spark DataFrame can easily be converted to a GeoPandas GeoDataFrame; in addition, all fiona drivers for shapefiles are available to load data from files and convert them to a Spark DataFrame. Please look at the examples.

Installation

GeoSpark extends pyspark functions and depends on Python packages and Scala libraries. To see all dependencies, please look at the Dependencies section and https://pypi.org/project/pyspark/.

The package needs three jar files to work properly:

  • geospark.jar
  • geospark-sql.jar
  • geo_wrapper.jar

Note

Since GeoSpark 1.3.0 it is also possible to use Maven jars for GeoSparkSQL instead of the geospark-sql jar files bundled in geospark/jars/.

This package automatically copies the newest GeoSpark jar files using the upload_jars function; please follow the example below.

    from pyspark.sql import SparkSession
    
    from geospark.register import upload_jars
    from geospark.register import GeoSparkRegistrator
    
    upload_jars()
    
    spark = SparkSession.builder.\
          getOrCreate()
    
    GeoSparkRegistrator.registerAll(spark)
    

The upload_jars() function uses the findspark Python package to upload the jar files to the Spark executor and nodes. To avoid copying them every time, the jar files can be put in the directory SPARK_HOME/jars or in any other path specified in the Spark config files.
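
Alternatively, the jar locations can be passed through the Spark configuration when the session is built. A minimal sketch, assuming the jars have been copied to a local directory (the /opt/jars paths below are illustrative placeholders, not files shipped by the package):

    from pyspark.sql import SparkSession

    from geospark.register import GeoSparkRegistrator

    # Point spark.jars at local copies of the GeoSpark jar files
    # (paths are placeholders; adjust them to your environment).
    spark = SparkSession.builder.\
        config(
            "spark.jars",
            "/opt/jars/geospark.jar,"
            "/opt/jars/geospark-sql.jar,"
            "/opt/jars/geo_wrapper.jar"
        ).\
        getOrCreate()

    GeoSparkRegistrator.registerAll(spark)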

Installing from PyPI repositories

Please use the command below:

    pip install geospark
    

    Installing from wheel file

    pipenv run python -m pip install dist/geospark-1.3.1-py3-none-any.whl
    

    or

    pip install dist/geospark-1.3.1-py3-none-any.whl
    

    Installing from source

    python3 setup.py install
    

Core classes and methods

    GeoSparkRegistrator.registerAll(spark: pyspark.sql.SparkSession) -> bool

This is the core of the whole package. This class method registers all GeoSparkSQL functions (those available for the GeoSparkSQL version in use). To check the available functions, please look at the GeoSparkSQL section.

:param spark: pyspark.sql.SparkSession, the Spark session instance

    upload_jars() -> NoReturn

This function uses the findspark Python module to upload the newest GeoSpark jars to the Spark executor and nodes.

    GeometryType()

Class which handles serialization and deserialization between GeoSpark geometries and Shapely BaseGeometry types.

KryoSerializer.getName -> str

Class property which returns the org.apache.spark.serializer.KryoSerializer string, which simplifies using the GeoSpark serializers.

GeoSparkKryoRegistrator.getName -> str

Class property which returns the org.datasyslab.geospark.serde.GeoSparkKryoRegistrator string, which simplifies using the GeoSpark serializers.

Writing an application

Use the KryoSerializer.getName and GeoSparkKryoRegistrator.getName class properties to reduce the memory impact, as described in the GeoSpark docs. To do that, use the Spark config as follows:

    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", GeoSparkKryoRegistrator.getName)
    

If the jars were not uploaded manually, please use the upload_jars() function.

To turn on GeoSparkSQL functions inside pyspark code, use the GeoSparkRegistrator.registerAll method on an existing pyspark.sql.SparkSession instance, e.g.:

    GeoSparkRegistrator.registerAll(spark)

After that, all the functions from GeoSparkSQL will be available; moreover, using the collect or toPandas methods on a Spark DataFrame will return Shapely BaseGeometry objects. A Spark DataFrame can be created from a GeoPandas GeoDataFrame, a Pandas DataFrame with shapely objects, or a sequence of shapely objects using the spark.createDataFrame method. To specify a schema with geometry inside, please use a GeometryType() instance (look at the examples section to see that in practice).
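
Putting the pieces from this section together, a minimal application skeleton might look like the sketch below. The appName is arbitrary, and the geospark.utils import path for the serializer helpers is an assumption; adjust it to match your installed geospark version.

    from pyspark.sql import SparkSession

    from geospark.register import upload_jars, GeoSparkRegistrator
    from geospark.utils import KryoSerializer, GeoSparkKryoRegistrator  # assumed module path

    # Copy the GeoSpark jars to the Spark nodes (skip this step if the
    # jars already sit in SPARK_HOME/jars or in the Spark config).
    upload_jars()

    spark = SparkSession.builder.\
        appName("geospark-example").\
        config("spark.serializer", KryoSerializer.getName).\
        config("spark.kryo.registrator", GeoSparkKryoRegistrator.getName).\
        getOrCreate()

    # Register all GeoSparkSQL functions on the session.
    GeoSparkRegistrator.registerAll(spark)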

    Examples

    GeoSparkSQL

All GeoSparkSQL functions (the list depends on the GeoSparkSQL version) are available in the Python API. For documentation, please look at the GeoSpark website.

For example, use GeoSparkSQL for a spatial join.

    counties = spark.\
        read.\
        option("delimiter", "|").\
        option("header", "true").\
        csv("counties.csv")
    
    counties.createOrReplaceTempView("county")
    
    counties_geom = spark.sql(
          "SELECT county_code, st_geomFromWKT(geom) as geometry from county"
    )
    
    counties_geom.show(5)
    
    +-----------+--------------------+
    |county_code|            geometry|
    +-----------+--------------------+
    |       1815|POLYGON ((21.6942...|
    |       1410|POLYGON ((22.7238...|
    |       1418|POLYGON ((21.1100...|
    |       1425|POLYGON ((20.9891...|
    |       1427|POLYGON ((19.5087...|
    +-----------+--------------------+
    
    import geopandas as gpd
    
    points = gpd.read_file("gis_osm_pois_free_1.shp")
    
    points_geom = spark.createDataFrame(
        points[["fclass", "geometry"]]
    )
    
    points_geom.show(5, False)
    
    +---------+-----------------------------+
    |fclass   |geometry                     |
    +---------+-----------------------------+
    |camp_site|POINT (15.3393145 52.3504247)|
    |chalet   |POINT (14.8709625 52.691693) |
    |motel    |POINT (15.0946636 52.3130396)|
    |atm      |POINT (15.0732014 52.3141083)|
    |hotel    |POINT (15.0696777 52.3143013)|
    +---------+-----------------------------+
    

    points_geom.createOrReplaceTempView("pois")
    counties_geom.createOrReplaceTempView("counties")
    
    spatial_join_result = spark.sql(
        """
            SELECT c.county_code, p.fclass
            FROM pois AS p, counties AS c
            WHERE ST_Intersects(p.geometry, c.geometry)
        """
    )
    
    spatial_join_result.explain()
    
    == Physical Plan ==
    *(2) Project [county_code#230, fclass#239]
    +- RangeJoin geometry#240: geometry, geometry#236: geometry, true
       :- Scan ExistingRDD[fclass#239,geometry#240]
       +- Project [county_code#230, st_geomfromwkt(geom#232) AS geometry#236]
          +- *(1) FileScan csv [county_code#230,geom#232] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/projects/geospark/counties.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<county_code:string,geom:string>
    
Calculating the number of POIs within counties, per fclass:

    pois_per_county = spatial_join_result.groupBy("county_code", "fclass"). \
        count()
    
    pois_per_county.show(5, False)
    
    +-----------+---------+-----+
    |county_code|fclass   |count|
    +-----------+---------+-----+
    |0805       |atm      |6    |
    |0805       |bench    |75   |
    |0803       |museum   |9    |
    |0802       |fast_food|5    |
    |0862       |atm      |20   |
    +-----------+---------+-----+
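
The same aggregation can also be expressed directly in SQL on the registered temporary views; a small sketch equivalent to the groupBy above:

    pois_per_county_sql = spark.sql(
        """
            SELECT c.county_code, p.fclass, count(*) AS count
            FROM pois AS p, counties AS c
            WHERE ST_Intersects(p.geometry, c.geometry)
            GROUP BY c.county_code, p.fclass
        """
    )

    pois_per_county_sql.show(5, False)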
    

    Integration with GeoPandas and Shapely

geospark has implemented serializers and deserializers which allow converting GeoSpark geometry objects into Shapely BaseGeometry objects. Based on that, it is possible to load data with geopandas from a file (look at the possible Fiona drivers) and create a Spark DataFrame from the GeoDataFrame object.

Example: loading the data from a shapefile using the geopandas read_file method and creating a Spark DataFrame from the GeoDataFrame:

    import geopandas as gpd
    from pyspark.sql import SparkSession
    
    from geospark.register import GeoSparkRegistrator
    
    spark = SparkSession.builder.\
          getOrCreate()
    
    GeoSparkRegistrator.registerAll(spark)
    
    gdf = gpd.read_file("gis_osm_pois_free_1.shp")
    
    spark.createDataFrame(
      gdf
    ).show()
    
    +---------+----+-----------+--------------------+--------------------+
    |   osm_id|code|     fclass|                name|            geometry|
    +---------+----+-----------+--------------------+--------------------+
    | 26860257|2422|  camp_site|            de Kroon|POINT (15.3393145...|
    | 26860294|2406|     chalet|      Leśne Ustronie|POINT (14.8709625...|
    | 29947493|2402|      motel|                null|POINT (15.0946636...|
    | 29947498|2602|        atm|                null|POINT (15.0732014...|
    | 29947499|2401|      hotel|                null|POINT (15.0696777...|
    | 29947505|2401|      hotel|                null|POINT (15.0155749...|
    +---------+----+-----------+--------------------+--------------------+
    

    Reading data with Spark and converting to GeoPandas

    import geopandas as gpd
    from pyspark.sql import SparkSession
    
    from geospark.register import GeoSparkRegistrator
    
    spark = SparkSession.builder.\
        getOrCreate()
    
    GeoSparkRegistrator.registerAll(spark)
    
    counties = spark.\
        read.\
        option("delimiter", "|").\
        option("header", "true").\
        csv("counties.csv")
    
    counties.createOrReplaceTempView("county")
    
    counties_geom = spark.sql(
        "SELECT *, st_geomFromWKT(geom) as geometry from county"
    )
    
    df = counties_geom.toPandas()
    gdf = gpd.GeoDataFrame(df, geometry="geometry")
    
    gdf.plot(
        figsize=(10, 8),
        column="value",
        legend=True,
        cmap='YlOrBr',
        scheme='quantiles',
        edgecolor='lightgray'
    )
    


(figure: poland_image — plot of the counties GeoDataFrame)



    Creating Spark DataFrame based on shapely objects

    Supported Shapely objects

shapely object     Available
Point              ✔️
MultiPoint         ✔️
LineString         ✔️
MultiLineString    ✔️
Polygon            ✔️
MultiPolygon       ✔️

To create a Spark DataFrame from the geometry types mentioned above, please use GeometryType from the geospark.sql.types module. Converting works for a list or tuple of shapely objects.

A schema for a target table with an integer id and a geometry column can be defined as follows:

    from pyspark.sql.types import IntegerType, StructField, StructType
    
    from geospark.sql.types import GeometryType
    
    schema = StructType(
        [
            StructField("id", IntegerType(), False),
            StructField("geom", GeometryType(), False)
        ]
    )
    

A Spark DataFrame with a geometry column can also be converted to a list of shapely objects with the collect method.
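
For example, reusing the counties_geom DataFrame created in the GeoSparkSQL example above, collect returns Row objects whose geometry field is already a Shapely geometry (a small sketch):

    # Each Row carries a Shapely geometry in its "geometry" field.
    rows = counties_geom.collect()

    shapely_geoms = [row.geometry for row in rows]

    print(type(shapely_geoms[0]))  # e.g. <class 'shapely.geometry.polygon.Polygon'>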

    Example usage for Shapely objects

    Point

    from shapely.geometry import Point
    
    data = [
        [1, Point(21.0, 52.0)],
        [1, Point(23.0, 42.0)],
        [1, Point(26.0, 32.0)]
    ]
    
    
    gdf = spark.createDataFrame(
        data,
        schema
    )
    
    gdf.show()
    
    +---+-------------+
    | id|         geom|
    +---+-------------+
    |  1|POINT (21 52)|
    |  1|POINT (23 42)|
    |  1|POINT (26 32)|
    +---+-------------+
    
    gdf.printSchema()
    
    root
     |-- id: integer (nullable = false)
     |-- geom: geometry (nullable = false)
    

    MultiPoint

    from shapely.geometry import MultiPoint
    
    data = [
        [1, MultiPoint([[19.511463, 51.765158], [19.446408, 51.779752]])]
    ]
    
    gdf = spark.createDataFrame(
        data,
        schema
    ).show(1, False)
    
    +---+---------------------------------------------------------+
    |id |geom                                                     |
    +---+---------------------------------------------------------+
    |1  |MULTIPOINT ((19.511463 51.765158), (19.446408 51.779752))|
    +---+---------------------------------------------------------+
    

    LineString

    from shapely.geometry import LineString
    
line = [(10, 10), (20, 20), (10, 40)]
    
    data = [
        [1, LineString(line)]
    ]
    
    gdf = spark.createDataFrame(
        data,
        schema
    )
    
    gdf.show(1, False)
    
    +---+--------------------------------+
    |id |geom                            |
    +---+--------------------------------+
    |1  |LINESTRING (10 10, 20 20, 10 40)|
    +---+--------------------------------+
    

    MultiLineString

    from shapely.geometry import MultiLineString
    
    line1 = [(10, 10), (20, 20), (10, 40)]
    line2 = [(40, 40), (30, 30), (40, 20), (30, 10)]
    
    data = [
        [1, MultiLineString([line1, line2])]
    ]
    
    gdf = spark.createDataFrame(
        data,
        schema
    )
    
    gdf.show(1, False)
    
    +---+---------------------------------------------------------------------+
    |id |geom                                                                 |
    +---+---------------------------------------------------------------------+
    |1  |MULTILINESTRING ((10 10, 20 20, 10 40), (40 40, 30 30, 40 20, 30 10))|
    +---+---------------------------------------------------------------------+
    

    Polygon

    from shapely.geometry import Polygon
    
    polygon = Polygon(
        [
             [19.51121, 51.76426],
             [19.51056, 51.76583],
             [19.51216, 51.76599],
             [19.51280, 51.76448],
             [19.51121, 51.76426]
        ]
    )
    
    data = [
        [1, polygon]
    ]
    
    gdf = spark.createDataFrame(
        data,
        schema
    )
    
    gdf.show(1, False)
    
    +---+--------------------------------------------------------------------------------------------------------+
    |id |geom                                                                                                    |
    +---+--------------------------------------------------------------------------------------------------------+
    |1  |POLYGON ((19.51121 51.76426, 19.51056 51.76583, 19.51216 51.76599, 19.5128 51.76448, 19.51121 51.76426))|
    +---+--------------------------------------------------------------------------------------------------------+
    

    MultiPolygon

from shapely.geometry import MultiPolygon, Polygon
    
    exterior_p1 = [(0, 0), (0, 2), (2, 2), (2, 0), (0, 0)]
    interior_p1 = [(1, 1), (1, 1.5), (1.5, 1.5), (1.5, 1), (1, 1)]
    
    exterior_p2 = [(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)]
    
    polygons = [
        Polygon(exterior_p1, [interior_p1]),
        Polygon(exterior_p2)
    ]
    
    data = [
        [1, MultiPolygon(polygons)]
    ]
    
    gdf = spark.createDataFrame(
        data,
        schema
    )
    
    gdf.show(1, False)
    
    +---+----------------------------------------------------------------------------------------------------------+
    |id |geom                                                                                                      |
    +---+----------------------------------------------------------------------------------------------------------+
    |1  |MULTIPOLYGON (((0 0, 0 2, 2 2, 2 0, 0 0), (1 1, 1.5 1, 1.5 1.5, 1 1.5, 1 1)), ((0 0, 0 1, 1 1, 1 0, 0 0)))|
    +---+----------------------------------------------------------------------------------------------------------+
    

Supported versions

Currently this Python wrapper supports the following Spark, GeoSpark and Python versions:

Apache Spark

  • 2.2
  • 2.3
  • 2.4

GeoSparkSQL

  • 1.3.1
  • 1.2.0
  • 1.1.3

Python

  • 3.6
  • 3.7
