Example of spark + sedona + hdfs with slave nodes and OSM vector data consults
from IPython.display import display, HTML
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import pandas as pd
from pyspark.sql.types import StructType, StructField,StringType, LongType, IntegerType, DoubleType, ArrayType
from pyspark.sql.functions import regexp_replace
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from pyspark.sql.functions import col, split, expr
from pyspark.sql.functions import udf, lit
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from pyspark.sql.functions import col, split, expr
from pyspark.sql.functions import udf, lit, flatten
from pywebhdfs.webhdfs import PyWebHdfsClient
from datetime import date
from pyspark.sql.functions import monotonically_increasing_id
import json
Registering spark session, adding node executor configurations and sedona registrator
spark = SparkSession.\
builder.\
appName("Overpass-API").\
enableHiveSupport().\
master("local[*]").\
master("spark://spark-master:7077").\
config("spark.executor.memory", "15G").\
config("spark.driver.maxResultSize", "135G").\
config("spark.sql.shuffle.partitions", "500").\
config(' spark.sql.adaptive.coalescePartitions.enabled', True).\
config('spark.sql.adaptive.enabled', True).\
config('spark.sql.adaptive.coalescePartitions.initialPartitionNum', 125).\
config("spark.sql.execution.arrow.pyspark.enabled", True).\
config("spark.sql.execution.arrow.fallback.enabled", True).\
config('spark.kryoserializer.buffer.max', 2047).\
config("spark.serializer", KryoSerializer.getName).\
config("spark.kryo.registrator", SedonaKryoRegistrator.getName).\
config("spark.jars.packages", "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.4.0,org.datasyslab:geotools-wrapper:1.4.0-28.2") .\
enableHiveSupport().\
getOrCreate()
SedonaRegistrator.registerAll(spark)
sc = spark.sparkContext
Connecting to Overpass API to search and downloading data for saving into HDFS
import requests
import json
overpass_url = "http://overpass-api.de/api/interpreter"
overpass_query = """
[out:json];
area[name = "Foz do Iguaçu"];
way(area)["highway"~""];
out geom;
>;
out skel qt;
"""
response = requests.get(overpass_url,
params={'data': overpass_query})
data = response.json()
hdfs = PyWebHdfsClient(host='179.106.229.159',port='50070', user_name='root')
file_name = "foz_roads_osm.json"
hdfs.delete_file_dir(file_name)
hdfs.create_file(file_name, json.dumps(data))
Connecting spark sedona with saved hdfs file
path = "hdfs://776faf4d6a1e:8020/"+file_name
df = spark.read.json(path, multiLine = "true")
Consulting and organizing data for analysis
from pyspark.sql.functions import explode, arrays_zip
df.createOrReplaceTempView("df")
tb = spark.sql("select *, size(elements) total_nodes from df")
tb.show(5)
isolate_total_nodes = tb.select("total_nodes").toPandas()
total_nodes = isolate_total_nodes["total_nodes"].iloc[0]
print(total_nodes)
isolate_ids = tb.select("elements.id").toPandas()
ids = pd.DataFrame(isolate_ids["id"].iloc[0]).drop_duplicates()
print(ids[0].iloc[1])
formatted_df = tb\
.withColumn("id", explode("elements.id"))
formatted_df.show(5)
formatted_df = tb\
.withColumn("new", arrays_zip("elements.id", "elements.geometry", "elements.nodes", "elements.tags"))\
.withColumn("new", explode("new"))
formatted_df.show(5)
# formatted_df.printSchema()
formatted_df = formatted_df.select("new.0","new.1","new.2","new.3.maxspeed","new.3.incline","new.3.surface", "new.3.name", "total_nodes")
formatted_df = formatted_df.withColumnRenamed("0","id").withColumnRenamed("1","geom").withColumnRenamed("2","nodes").withColumnRenamed("3","tags")
formatted_df.createOrReplaceTempView("formatted_df")
formatted_df.show(5)
# TODO atualizar daqui para baixo para considerar a linha inteira na lógica
points_tb = spark.sql("select geom, id from formatted_df where geom IS NOT NULL")
points_tb = points_tb\
.withColumn("new", arrays_zip("geom.lat", "geom.lon"))\
.withColumn("new", explode("new"))
points_tb = points_tb.select("new.0","new.1", "id")
points_tb = points_tb.withColumnRenamed("0","lat").withColumnRenamed("1","lon")
points_tb.printSchema()
points_tb.createOrReplaceTempView("points_tb")
points_tb.show(5)
coordinates_tb = spark.sql("select (select collect_list(CONCAT(p1.lat,',',p1.lon)) from points_tb p1 where p1.id = p2.id group by p1.id) as coordinates, p2.id, p2.maxspeed, p2.incline, p2.surface, p2.name, p2.nodes, p2.total_nodes from formatted_df p2")
coordinates_tb.createOrReplaceTempView("coordinates_tb")
coordinates_tb.show(5)
roads_tb = spark.sql("SELECT ST_LineStringFromText(REPLACE(REPLACE(CAST(coordinates as string),'[',''),']',''), ',') as geom, id, maxspeed, incline, surface, name, nodes, total_nodes FROM coordinates_tb WHERE coordinates IS NOT NULL")
roads_tb.createOrReplaceTempView("roads_tb")
roads_tb.show(5)