Apache Sedona 1.9.0 已正式发布,新增 Spark 4.1 支持、proj4sedona 坐标系转换、Bing Tile 函数等众多特性!

空间 SQL 应用(PyFlink)

要在 Apache Sedona 中配置 PyFlink,请先按 PyFlink 指南完成安装。 完成后,可以运行下面的代码以验证环境是否正常工作。

from sedona.flink import SedonaContext
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

stream_env = StreamExecutionEnvironment.get_execution_environment()
flink_settings = EnvironmentSettings.in_streaming_mode()
table_env = SedonaContext.create(stream_env, flink_settings)

table_env.sql_query("SELECT ST_Point(1.0, 2.0)").execute()

PyFlink 不支持把 Scala 自定义类型(UDT)转换为 Python UDT。 因此,如果想在 Python 中收集结果,需要使用 ST_AsTextST_ASBinary 等函数把结果转换为字符串或二进制。

from shapely.wkb import loads

table_env.sql_query("SELECT ST_ASBinary(ST_Point(1.0, 2.0))").execute().collect()

[loads(bytes(el[0])) for el in result]
[<POINT (1 2)>]

用户自定义标量函数(UDF)也是类似的处理方式:

from pyflink.table.udf import ScalarFunction, udf
from shapely.wkb import loads


class Buffer(ScalarFunction):
    def eval(self, s):
        geom = loads(s)
        return geom.buffer(1).wkb


table_env.create_temporary_function(
    "ST_BufferPython", udf(Buffer(), result_type="Binary")
)

buffer_table = table_env.sql_query(
    "SELECT ST_BufferPython(ST_ASBinary(ST_Point(1.0, 2.0))) AS buffer"
)

更多 SQL 示例请参阅 FlinkSQL 章节:FlinkSQL