🎉 SedonaDB 0.4.0 已正式发布!🗺️ 新增 Python DataFrame API、R dplyr 接口、Geography 支持及 GPU 加速空间连接。阅读发布博客 →

空间 SQL 应用(PyFlink)

要在 Apache Sedona 中配置 PyFlink,请先按 PyFlink 指南完成安装。 完成后,可以运行下面的代码以验证环境是否正常工作。

from sedona.flink import SedonaContext
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

stream_env = StreamExecutionEnvironment.get_execution_environment()
flink_settings = EnvironmentSettings.in_streaming_mode()
table_env = SedonaContext.create(stream_env, flink_settings)

table_env.sql_query("SELECT ST_Point(1.0, 2.0)").execute()

PyFlink 不支持把 Scala 自定义类型(UDT)转换为 Python UDT。 因此,如果想在 Python 中收集结果,需要使用 ST_AsTextST_ASBinary 等函数把结果转换为字符串或二进制。

from shapely.wkb import loads

table_env.sql_query("SELECT ST_ASBinary(ST_Point(1.0, 2.0))").execute().collect()

[loads(bytes(el[0])) for el in result]
[<POINT (1 2)>]

用户自定义标量函数(UDF)也是类似的处理方式:

from pyflink.table.udf import ScalarFunction, udf
from shapely.wkb import loads


class Buffer(ScalarFunction):
    def eval(self, s):
        geom = loads(s)
        return geom.buffer(1).wkb


table_env.create_temporary_function(
    "ST_BufferPython", udf(Buffer(), result_type="Binary")
)

buffer_table = table_env.sql_query(
    "SELECT ST_BufferPython(ST_ASBinary(ST_Point(1.0, 2.0))) AS buffer"
)

更多 SQL 示例请参阅 FlinkSQL 章节:FlinkSQL