Introduction

apache.sedona is a sparklyr-based R interface for Apache Sedona. It presents what Apache Sedona has to offer through idiomatic frameworks and constructs in R (e.g., one can build spatial Spark SQL queries using Sedona UDFs in conjunction with a wide range of dplyr expressions), hence making Apache Sedona highly friendly for R users.

Generally speaking, when working with Apache Sedona, one chooses between the following two modes:

  • Manipulating Sedona spatial Resilient Distributed Datasets (SpatialRDDs) with spatial-RDD-related routines
  • Querying geometry columns within Spark DataFrames with Sedona spatial UDFs

While the former option enables more fine-grained control over low-level implementation details (e.g., which index to build for spatial queries, which data structure to use for spatial partitioning, etc.), the latter is simpler and leads to a straightforward integration with dplyr, sparklyr, and other sparklyr extensions (e.g., one can build ML feature extractors with Sedona UDFs and connect them with ML pipelines using the ml_*() family of functions in sparklyr, hence creating ML workflows capable of understanding spatial data).

Because data from spatial RDDs can be imported into Spark DataFrames as geometry columns and vice versa, one can switch between these two modes fairly easily.
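
For example, a schematic sketch of moving between the two representations (using to_spatial_rdd() and sdf_register(), both covered later in this article; geo_sdf here stands for any Spark DataFrame with a geometry column named "geom"):

# Spark DataFrame -> SpatialRDD: extract the geometry column into a SpatialRDD
spatial_rdd <- geo_sdf %>% to_spatial_rdd(spatial_col = "geom")

# SpatialRDD -> Spark DataFrame: register it back as a dplyr-accessible table
geo_sdf2 <- spatial_rdd %>% sdf_register(name = "my_spatial_table")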

At the moment apache.sedona consists of the following components:

  • R interface for Spatial-RDD-related functionalities
    • Reading/writing spatial data in WKT, WKB, and GeoJSON formats
    • Shapefile reader
    • Spatial partitioning, indexing, join, KNN query, and range query operations
    • Visualization routines
  • dplyr-integration for Sedona spatial UDTs and UDFs
    • See SQL APIs for the list of available UDFs
  • Functions importing data from spatial RDDs to Spark DataFrames and vice versa

Connect to Spark

To ensure Sedona serialization routines, UDTs, and UDFs are properly registered when creating a Spark session, one simply needs to attach apache.sedona before instantiating a Spark connection. apache.sedona will take care of the rest. For example,

library(sparklyr)
library(apache.sedona)

spark_home <- "/usr/lib/spark"  # NOTE: replace this with your $SPARK_HOME directory
sc <- spark_connect(master = "yarn", spark_home = spark_home)

will create a Sedona-capable Spark connection in YARN client mode, and
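
# connect to a local Spark instance (assuming a local Spark installation is
# available, e.g., one installed via sparklyr::spark_install())
sc <- spark_connect(master = "local")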

will create a Sedona-capable Spark connection to an Apache Spark instance running locally.

In sparklyr, one can easily inspect the Spark connection object to sanity-check that it has been properly initialized with all Sedona-related dependencies, e.g.,

print(sc$extensions$packages)

and

spark_session(sc) %>%
  invoke("%>%", list("conf"), list("get", "spark.kryo.registrator")) %>%
  print()

For more information about connecting to Spark with sparklyr, see https://therinspark.com/connections.html and ?sparklyr::spark_connect. Also see Initiate Spark Context and Initiate Spark Session for minimum and recommended dependencies for Apache Sedona.

dplyr workflows

apache.sedona extends sparklyr and integrates with dplyr workflows. See the sparklyr cheatsheet for an overview.

Loading data

Copying from R

Data loaded in R can be copied to Spark using copy_to. Columns containing spatial information can be converted to the geometry type in Spark DataFrames (GeometryUDT) with Spark SQL functions such as ST_GeomFromText or ST_GeomFromWKB; see Vector constructors and Raster input and output.

data <- readr::read_csv(
  here::here("../core/src/test/resources/arealm.csv"),
  col_names = FALSE,
  show_col_types = FALSE
)
data %>% glimpse()

data_tbl <- copy_to(sc, data)

data_tbl

data_tbl %>% 
  transmute(geometry = st_geomfromtext(X1)) %>% 
  sdf_schema()

No automatic translation of sf objects is provided; they need to be converted to a text (or binary) format before being copied to Spark.

data <- sf::st_read(here::here("../core/src/test/resources/testPolygon.json"))

data %>% glimpse()

data_tbl <-
  copy_to(
    sc,
    data %>%
      mutate(geometry_wkt = geometry %>% sf::st_as_text()) %>%
      sf::st_drop_geometry(),
    name = "data",
    overwrite = TRUE
  )

data_tbl %>%
  transmute(geometry = st_geomfromtext(geometry_wkt)) %>%
  sdf_schema()

Loading directly in Spark

Loading data in R and then copying it to Spark will most likely not be the optimal way to prepare data for analysis; loading data directly into Spark will often be best. The spark_read_* functions are made for this purpose (and extend the spark_read_* functions in sparklyr).

data_tbl <- spark_read_geojson(
  sc,
  path = here::here("../core/src/test/resources/testPolygon.json"),
  name = "data"
)

data_tbl %>% 
  glimpse()

data_tbl %>% 
  # select(geometry) %>% 
  sdf_schema() %>% 
  lobstr::tree()

Manipulating

The dbplyr interface transparently translates dplyr workflows into SQL and gives access to all Apache Sedona SQL functions:

Results are then collected back into R with collect.

## ! ST_FlipCoordinates needs to be called before st_transform because, by default, st_transform expects coordinates in lat/lon order
data_tbl %>% 
  mutate(
    ALAND = ALAND %>% as.numeric(),
    AWATER = AWATER %>% as.numeric(),
    area = ALAND + AWATER,
    geometry_proj = st_transform(ST_FlipCoordinates(geometry), "epsg:4326", "epsg:5070", TRUE),
    area_geom = st_area(geometry_proj)
    ) %>% 
  select(STATEFP, COUNTYFP, area, area_geom) %>% 
  head() %>% 
  collect()

Geometries need to be converted to a serializable (text or binary) format before collect is called:

## Setting the CRS in R post-collect
data_tbl %>% 
  mutate(
    area = st_area(st_transform(ST_FlipCoordinates(geometry), "epsg:4326", "epsg:5070", TRUE)),
    geometry_wkb = geometry %>% st_asBinary()
    ) %>% 
  select(COUNTYFP, geometry_wkb) %>% 
  head() %>% 
  collect() %>% 
  sf::st_as_sf(crs = 4326)
## Setting the CRS in Spark (and using EWKT to keep it)
data_tbl %>% 
  mutate(
    area = st_area(st_transform(ST_FlipCoordinates(geometry), "epsg:4326", "epsg:5070", TRUE)),
    geometry_ewkt = geometry %>% st_setsrid(4326) %>% st_asewkt()
    ) %>% 
  select(COUNTYFP, geometry_ewkt) %>% 
  head() %>% 
  collect() %>% 
  sf::st_as_sf(wkt = "geometry_ewkt") 

Writing

Collected results can be saved from R, but in many cases it will be more efficient to write results directly from Spark. The spark_write_* functions (see docs) are made for this purpose (and extend the spark_write_* functions in sparklyr).

dest_file <- tempfile() ## Destination folder
data_tbl %>% 
  filter(str_sub(COUNTYFP, 1, 2) == "00") %>% 
  spark_write_geoparquet(path = dest_file)

dest_file %>% dir(recursive = TRUE)

The output can be partitioned by the columns present in the data:

dest_file <- tempfile()  ## Destination folder
data_tbl %>% 
  filter(str_sub(COUNTYFP, 1, 2) == "00") %>% 
  spark_write_geoparquet(path = dest_file, partition_by = "COUNTYFP")

dest_file %>% dir(recursive = TRUE)

Spark DataFrames

Spark DataFrames provide a higher-level API than RDDs and can be used with SQL queries. sparklyr and apache.sedona automatically wrap Spark DataFrames in dplyr tbls so that they fit naturally into dplyr workflows.

data_tbl %>% class()

You can get to the underlying Spark DataFrame (SDF) with sparklyr::spark_dataframe, to be used for example with sparklyr::invoke to call SDF methods within Spark.

sdf <- data_tbl %>% spark_dataframe()
sdf
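
For example (a minimal sketch), one can invoke methods of the underlying Dataset object directly:

# count() returns the number of rows; schema() followed by treeString() prints the schema
sdf %>% invoke("count")
sdf %>% invoke("schema") %>% invoke("treeString") %>% cat()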

RDD workflows

What are SpatialRDDs?

SpatialRDDs are basic building blocks of distributed spatial data in Apache Sedona. A SpatialRDD can be partitioned and indexed using well-known spatial data structures to facilitate range queries, KNN queries, and other low-level operations. One can also export records from SpatialRDDs into regular Spark DataFrames, making them accessible through Spark SQL and through the dplyr interface of sparklyr.

Creating a SpatialRDD

NOTE: this section is largely based on the Spatial RDD Scala tutorial, except that the examples have been written in R instead of Scala to reflect typical usage of apache.sedona.

Currently SpatialRDDs can be created in apache.sedona by reading a file in a supported geospatial format (sedona_read_* functions), or by extracting data from a Spark SQL query.

For example, the following code will import data from arealm.csv into a SpatialRDD:

pt_rdd <- sedona_read_dsv_to_typed_rdd(
  sc,
  location = here::here("../core/src/test/resources/arealm.csv"),
  delimiter = ",",
  type = "point",
  first_spatial_col_index = 1,
  has_non_spatial_attrs = TRUE
)

Records from the example arealm.csv file look like the following:

testattribute0,-88.331492,32.324142,testattribute1,testattribute2
testattribute0,-88.175933,32.360763,testattribute1,testattribute2
testattribute0,-88.388954,32.357073,testattribute1,testattribute2

As one can see from the above, each record is comma-separated and consists of a 2-dimensional coordinate starting at the 2nd column and ending at the 3rd column. All other columns contain non-spatial attributes. Because column indexes are 0-based, we need to specify first_spatial_col_index = 1 in the example above to ensure each record is parsed correctly.

In addition to formats such as CSV and TSV, currently apache.sedona also supports reading files in WKT (Well-Known Text), WKB (Well-Known Binary), Shapefile and GeoJSON formats. See sedona_read_wkt() for details.
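
Once created, a SpatialRDD can be spatially partitioned and indexed before running low-level operations such as joins, KNN queries, or range queries. A minimal sketch, assuming the pt_rdd created above (see ?apache.sedona::sedona_apply_spatial_partitioner and ?apache.sedona::sedona_build_index for the exact arguments):

# partition the points with a KDB-tree spatial partitioner
pt_rdd <- sedona_apply_spatial_partitioner(pt_rdd, partitioner = "kdbtree")

# build a quadtree index on the partitioned data
pt_rdd <- sedona_build_index(pt_rdd, type = "quadtree")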

Conversion to and from SpatialRDD

One can also run to_spatial_rdd() to extract a SpatialRDD from a Spark SQL query; e.g., the query below extracts a spatial column named "geom" from a Sedona spatial SQL query and stores it in a SpatialRDD object.

library(dplyr)

sdf <- tbl(
  sc,
  sql("SELECT ST_GeomFromText('POINT(-71.064544 42.28787)') AS `geom`, \"point\" AS `type`")
)

spatial_rdd <- sdf %>% to_spatial_rdd(spatial_col = "geom")
spatial_rdd

A SpatialRDD can be converted into a Spark DataFrame with sdf_register (generic method from sparklyr).

spatial_sdf <- spatial_rdd %>% sdf_register(name = "my_table")
spatial_sdf

Visualization

An important part of apache.sedona is its collection of R interfaces to Sedona visualization routines. For example, the following is essentially the R equivalent of this example in Scala.

resolution_x <- 1000
resolution_y <- 600
boundary <- c(-126.790180, -64.630926, 24.863836, 50.000)

pt_rdd <- sedona_read_dsv_to_typed_rdd(
  sc,
  location = here::here("../core/src/test/resources/arealm.csv"),
  type = "point"
)
polygon_rdd <- sedona_read_dsv_to_typed_rdd(
  sc,
  location = here::here("../core/src/test/resources/primaryroads-polygon.csv"),
  type = "polygon"
)
pair_rdd <- sedona_spatial_join_count_by_key(
  pt_rdd,
  polygon_rdd,
  join_type = "intersect"
)

overlay <- sedona_render_scatter_plot(
  polygon_rdd,
  resolution_x,
  resolution_y,
  output_location = tempfile("scatter-plot-"),
  boundary = boundary,
  base_color = c(255, 0, 0),
  browse = FALSE
)

sedona_render_choropleth_map(
  pair_rdd,
  resolution_x,
  resolution_y,
  output_location = "./choropleth-map",
  boundary = boundary,
  overlay = overlay,
  # vary the green color channel according to relative magnitudes of data points so
  # that the resulting map will show light blue, light purple, and light gray pixels
  color_of_variation = "green",
  base_color = c(225, 225, 255)
)

This will create a scatter plot and then overlay it on top of a choropleth map.

See ?apache.sedona::sedona_render_scatter_plot, ?apache.sedona::sedona_render_heatmap, and ?apache.sedona::sedona_render_choropleth_map for more details on visualization-related R interfaces currently implemented by apache.sedona.
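
For instance, a heat map of the point RDD above could be rendered along the same lines (a sketch; the arguments mirror the scatter plot call above, see ?apache.sedona::sedona_render_heatmap for the exact interface):

sedona_render_heatmap(
  pt_rdd,
  resolution_x,
  resolution_y,
  output_location = tempfile("heatmap-"),
  boundary = boundary,
  browse = FALSE
)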

Advanced parameters

Various advanced parameters can be set in Apache Sedona; see the parameters documentation.

Currently, setting these through the sparklyr config does not work:

config <- spark_config()
config[["sedona.global.index"]] <- FALSE

sc <- spark_connect(master = "local", config = config)

Check the resulting Sedona configuration:

invoke_new(sc, "org.apache.sedona.core.utils.SedonaConf", invoke(spark_session(sc), "conf")) 

(The output still shows the default value, confirming that the sparklyr config setting was not picked up.)

Instead, the setting can be changed at runtime:

spark_session(sc) %>% 
  invoke("conf") %>% 
  invoke("set", "sedona.global.index","false")

invoke_new(sc, "org.apache.sedona.core.utils.SedonaConf", invoke(spark_session(sc), "conf")) 