Spatial SQL app (Flink)
This page outlines the steps to manage spatial data using SedonaSQL. The example code is written in Java but also works for Scala.
SedonaSQL supports the SQL/MM Part 3 Spatial SQL Standard. It includes four kinds of SQL operators, all of which can be called directly through:
Table myTable = tableEnv.sqlQuery("YOUR_SQL");
Detailed SedonaSQL APIs are available here: SedonaSQL API
Set up dependencies¶
- Read Sedona Maven Central coordinates
- Add Sedona dependencies in build.sbt or pom.xml.
- Add Flink dependencies in build.sbt or pom.xml.
- Please see SQL example project
Initiate Stream Environment¶
Use the following code to initiate your StreamExecutionEnvironment at the beginning:
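import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);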
Initiate SedonaContext¶
Add the following line after your StreamExecutionEnvironment and StreamTableEnvironment declarations:
Sedona >= 1.4.1
StreamTableEnvironment sedona = SedonaContext.create(env, tableEnv);
Sedona < 1.4.1
The following methods have been deprecated since Sedona 1.4.1. Please use the method above to create your SedonaContext.
SedonaFlinkRegistrator.registerType(env);
SedonaFlinkRegistrator.registerFunc(tableEnv);
Warning
Sedona has a suite of well-written geometry and index serializers. Forgetting to enable these serializers will lead to high memory consumption.
These methods register Sedona's user-defined types and user-defined functions.
Create a Geometry type column¶
All geometrical operations in SedonaSQL are on Geometry type objects. Therefore, before running any queries, you need to create a Geometry type column on a Table.
Assume you have a Flink Table tbl like this:
+----+--------------------------------+--------------------------------+
| op | geom_polygon | name_polygon |
+----+--------------------------------+--------------------------------+
| +I | POLYGON ((-0.5 -0.5, -0.5 0... | polygon0 |
| +I | POLYGON ((0.5 0.5, 0.5 1.5,... | polygon1 |
| +I | POLYGON ((1.5 1.5, 1.5 2.5,... | polygon2 |
| +I | POLYGON ((2.5 2.5, 2.5 3.5,... | polygon3 |
| +I | POLYGON ((3.5 3.5, 3.5 4.5,... | polygon4 |
| +I | POLYGON ((4.5 4.5, 4.5 5.5,... | polygon5 |
| +I | POLYGON ((5.5 5.5, 5.5 6.5,... | polygon6 |
| +I | POLYGON ((6.5 6.5, 6.5 7.5,... | polygon7 |
| +I | POLYGON ((7.5 7.5, 7.5 8.5,... | polygon8 |
| +I | POLYGON ((8.5 8.5, 8.5 9.5,... | polygon9 |
+----+--------------------------------+--------------------------------+
10 rows in set
You can create a Table with a Geometry type column as follows:
sedona.createTemporaryView("myTable", tbl);
Table geomTbl = sedona.sqlQuery("SELECT ST_GeomFromWKT(geom_polygon) AS geom_polygon, name_polygon FROM myTable");
geomTbl.execute().print();
The output will be:
+----+--------------------------------+--------------------------------+
| op | geom_polygon | name_polygon |
+----+--------------------------------+--------------------------------+
| +I | POLYGON ((-0.5 -0.5, -0.5 0... | polygon0 |
| +I | POLYGON ((0.5 0.5, 0.5 1.5,... | polygon1 |
| +I | POLYGON ((1.5 1.5, 1.5 2.5,... | polygon2 |
| +I | POLYGON ((2.5 2.5, 2.5 3.5,... | polygon3 |
| +I | POLYGON ((3.5 3.5, 3.5 4.5,... | polygon4 |
| +I | POLYGON ((4.5 4.5, 4.5 5.5,... | polygon5 |
| +I | POLYGON ((5.5 5.5, 5.5 6.5,... | polygon6 |
| +I | POLYGON ((6.5 6.5, 6.5 7.5,... | polygon7 |
| +I | POLYGON ((7.5 7.5, 7.5 8.5,... | polygon8 |
| +I | POLYGON ((8.5 8.5, 8.5 9.5,... | polygon9 |
+----+--------------------------------+--------------------------------+
10 rows in set
Although the output looks the same as the input, the type of column geom_polygon has been changed to Geometry.
To verify this, use the following code to print the schema of the Table:
geomTbl.printSchema();
The output will look like this:
(
`geom_polygon` RAW('org.locationtech.jts.geom.Geometry', '...'),
`name_polygon` STRING
)
Note
SedonaSQL provides many functions to create a Geometry column; please read the SedonaSQL constructor API.
Transform the Coordinate Reference System¶
Sedona doesn't control the coordinate unit (degree-based or meter-based) of the geometries in a Geometry column. The unit of all related distances in SedonaSQL is the same as the unit of the geometries in the Geometry column.
To convert the Coordinate Reference System (CRS) of the Geometry column created before, use the following code:
sedona.createTemporaryView("geomTable", geomTbl);
Table geomTbl3857 = sedona.sqlQuery("SELECT ST_Transform(geom_polygon, 'epsg:4326', 'epsg:3857') AS geom_polygon, name_polygon FROM geomTable");
geomTbl3857.execute().print();
The first EPSG code, EPSG:4326, in ST_Transform is the source CRS of the geometries. It is WGS84, the most common degree-based CRS.
The second EPSG code, EPSG:3857, in ST_Transform is the target CRS of the geometries. It is the most common meter-based CRS.
This ST_Transform call transforms the CRS of these geometries from EPSG:4326 to EPSG:3857. Detailed CRS information can be found on EPSG.io.
Note
Read the SedonaSQL ST_Transform API to learn more about its parameters.
For example, a Table that has coordinates in the US will change as follows.
Before the transformation:
+----+--------------------------------+--------------------------------+
| op | geom_point | name_point |
+----+--------------------------------+--------------------------------+
| +I | POINT (32 -118) | point |
| +I | POINT (33 -117) | point |
| +I | POINT (34 -116) | point |
| +I | POINT (35 -115) | point |
| +I | POINT (36 -114) | point |
| +I | POINT (37 -113) | point |
| +I | POINT (38 -112) | point |
| +I | POINT (39 -111) | point |
| +I | POINT (40 -110) | point |
| +I | POINT (41 -109) | point |
+----+--------------------------------+--------------------------------+
After the transformation:
+----+--------------------------------+--------------------------------+
| op | _c0 | name_point |
+----+--------------------------------+--------------------------------+
| +I | POINT (-13135699.91360628 3... | point |
| +I | POINT (-13024380.422813008 ... | point |
| +I | POINT (-12913060.932019735 ... | point |
| +I | POINT (-12801741.44122646 4... | point |
| +I | POINT (-12690421.950433187 ... | point |
| +I | POINT (-12579102.459639912 ... | point |
| +I | POINT (-12467782.96884664 4... | point |
| +I | POINT (-12356463.478053367 ... | point |
| +I | POINT (-12245143.987260092 ... | point |
| +I | POINT (-12133824.496466817 ... | point |
+----+--------------------------------+--------------------------------+
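As a sanity check on the numbers above, EPSG:3857 projects longitude λ and latitude φ (in degrees) onto a sphere of radius R = 6378137 m with x = R·λ·π/180 and y = R·ln(tan(π/4 + φ·π/360)). The plain-Java sketch below evaluates those two formulas. It is not Sedona's code path, and the class and method names are hypothetical. Judging from the values shown, the second coordinate of each input point (-118, -117, ...) was used as the longitude and the first (32, 33, ...) as the latitude.

```java
final class WebMercatorSketch {
    // Sphere radius used by EPSG:3857 (the WGS84 semi-major axis), in meters.
    static final double R = 6378137.0;

    // Longitude in degrees -> Web Mercator x in meters.
    static double toX(double lonDeg) {
        return R * Math.toRadians(lonDeg);
    }

    // Latitude in degrees -> Web Mercator y in meters (meaningful for |lat| < ~85.06).
    static double toY(double latDeg) {
        double phi = Math.toRadians(latDeg);
        return R * Math.log(Math.tan(Math.PI / 4 + phi / 2));
    }
}
```

For the first row, toX(-118) gives about -13135699.91, which matches the table, and each additional degree of longitude adds about 111319.49 m.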
After creating a Geometry type column, you are able to run spatial queries.
Range query¶
Use ST_Contains, ST_Intersects and so on to run a range query over a single column.
The following example finds all counties that are within the given polygon:
Table geomTable = sedona.sqlQuery(
"SELECT * " +
"FROM spatialdf " +
"WHERE ST_Contains(ST_PolygonFromEnvelope(1.0, 100.0, 1000.0, 1100.0), newcountyshape)");
geomTable.execute().print();
Note
Read SedonaSQL Predicate API to learn different spatial query predicates.
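ST_PolygonFromEnvelope(1.0, 100.0, 1000.0, 1100.0) builds the rectangle with minX = 1.0, minY = 100.0, maxX = 1000.0 and maxY = 1100.0. To make the range filter concrete, the plain-Java sketch below implements ST_Contains for two simple cases only: a point, which must lie in the rectangle's interior (so a point on the boundary is rejected), and a smaller rectangle with non-zero area, which may share edges with the query rectangle. The class is hypothetical; arbitrary polygons need a full geometry library such as JTS.

```java
final class EnvelopeContainsSketch {
    final double minX, minY, maxX, maxY;

    EnvelopeContainsSketch(double minX, double minY, double maxX, double maxY) {
        this.minX = minX;
        this.minY = minY;
        this.maxX = maxX;
        this.maxY = maxY;
    }

    // ST_Contains(rectangle, point): the point must be strictly inside,
    // so a point on the rectangle's boundary is not contained.
    boolean containsPoint(double x, double y) {
        return x > minX && x < maxX && y > minY && y < maxY;
    }

    // ST_Contains(rectangle, smaller rectangle), assuming the smaller one has non-zero area:
    // no part of it may lie outside, but shared edges are allowed.
    boolean containsEnvelope(EnvelopeContainsSketch other) {
        return other.minX >= minX && other.maxX <= maxX
            && other.minY >= minY && other.maxY <= maxY;
    }
}
```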
KNN query¶
Use ST_Distance to calculate the distance and rank the results by distance.
The following code returns the 5 nearest neighbors of the given polygon.
Table knnTable = sedona.sqlQuery(
"SELECT countyname, ST_Distance(ST_PolygonFromEnvelope(1.0, 100.0, 1000.0, 1100.0), newcountyshape) AS distance " +
"FROM geomTable " +
"ORDER BY distance ASC " +
"LIMIT 5");
knnTable.execute().print();
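The ranking done by ORDER BY distance ASC LIMIT 5 can be sketched in plain Java for the simple case of points against the same query rectangle. In planar coordinates, the distance from a point to a filled rectangle is zero inside it and otherwise the straight-line distance to the nearest edge or corner. A bounded max-heap keeps only k candidates instead of sorting every row. The class and method names are hypothetical, and this is not how Flink executes the query.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class KnnSketch {
    // Planar distance from (x, y) to the filled rectangle [minX, maxX] x [minY, maxY]:
    // 0 inside, otherwise the distance to the nearest edge or corner.
    static double distanceToEnvelope(double x, double y,
                                     double minX, double minY, double maxX, double maxY) {
        double dx = Math.max(Math.max(minX - x, 0.0), x - maxX);
        double dy = Math.max(Math.max(minY - y, 0.0), y - maxY);
        return Math.hypot(dx, dy);
    }

    // Indices of the k points closest to the rectangle, nearest first
    // (the equivalent of ORDER BY distance ASC LIMIT k).
    static List<Integer> nearest(double[][] points, int k,
                                 double minX, double minY, double maxX, double maxY) {
        // Max-heap on distance: its head is the farthest of the current candidates.
        PriorityQueue<double[]> heap = new PriorityQueue<>(
            Comparator.comparingDouble((double[] e) -> e[0]).reversed());
        for (int i = 0; i < points.length; i++) {
            double d = distanceToEnvelope(points[i][0], points[i][1], minX, minY, maxX, maxY);
            heap.offer(new double[] {d, i});
            if (heap.size() > k) {
                heap.poll(); // evict the farthest candidate
            }
        }
        // Sort the survivors by ascending distance.
        List<double[]> kept = new ArrayList<>(heap);
        kept.sort(Comparator.comparingDouble(e -> e[0]));
        List<Integer> result = new ArrayList<>();
        for (double[] e : kept) {
            result.add((int) e[1]);
        }
        return result;
    }
}
```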
Join query¶
The following approach turns a spatial join into an equi-join on S2 cell IDs, so that it can leverage Flink's internal equi-join algorithm. You can opt to skip the Sedona refinement step at the cost of query accuracy. A running example is in the SQL example project.
Please use the following steps:
1. Generate S2 ids for both tables¶
Use ST_S2CellIDs to generate cell IDs. Each geometry may produce one or more IDs.
SELECT id, geom, name, ST_S2CellIDs(geom, 15) as idarray
FROM lefts
SELECT id, geom, name, ST_S2CellIDs(geom, 15) as idarray
FROM rights
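ST_S2CellIDs covers each geometry with S2 cells at the requested level, so a geometry that crosses cell borders gets several IDs. Real S2 cells live on a sphere and are too involved for a short example, so the sketch below swaps in a hypothetical uniform planar grid and covers each geometry's bounding box rather than the geometry itself. It only illustrates the idea: the IDs it returns are not S2 IDs, and cellSize plays the role of the S2 level (a higher level corresponds to a smaller cellSize).

```java
import java.util.ArrayList;
import java.util.List;

final class GridCellIdSketch {
    // Packs a (column, row) pair into one long ID. Assumes both fit in 32 bits.
    static long cellId(long col, long row) {
        return (col << 32) | (row & 0xFFFFFFFFL);
    }

    // IDs of all grid cells touched by the bounding box [minX, maxX] x [minY, maxY].
    static List<Long> cellIds(double minX, double minY, double maxX, double maxY, double cellSize) {
        long c0 = (long) Math.floor(minX / cellSize);
        long c1 = (long) Math.floor(maxX / cellSize);
        long r0 = (long) Math.floor(minY / cellSize);
        long r1 = (long) Math.floor(maxY / cellSize);
        List<Long> ids = new ArrayList<>();
        for (long c = c0; c <= c1; c++) {
            for (long r = r0; r <= r1; r++) {
                ids.add(cellId(c, r));
            }
        }
        return ids;
    }
}
```

With cellSize = 1.0, the bounding box of POLYGON ((1.5 1.5, 1.5 2.5, 2.5 2.5, 2.5 1.5, 1.5 1.5)) touches four cells; with cellSize = 10.0 it touches one. Smaller cells mean more IDs per geometry.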
2. Explode id array¶
The produced S2 IDs are arrays of integers. We need to explode these IDs into multiple rows so that we can later join the two tables by ID.
SELECT id, geom, name, cellId
FROM lefts CROSS JOIN UNNEST(lefts.idarray) AS tmpTbl1(cellId)
SELECT id, geom, name, cellId
FROM rights CROSS JOIN UNNEST(rights.idarray) AS tmpTbl2(cellId)
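CROSS JOIN UNNEST copies the other columns of a row once for every element of its array, and a row whose array is empty produces no output. The plain-Java sketch below performs the same flattening; the Row and CellRow classes are hypothetical stand-ins for the id, name and idarray columns, with geom omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;

final class ExplodeSketch {
    // Input row: an id, a name and the cell ID array produced in Step 1.
    static final class Row {
        final int id;
        final String name;
        final long[] idarray;

        Row(int id, String name, long[] idarray) {
            this.id = id;
            this.name = name;
            this.idarray = idarray;
        }
    }

    // Output row: the same id and name, plus one cellId.
    static final class CellRow {
        final int id;
        final String name;
        final long cellId;

        CellRow(int id, String name, long cellId) {
            this.id = id;
            this.name = name;
            this.cellId = cellId;
        }
    }

    // One output row per array element, like CROSS JOIN UNNEST(idarray).
    // A row with an empty array contributes nothing.
    static List<CellRow> explode(List<Row> rows) {
        List<CellRow> out = new ArrayList<>();
        for (Row r : rows) {
            for (long cellId : r.idarray) {
                out.add(new CellRow(r.id, r.name, cellId));
            }
        }
        return out;
    }
}
```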
3. Perform equi-join¶
Join the two tables on their S2 cellId:
SELECT lcs.id as lcs_id, lcs.geom as lcs_geom, lcs.name as lcs_name, rcs.id as rcs_id, rcs.geom as rcs_geom, rcs.name as rcs_name
FROM lcs JOIN rcs ON lcs.cellId = rcs.cellId
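Because the join condition is a plain equality on cellId, the join can be hash-based instead of comparing every left row with every right row. The sketch below shows the idea in plain Java on exploded (id, cellId) pairs: it builds a hash map from cellId to right-side IDs, then probes it with each left row. It is a simplified illustration over bounded lists with hypothetical names, not Flink's streaming join operator, which also has to keep state for unbounded inputs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CellEquiJoinSketch {
    // Each input row is {id, cellId}; each output row is {leftId, rightId, cellId}.
    static List<long[]> join(List<long[]> left, List<long[]> right) {
        // Build phase: index the right side by cellId.
        Map<Long, List<Long>> rightByCell = new HashMap<>();
        for (long[] r : right) {
            rightByCell.computeIfAbsent(r[1], k -> new ArrayList<>()).add(r[0]);
        }
        // Probe phase: each left row only meets right rows in the same cell.
        List<long[]> out = new ArrayList<>();
        for (long[] l : left) {
            List<Long> matches = rightByCell.get(l[1]);
            if (matches == null) {
                continue;
            }
            for (long rid : matches) {
                out.add(new long[] {l[0], rid, l[1]});
            }
        }
        return out;
    }
}
```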
4. Optional: Refine the result¶
Due to the nature of S2 cell IDs, the equi-join results might contain a few false positives, depending on the S2 level you choose. A smaller level means bigger cells and fewer exploded rows, but more false positives.
To ensure correctness, you can use one of the spatial predicates to filter them out. Use this query in place of the query in Step 3.
SELECT lcs.id as lcs_id, lcs.geom as lcs_geom, lcs.name as lcs_name, rcs.id as rcs_id, rcs.geom as rcs_geom, rcs.name as rcs_name
FROM lcs, rcs
WHERE lcs.cellId = rcs.cellId AND ST_Contains(lcs.geom, rcs.geom)
As you can see, compared to the query in Step 3, we added one more filter, ST_Contains, to remove false positives. You can also use ST_Intersects and other predicates.
Tip
You can skip this step if you don't need 100% accuracy and want faster query speed.
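Why the refinement step matters: two geometries can land in the same cell without satisfying the predicate. The plain-Java sketch below uses hypothetical axis-aligned boxes and a uniform 1 x 1 grid as a stand-in for S2 cells. Both example boxes reach into cell (0, 0), so an equi-join on cell IDs would pair them, but the exact containment test rejects the pair, which is the job of the extra ST_Contains filter.

```java
final class RefineSketch {
    // Hypothetical axis-aligned box standing in for a geometry.
    static final class Box {
        final double minX, minY, maxX, maxY;

        Box(double minX, double minY, double maxX, double maxY) {
            this.minX = minX;
            this.minY = minY;
            this.maxX = maxX;
            this.maxY = maxY;
        }

        // True if the box reaches into grid cell (col, row), where the cell covers
        // [col * size, (col + 1) * size) on x and [row * size, (row + 1) * size) on y.
        boolean touchesCell(long col, long row, double size) {
            return maxX >= col * size && minX < (col + 1) * size
                && maxY >= row * size && minY < (row + 1) * size;
        }

        // Exact predicate: the other box lies entirely inside this one.
        boolean contains(Box other) {
            return other.minX >= minX && other.maxX <= maxX
                && other.minY >= minY && other.maxY <= maxY;
        }
    }

    // A candidate pair from the equi-join is kept only if the exact predicate holds,
    // like adding ST_Contains(lcs.geom, rcs.geom) to the WHERE clause.
    static boolean refine(Box left, Box right) {
        return left.contains(right);
    }
}
```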
5. Optional: De-duplicate¶
Due to the explode step (UNNEST) used when we generate S2 cell IDs, the resulting Table may contain several duplicate matches for the same pair of geometries. You can remove them with a GROUP BY query:
SELECT lcs_id, rcs_id, FIRST_VALUE(lcs_geom), FIRST_VALUE(lcs_name), FIRST_VALUE(rcs_geom), FIRST_VALUE(rcs_name)
FROM joinresult
GROUP BY lcs_id, rcs_id
The FIRST_VALUE function takes the first value from a group of duplicate values.
If you don't have a unique id for each geometry, you can also group by geometry itself. See below:
SELECT lcs_geom, rcs_geom, FIRST_VALUE(lcs_name), FIRST_VALUE(rcs_name)
FROM joinresult
GROUP BY lcs_geom, rcs_geom
Note
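If you are doing a point-in-polygon join, duplicates are not a problem and you can safely skip this step: each point falls into exactly one S2 cell at a given level, so each point-polygon pair can match at most once. Duplicates only occur in polygon-polygon, polygon-linestring, and linestring-linestring joins.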
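Duplicates arise because a pair of geometries that shares several cells is emitted once per shared cell. The plain-Java sketch below mimics GROUP BY lcs_id, rcs_id: a LinkedHashMap keyed by the ID pair keeps the first row seen for each pair and ignores later ones. Keeping the whole first row is a simplification of applying FIRST_VALUE column by column; the row layout and names are hypothetical, and in a stream "first" means first in arrival order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DedupSketch {
    // Each row is {lcsId, rcsId, payload}. Keeps the first row per (lcsId, rcsId)
    // in arrival order, similar to GROUP BY lcs_id, rcs_id with FIRST_VALUE(...).
    static List<String[]> dedupe(List<String[]> rows) {
        Map<List<String>, String[]> firstByPair = new LinkedHashMap<>();
        for (String[] row : rows) {
            List<String> key = Arrays.asList(row[0], row[1]);
            firstByPair.putIfAbsent(key, row); // later duplicates are ignored
        }
        return new ArrayList<>(firstByPair.values());
    }
}
```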
S2 for distance join¶
This approach also works for distance joins. First use ST_Buffer(geometry, distance) to wrap one of your original geometry columns. If that column contains points, ST_Buffer turns them into circles with a radius of distance.
For example, run this query on the left table before Step 1:
SELECT id, ST_Buffer(geom, DISTANCE) AS geom, name
FROM lefts
If the coordinates are in a longitude/latitude system, the unit of distance should be degrees instead of meters or miles. You will have to estimate the corresponding degrees based on your meter values. Please use this calculator.
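A rough estimate, assuming a spherical Earth of radius 6378137 m: one degree of latitude spans 2πR/360 ≈ 111319.49 m, and one degree of longitude spans that amount multiplied by cos(latitude). The plain-Java sketch below computes both. Because the longitude factor shrinks away from the equator, a single buffer radius in degrees covers an ellipse in meters, so taking the larger of the two values covers, locally, at least the requested distance in every direction. This is an approximation, not a geodesic calculation, and the method names are hypothetical.

```java
final class DegreeEstimateSketch {
    // Meters per degree along a meridian on a sphere of radius 6378137 m (~111319.49 m).
    static final double METERS_PER_DEGREE = 2 * Math.PI * 6378137.0 / 360.0;

    // Degrees of latitude spanned by the given distance.
    static double latitudeDegrees(double meters) {
        return meters / METERS_PER_DEGREE;
    }

    // Degrees of longitude spanned by the given distance at the given latitude.
    // Grows without bound near the poles, where cos(latitude) approaches 0.
    static double longitudeDegrees(double meters, double latitudeDeg) {
        return meters / (METERS_PER_DEGREE * Math.cos(Math.toRadians(latitudeDeg)));
    }

    // One buffer radius in degrees that covers at least `meters` in every direction.
    static double bufferDegrees(double meters, double latitudeDeg) {
        return Math.max(latitudeDegrees(meters), longitudeDegrees(meters, latitudeDeg));
    }
}
```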
Convert Spatial Table to Spatial DataStream¶
Get DataStream¶
Use TableEnv's toDataStream function:
DataStream<Row> geomStream = sedona.toDataStream(geomTable);
Retrieve Geometries¶
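Then get the Geometry from each Row object using a map function:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.locationtech.jts.geom.Geometry;
DataStream<Geometry> geometries = geomStream.map(new MapFunction<Row, Geometry>() {
@Override
public Geometry map(Row value) throws Exception {
// The geometry is the first column of each Row
return (Geometry) value.getField(0);
}
});
geometries.print();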
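The output will be similar to the following. The number before > is the index of the parallel subtask that printed the line, so the order varies between runs: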
14> POLYGON ((1.5 1.5, 1.5 2.5, 2.5 2.5, 2.5 1.5, 1.5 1.5))
2> POLYGON ((5.5 5.5, 5.5 6.5, 6.5 6.5, 6.5 5.5, 5.5 5.5))
5> POLYGON ((8.5 8.5, 8.5 9.5, 9.5 9.5, 9.5 8.5, 8.5 8.5))
16> POLYGON ((3.5 3.5, 3.5 4.5, 4.5 4.5, 4.5 3.5, 3.5 3.5))
12> POLYGON ((-0.5 -0.5, -0.5 0.5, 0.5 0.5, 0.5 -0.5, -0.5 -0.5))
13> POLYGON ((0.5 0.5, 0.5 1.5, 1.5 1.5, 1.5 0.5, 0.5 0.5))
15> POLYGON ((2.5 2.5, 2.5 3.5, 3.5 3.5, 3.5 2.5, 2.5 2.5))
3> POLYGON ((6.5 6.5, 6.5 7.5, 7.5 7.5, 7.5 6.5, 6.5 6.5))
1> POLYGON ((4.5 4.5, 4.5 5.5, 5.5 5.5, 5.5 4.5, 4.5 4.5))
4> POLYGON ((7.5 7.5, 7.5 8.5, 8.5 8.5, 8.5 7.5, 7.5 7.5))
Store non-spatial attributes in Geometries¶
You can concatenate other non-spatial attributes and store them in the Geometry's userData field so that you can recover them later. The userData field can be any object type.
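import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.locationtech.jts.geom.Geometry;
DataStream<Geometry> geometries = geomStream.map(new MapFunction<Row, Geometry>() {
@Override
public Geometry map(Row value) throws Exception {
Geometry geom = (Geometry) value.getField(0);
// Attach the second column (the name) as user data
geom.setUserData(value.getField(1));
return geom;
}
});
geometries.print();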
The print command does not print the userData field, but you can retrieve it this way:
import org.apache.flink.api.common.functions.MapFunction;
import org.locationtech.jts.geom.Geometry;
geometries.map(new MapFunction<Geometry, String>() {
@Override
public String map(Geometry value) throws Exception {
return (String) value.getUserData();
}
}).print();
The output will be similar to:
13> polygon9
6> polygon2
10> polygon6
11> polygon7
5> polygon1
12> polygon8
8> polygon4
4> polygon0
7> polygon3
9> polygon5
Convert Spatial DataStream to Spatial Table¶
Create Geometries using Sedona FormatUtils¶
- Create a Geometry from a WKT string
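import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.sedona.common.enums.FileDataSplitter;
import org.apache.sedona.common.utils.FormatUtils;
import org.locationtech.jts.geom.Geometry;
// text is a DataStream<String> in which each element is a WKT string
DataStream<Geometry> geometries = text.map(new MapFunction<String, Geometry>() {
@Override
public Geometry map(String value) throws Exception {
FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false);
return formatUtils.readGeometry(value);
}
});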
- Create a Point from a String "1.1, 2.2". Use "," as the delimiter.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.sedona.common.enums.GeometryType;
import org.apache.sedona.common.utils.FormatUtils;
import org.locationtech.jts.geom.Geometry;
DataStream<Geometry> geometries = text.map(new MapFunction<String, Geometry>() {
@Override
public Geometry map(String value) throws Exception {
FormatUtils<Geometry> formatUtils = new FormatUtils(",", false, GeometryType.POINT);
return formatUtils.readGeometry(value);
}
});
- Create a Polygon from a String "1.1, 1.1, 10.1, 10.1". This is a rectangle with (1.1, 1.1) and (10.1, 10.1) as its min/max corners.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Geometry;
DataStream<Geometry> geometries = text.map(new MapFunction<String, Geometry>() {
@Override
public Geometry map(String value) throws Exception {
// Parse the four values "minX, minY, maxX, maxY"
String[] parts = value.split(",");
double minX = Double.parseDouble(parts[0].trim());
double minY = Double.parseDouble(parts[1].trim());
double maxX = Double.parseDouble(parts[2].trim());
double maxY = Double.parseDouble(parts[3].trim());
// A closed ring: the last coordinate repeats the first one
Coordinate[] coordinates = new Coordinate[5];
coordinates[0] = new Coordinate(minX, minY);
coordinates[1] = new Coordinate(minX, maxY);
coordinates[2] = new Coordinate(maxX, maxY);
coordinates[3] = new Coordinate(maxX, minY);
coordinates[4] = coordinates[0];
GeometryFactory geometryFactory = new GeometryFactory();
return geometryFactory.createPolygon(coordinates);
}
});
Create Row objects¶
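Put each geometry in a Flink Row to build a geomStream. You can put other attributes in the Row as well; this example uses a constant value, myName, for all geometries.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.sedona.common.enums.FileDataSplitter;
import org.apache.sedona.common.utils.FormatUtils;
import org.locationtech.jts.geom.Geometry;
DataStream<Row> geomStream = text.map(new MapFunction<String, Row>() {
@Override
public Row map(String value) throws Exception {
FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false);
return Row.of(formatUtils.readGeometry(value), "myName");
}
})
// Declare the Row field types explicitly; Flink cannot infer them from a MapFunction that returns Row
.returns(Types.ROW(TypeInformation.of(Geometry.class), Types.STRING));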
Get Spatial Table¶
Use TableEnv's fromDataStream function with two column names, geom and geom_name:
import static org.apache.flink.table.api.Expressions.$;
Table geomTable = sedona.fromDataStream(geomStream, $("geom"), $("geom_name"));