Spatial SQL app (PyFlink)
To set up PyFlink with Apache Sedona, please follow the PyFlink setup guide. When you finish it, you can run the following code to test whether everything works.
from sedona.flink import SedonaContext
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Create a streaming table environment with Sedona's spatial functions registered
stream_env = StreamExecutionEnvironment.get_execution_environment()
flink_settings = EnvironmentSettings.in_streaming_mode()
table_env = SedonaContext.create(stream_env, flink_settings)

# Run a simple spatial query to verify that the Sedona functions are available
table_env \
    .sql_query("SELECT ST_Point(1.0, 2.0)") \
    .execute()
PyFlink does not expose a way to convert Scala user-defined types (UDTs) to Python UDTs. So, when you want to collect the result in Python, you need to use functions like ST_AsText or ST_AsBinary to convert the geometry to a string or binary representation.
from shapely.wkb import loads

# Collect the geometry as WKB bytes, since the Scala UDT cannot cross into Python
result = table_env \
    .sql_query("SELECT ST_AsBinary(ST_Point(1.0, 2.0))") \
    .execute() \
    .collect()

# Decode each WKB payload into a Shapely geometry
[loads(bytes(el[0])) for el in result]
[<POINT (1 2)>]
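If you only need a readable representation, ST_AsText follows the same pattern and returns WKT strings, so no WKB decoding is needed (a minimal sketch following the example above):

result = table_env \
    .sql_query("SELECT ST_AsText(ST_Point(1.0, 2.0))") \
    .execute() \
    .collect()

# Each row holds a WKT string such as 'POINT (1 2)'
[el[0] for el in result]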
The same applies to user-defined scalar functions:
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf
from shapely.wkb import loads

class Buffer(ScalarFunction):
    def eval(self, s):
        # The argument arrives as WKB bytes; decode it, buffer by 1 unit, return WKB
        geom = loads(s)
        return geom.buffer(1).wkb

# Register the Python UDF; BYTES keeps the WKB payload variable-length
table_env.create_temporary_function(
    "ST_BufferPython", udf(Buffer(), result_type=DataTypes.BYTES())
)
buffer_table = table_env.sql_query(
    "SELECT ST_BufferPython(ST_AsBinary(ST_Point(1.0, 2.0))) AS buffer"
)
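Since the registered UDF returns raw WKB bytes, its output can be decoded back into Shapely geometries the same way as before (a minimal sketch reusing the buffer_table defined above):

result = buffer_table.execute().collect()

# Each row holds the WKB of the buffered point
buffered = [loads(bytes(el[0])) for el in result]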
For more SQL examples, please see the FlinkSQL section.