Scala/Java

The page outlines the steps to manage spatial data using SedonaSQL. The example code is written in Java but also works for Scala.

SedonaSQL supports the SQL/MM Part 3 Spatial SQL standard. It includes four kinds of SQL operators, all of which can be called directly through:

Table myTable = tableEnv.sqlQuery("YOUR_SQL");

Detailed SedonaSQL APIs are available here: SedonaSQL API

Set up dependencies

  1. Read Sedona Maven Central coordinates
  2. Add Sedona dependencies in build.sbt or pom.xml.
  3. Add Flink dependencies in build.sbt or pom.xml.

Initiate Stream Environment

Use the following code to initiate your StreamExecutionEnvironment at the beginning:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

Register SedonaSQL

Add the following lines after your StreamExecutionEnvironment and StreamTableEnvironment declarations:

SedonaFlinkRegistrator.registerType(env);
SedonaFlinkRegistrator.registerFunc(tableEnv);

Warning

Sedona has a suite of well-written geometry and index serializers. Forgetting to enable these serializers will lead to high memory consumption.

The two calls above register Sedona's User Defined Types and User Defined Functions.

Create a Geometry type column

All geometrical operations in SedonaSQL work on Geometry type objects. Therefore, before running any query, you need to create a Geometry type column in a Table.

Assume you have a Flink Table tbl like this:

+----+--------------------------------+--------------------------------+
| op |                   geom_polygon |                   name_polygon |
+----+--------------------------------+--------------------------------+
| +I | POLYGON ((-0.5 -0.5, -0.5 0... |                       polygon0 |
| +I | POLYGON ((0.5 0.5, 0.5 1.5,... |                       polygon1 |
| +I | POLYGON ((1.5 1.5, 1.5 2.5,... |                       polygon2 |
| +I | POLYGON ((2.5 2.5, 2.5 3.5,... |                       polygon3 |
| +I | POLYGON ((3.5 3.5, 3.5 4.5,... |                       polygon4 |
| +I | POLYGON ((4.5 4.5, 4.5 5.5,... |                       polygon5 |
| +I | POLYGON ((5.5 5.5, 5.5 6.5,... |                       polygon6 |
| +I | POLYGON ((6.5 6.5, 6.5 7.5,... |                       polygon7 |
| +I | POLYGON ((7.5 7.5, 7.5 8.5,... |                       polygon8 |
| +I | POLYGON ((8.5 8.5, 8.5 9.5,... |                       polygon9 |
+----+--------------------------------+--------------------------------+
10 rows in set

You can create a Table with a Geometry type column as follows:

tableEnv.createTemporaryView("myTable", tbl);
Table geomTbl = tableEnv.sqlQuery("SELECT ST_GeomFromWKT(geom_polygon) as geom_polygon, name_polygon FROM myTable");
geomTbl.execute().print();

The output will be:

+----+--------------------------------+--------------------------------+
| op |                   geom_polygon |                   name_polygon |
+----+--------------------------------+--------------------------------+
| +I | POLYGON ((-0.5 -0.5, -0.5 0... |                       polygon0 |
| +I | POLYGON ((0.5 0.5, 0.5 1.5,... |                       polygon1 |
| +I | POLYGON ((1.5 1.5, 1.5 2.5,... |                       polygon2 |
| +I | POLYGON ((2.5 2.5, 2.5 3.5,... |                       polygon3 |
| +I | POLYGON ((3.5 3.5, 3.5 4.5,... |                       polygon4 |
| +I | POLYGON ((4.5 4.5, 4.5 5.5,... |                       polygon5 |
| +I | POLYGON ((5.5 5.5, 5.5 6.5,... |                       polygon6 |
| +I | POLYGON ((6.5 6.5, 6.5 7.5,... |                       polygon7 |
| +I | POLYGON ((7.5 7.5, 7.5 8.5,... |                       polygon8 |
| +I | POLYGON ((8.5 8.5, 8.5 9.5,... |                       polygon9 |
+----+--------------------------------+--------------------------------+
10 rows in set

Although the output looks the same as the input, the type of column geom_polygon has changed to Geometry.

To verify this, use the following code to print the schema of the Table:

geomTbl.printSchema();

The output will be like this:

(
  `geom_polygon` RAW('org.locationtech.jts.geom.Geometry', '...'),
  `name_polygon` STRING
)

Note

SedonaSQL provides many functions to create a Geometry column. Please read the SedonaSQL constructor API.

Transform the Coordinate Reference System

Sedona does not track the coordinate unit (degree-based or meter-based) of the geometries in a Geometry column. All distances computed by SedonaSQL are in the same unit as the coordinates of the geometries.

To convert the Coordinate Reference System of a Geometry column, use the following code:

Table geomTbl3857 = tableEnv.sqlQuery("SELECT ST_Transform(ST_GeomFromWKT(geom_polygon), 'epsg:4326', 'epsg:3857') AS geom_polygon, name_polygon FROM myTable");
geomTbl3857.execute().print();

The first EPSG code in ST_Transform, EPSG:4326, is the source CRS of the geometries. It is WGS84, the most common degree-based CRS.

The second EPSG code in ST_Transform, EPSG:3857, is the target CRS of the geometries. It is Web Mercator, the most common meter-based CRS.

This ST_Transform call transforms the CRS of these geometries from EPSG:4326 to EPSG:3857. Detailed CRS information can be found on EPSG.io.

Note

Read the SedonaSQL ST_Transform API to learn more about this function.

For example, a Table that has coordinates in the US changes as follows.

Before the transformation:

+----+--------------------------------+--------------------------------+
| op |                     geom_point |                     name_point |
+----+--------------------------------+--------------------------------+
| +I |                POINT (32 -118) |                          point |
| +I |                POINT (33 -117) |                          point |
| +I |                POINT (34 -116) |                          point |
| +I |                POINT (35 -115) |                          point |
| +I |                POINT (36 -114) |                          point |
| +I |                POINT (37 -113) |                          point |
| +I |                POINT (38 -112) |                          point |
| +I |                POINT (39 -111) |                          point |
| +I |                POINT (40 -110) |                          point |
| +I |                POINT (41 -109) |                          point |
+----+--------------------------------+--------------------------------+

After the transformation:

+----+--------------------------------+--------------------------------+
| op |                            _c0 |                     name_point |
+----+--------------------------------+--------------------------------+
| +I | POINT (-13135699.91360628 3... |                          point |
| +I | POINT (-13024380.422813008 ... |                          point |
| +I | POINT (-12913060.932019735 ... |                          point |
| +I | POINT (-12801741.44122646 4... |                          point |
| +I | POINT (-12690421.950433187 ... |                          point |
| +I | POINT (-12579102.459639912 ... |                          point |
| +I | POINT (-12467782.96884664 4... |                          point |
| +I | POINT (-12356463.478053367 ... |                          point |
| +I | POINT (-12245143.987260092 ... |                          point |
| +I | POINT (-12133824.496466817 ... |                          point |
+----+--------------------------------+--------------------------------+
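
To see where the transformed numbers come from, the sketch below applies the textbook spherical Web Mercator formulas in plain Java. It is only an illustration and is not how Sedona implements ST_Transform. The class name WebMercatorSketch and its methods are hypothetical, and the sketch assumes the sample points are written in latitude/longitude order, which the x values in the output suggest.

```java
// Illustrative sketch only: the textbook spherical Web Mercator forward
// projection (EPSG:4326 -> EPSG:3857). This is NOT Sedona's implementation;
// WebMercatorSketch is a hypothetical helper written for this page.
final class WebMercatorSketch {
    // Radius of the sphere used by EPSG:3857, in meters.
    static final double EARTH_RADIUS_M = 6378137.0;

    // Longitude in degrees -> x in meters.
    static double lonToX(double lonDeg) {
        return EARTH_RADIUS_M * Math.toRadians(lonDeg);
    }

    // Latitude in degrees -> y in meters.
    static double latToY(double latDeg) {
        double latRad = Math.toRadians(latDeg);
        return EARTH_RADIUS_M * Math.log(Math.tan(Math.PI / 4.0 + latRad / 2.0));
    }

    public static void main(String[] args) {
        // First input row above: POINT (32 -118), read here as
        // latitude 32 and longitude -118 (an assumption about axis order).
        System.out.println("x = " + lonToX(-118.0));
        System.out.println("y = " + latToY(32.0));
    }
}
```

Under these assumptions, the printed x value should agree with the first row of the transformed table up to floating-point rounding.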

Run spatial queries

After creating a Geometry type column, you are able to run spatial queries.

Range query

Use ST_Contains, ST_Intersects and so on to run a range query over a single column.

The following example finds all counties that are within the given polygon:

Table geomTable = tableEnv.sqlQuery(
  "SELECT * " +
  "FROM spatialdf " +
  "WHERE ST_Contains(ST_PolygonFromEnvelope(1.0, 100.0, 1000.0, 1100.0), newcountyshape)");
geomTable.execute().print();

Note

Read SedonaSQL Predicate API to learn different spatial query predicates.
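
As a rough mental model of the range query above, the following plain-Java sketch checks whether one axis-aligned rectangle lies inside another. It is not Sedona or JTS code: EnvelopeSketch is a hypothetical helper, and it only covers rectangles with non-zero area, whereas ST_Contains handles arbitrary geometries. The query window uses the same (minX, minY, maxX, maxY) values passed to ST_PolygonFromEnvelope.

```java
// Illustrative sketch only: axis-aligned rectangle containment in plain Java.
// EnvelopeSketch is a hypothetical helper, not part of Sedona or JTS.
final class EnvelopeSketch {
    final double minX;
    final double minY;
    final double maxX;
    final double maxY;

    EnvelopeSketch(double minX, double minY, double maxX, double maxY) {
        this.minX = minX;
        this.minY = minY;
        this.maxX = maxX;
        this.maxY = maxY;
    }

    // True when `other` lies entirely within this rectangle (shared edges allowed).
    boolean contains(EnvelopeSketch other) {
        return minX <= other.minX && other.maxX <= maxX
            && minY <= other.minY && other.maxY <= maxY;
    }

    public static void main(String[] args) {
        // Same window as ST_PolygonFromEnvelope(1.0, 100.0, 1000.0, 1100.0).
        EnvelopeSketch window = new EnvelopeSketch(1.0, 100.0, 1000.0, 1100.0);
        EnvelopeSketch inside = new EnvelopeSketch(10.0, 200.0, 20.0, 210.0);
        EnvelopeSketch straddling = new EnvelopeSketch(990.0, 200.0, 1010.0, 210.0);
        System.out.println(window.contains(inside));     // true
        System.out.println(window.contains(straddling)); // false
    }
}
```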

KNN query

Use ST_Distance to calculate distances and rank the rows by them.

The following code returns the 5 nearest neighbors of the given polygon.

geomTable = tableEnv.sqlQuery(
  "SELECT countyname, " +
  "ST_Distance(ST_PolygonFromEnvelope(1.0, 100.0, 1000.0, 1100.0), newcountyshape) AS distance " +
  "FROM geomTable " +
  // Ascending order puts the smallest distances (the nearest neighbors) first
  "ORDER BY distance ASC " +
  "LIMIT 5");
geomTable.execute().print();
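
The idea behind this query (compute a distance per row, sort by it in ascending order, keep the first k rows) can be sketched in plain Java as follows. KnnSketch is a hypothetical helper that uses points and straight-line distance for simplicity; it does not reflect how Sedona or Flink execute the SQL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch only: brute-force k-nearest-neighbor search over points.
// KnnSketch is a hypothetical helper, not part of Sedona or Flink.
final class KnnSketch {
    // Returns up to k points closest to (qx, qy), nearest first.
    static List<double[]> nearest(List<double[]> points, final double qx, final double qy, int k) {
        List<double[]> sorted = new ArrayList<>(points);
        // Ascending distance: the smallest distances come first.
        sorted.sort(Comparator.comparingDouble((double[] p) -> Math.hypot(p[0] - qx, p[1] - qy)));
        return new ArrayList<>(sorted.subList(0, Math.min(k, sorted.size())));
    }

    public static void main(String[] args) {
        List<double[]> points = Arrays.asList(
            new double[] {0.0, 0.0},
            new double[] {5.0, 5.0},
            new double[] {1.0, 1.0},
            new double[] {10.0, 10.0},
            new double[] {2.0, 2.0});
        for (double[] p : nearest(points, 0.0, 0.0, 2)) {
            System.out.println(Arrays.toString(p));
        }
    }
}
```

Sorting in descending order instead would return the farthest rows, which is why the direction of ORDER BY matters for a KNN query.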

Convert Spatial Table to Spatial DataStream

Get DataStream

Use TableEnv's toDataStream function:

DataStream<Row> geomStream = tableEnv.toDataStream(geomTable);

Retrieve Geometries

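Then get the Geometry from each Row object using a map function:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.locationtech.jts.geom.Geometry;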

DataStream<Geometry> geometries = geomStream.map(new MapFunction<Row, Geometry>() {
            @Override
            public Geometry map(Row value) throws Exception {
                return (Geometry) value.getField(0);
            }
        });
geometries.print();

The output will be

14> POLYGON ((1.5 1.5, 1.5 2.5, 2.5 2.5, 2.5 1.5, 1.5 1.5))
2> POLYGON ((5.5 5.5, 5.5 6.5, 6.5 6.5, 6.5 5.5, 5.5 5.5))
5> POLYGON ((8.5 8.5, 8.5 9.5, 9.5 9.5, 9.5 8.5, 8.5 8.5))
16> POLYGON ((3.5 3.5, 3.5 4.5, 4.5 4.5, 4.5 3.5, 3.5 3.5))
12> POLYGON ((-0.5 -0.5, -0.5 0.5, 0.5 0.5, 0.5 -0.5, -0.5 -0.5))
13> POLYGON ((0.5 0.5, 0.5 1.5, 1.5 1.5, 1.5 0.5, 0.5 0.5))
15> POLYGON ((2.5 2.5, 2.5 3.5, 3.5 3.5, 3.5 2.5, 2.5 2.5))
3> POLYGON ((6.5 6.5, 6.5 7.5, 7.5 7.5, 7.5 6.5, 6.5 6.5))
1> POLYGON ((4.5 4.5, 4.5 5.5, 5.5 5.5, 5.5 4.5, 4.5 4.5))
4> POLYGON ((7.5 7.5, 7.5 8.5, 8.5 8.5, 8.5 7.5, 7.5 7.5))

Store non-spatial attributes in Geometries

You can concatenate other non-spatial attributes and store them in the Geometry's userData field so you can recover them later on. The userData field can hold an object of any type.

import org.locationtech.jts.geom.Geometry;

DataStream<Geometry> geometries = geomStream.map(new MapFunction<Row, Geometry>() {
            @Override
            public Geometry map(Row value) throws Exception {
                Geometry geom = (Geometry) value.getField(0);
                geom.setUserData(value.getField(1));
                return geom;
            }
        });
geometries.print();

The print command will not print out userData field. But you can get it this way:

import org.locationtech.jts.geom.Geometry;

geometries.map(new MapFunction<Geometry, String>() {
            @Override
            public String map(Geometry value) throws Exception
            {
                return (String) value.getUserData();
            }
        }).print();

The output will be

13> polygon9
6> polygon2
10> polygon6
11> polygon7
5> polygon1
12> polygon8
8> polygon4
4> polygon0
7> polygon3
9> polygon5

Convert Spatial DataStream to Spatial Table

Create Geometries using Sedona FormatUtils

  • Create a Geometry from a WKT string
import org.apache.sedona.core.enums.FileDataSplitter;
import org.apache.sedona.core.formatMapper.FormatUtils;
import org.locationtech.jts.geom.Geometry;

DataStream<Geometry> geometries = text.map(new MapFunction<String, Geometry>() {
            @Override
            public Geometry map(String value) throws Exception
            {
                // Parse each input line as a WKT string
                FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false);
                return formatUtils.readGeometry(value);
            }
        });
  • Create a Point from a String 1.1, 2.2. Use , as the delimiter.
import org.apache.sedona.core.enums.GeometryType;
import org.apache.sedona.core.formatMapper.FormatUtils;
import org.locationtech.jts.geom.Geometry;

DataStream<Geometry> geometries = text.map(new MapFunction<String, Geometry>() {
            @Override
            public Geometry map(String value) throws Exception
            {
                // Split each input line on "," and read the values as a Point
                FormatUtils<Geometry> formatUtils = new FormatUtils(",", false, GeometryType.POINT);
                return formatUtils.readGeometry(value);
            }
        });
  • Create a Polygon from a String 1.1, 1.1, 10.1, 10.1. This is a rectangle with (1.1, 1.1) and (10.1, 10.1) as its min/max corners.
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Geometry;

DataStream<Geometry> geometries = text.map(new MapFunction<String, Geometry>() {
            @Override
            public Geometry map(String value) throws Exception
            {
                // Write some code to get four double type values: minX, minY, maxX, maxY
                ...
                // List the corners as a closed ring: the last coordinate repeats the first
                Coordinate[] coordinates = new Coordinate[5];
                coordinates[0] = new Coordinate(minX, minY);
                coordinates[1] = new Coordinate(minX, maxY);
                coordinates[2] = new Coordinate(maxX, maxY);
                coordinates[3] = new Coordinate(maxX, minY);
                coordinates[4] = coordinates[0];
                GeometryFactory geometryFactory = new GeometryFactory();
                return geometryFactory.createPolygon(coordinates);
            }
        });
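
The parsing step that the rectangle example leaves out could look like the following plain-Java sketch. It is one possible approach, not part of Sedona: EnvelopeParseSketch and its methods are hypothetical, and the input is assumed to be exactly four comma-separated numbers in minX, minY, maxX, maxY order. To stay free of dependencies it builds the ring as a double array and prints it as WKT instead of creating a JTS Polygon.

```java
// Illustrative sketch only: parse "minX, minY, maxX, maxY" and list the
// rectangle corners as a closed ring. EnvelopeParseSketch is a hypothetical
// helper written for this page, not part of Sedona or JTS.
final class EnvelopeParseSketch {
    // Parses four comma-separated numbers into {minX, minY, maxX, maxY}.
    static double[] parseEnvelope(String value) {
        String[] parts = value.split(",");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected 4 values but got " + parts.length);
        }
        double[] envelope = new double[4];
        for (int i = 0; i < parts.length; i++) {
            envelope[i] = Double.parseDouble(parts[i].trim());
        }
        return envelope;
    }

    // Lists the corners in the same order as the example above,
    // repeating the first corner at the end to close the ring.
    static double[][] toRing(double[] e) {
        return new double[][] {
            {e[0], e[1]}, {e[0], e[3]}, {e[2], e[3]}, {e[2], e[1]}, {e[0], e[1]}
        };
    }

    // Formats the ring as a WKT polygon string.
    static String toWkt(double[][] ring) {
        StringBuilder sb = new StringBuilder("POLYGON ((");
        for (int i = 0; i < ring.length; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(ring[i][0]).append(' ').append(ring[i][1]);
        }
        return sb.append("))").toString();
    }

    public static void main(String[] args) {
        double[] envelope = parseEnvelope("1.1, 1.1, 10.1, 10.1");
        // prints POLYGON ((1.1 1.1, 1.1 10.1, 10.1 10.1, 10.1 1.1, 1.1 1.1))
        System.out.println(toWkt(toRing(envelope)));
    }
}
```

In a Flink job, the four parsed values would feed the Coordinate array shown in the example above.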

Create Row objects

Wrap each geometry in a Flink Row to build a geomStream. Note that you can put other attributes in the Row as well. This example uses a constant value myName for all geometries.

import org.apache.sedona.core.enums.FileDataSplitter;
import org.apache.sedona.core.formatMapper.FormatUtils;
import org.locationtech.jts.geom.Geometry;
import org.apache.flink.types.Row;

DataStream<Row> geomStream = text.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception
            {
                // First field: the parsed Geometry; second field: a constant name
                FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false);
                return Row.of(formatUtils.readGeometry(value), "myName");
            }
        });

Get Spatial Table

Use TableEnv's fromDataStream function with two column names, geom and geom_name:

import static org.apache.flink.table.api.Expressions.$;

Table geomTable = tableEnv.fromDataStream(geomStream, $("geom"), $("geom_name"));


Last update: March 6, 2022 09:51:33