Understand Consumer Behavior With Verified Foot Traffic Data
Background

Visitor and demographic aggregation data provide essential context on population behavior: how often is a place of interest visited, how long do visitors stay, where do they come from, and where are they going? The answers are invaluable across numerous industries. Building financial indicators, city and urban planning, public health indicators, and identifying your primary business competitors all require accurate, high-quality population and POI data.

Objective

Our workshop's objective is to show professionals, researchers, and practitioners how to derive human movement patterns from location data. We use a sample of our Weekly and Monthly Patterns and Core Places products to perform market research on a potential new coffee shop location, addressing the questions below (and more) while building a market analysis proposal in real time.
Questions to Answer
- How far are customers willing to travel for coffee?
- What location will receive the most visibility?
- Where do most of the coffee customers come from?
Notebook Setup
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
spark = (
SparkSession.builder.appName("sigspatial2021")
.master("spark://data-ocean-lab-1:7077")
.config("spark.serializer", KryoSerializer.getName)
.config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
.config(
"spark.jars.packages",
"org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.0-incubating,"
"org.datasyslab:geotools-wrapper:1.1.0-25.2",
)
.getOrCreate()
)
import pyspark.sql.functions as f
from pyspark.sql.types import MapType, StringType, IntegerType
import pandas as pd
Load in SafeGraph sample data from S3
The sample covers all coffee shops (category_tag.contains("Coffee Shop")) in the Seattle area (county_FIPS.isin(["53033", "53053", "53061"])), with multiple rows per POI corresponding to monthly foot traffic since the beginning of 2018 (one month per row per POI). Columns are SafeGraph Core, Geo, and Patterns pre-joined together, with placekey as the join key.
sample_csv_path = "file:///media/hdd1/code/sigspatial-2021-cafe-analysis/data/seattle_coffee_monthly_patterns/"
sample = (
spark.read.option("header", "true")
.option("escape", '"')
.csv(sample_csv_path)
.withColumn("date_range_start", f.to_date(f.col("date_range_start")))
.withColumn("date_range_end", f.to_date(f.col("date_range_end")))
.withColumn(
"visitor_home_cbgs",
f.from_json("visitor_home_cbgs", schema=MapType(StringType(), IntegerType())),
)
.withColumn("distance_from_home", f.col("distance_from_home"))
)
print("Number of coffe shops patterns: ", sample.count())
print("Number of coffe shops: ", sample.select("placekey").distinct().count())
sample.limit(10).toPandas().head()
Number of coffee shop patterns:  66607
Number of coffee shops:  1679
 | placekey | safegraph_place_id | parent_placekey | parent_safegraph_place_id | safegraph_brand_ids | location_name | brands | store_id | top_category | sub_category | ... | distance_from_home | median_dwell | bucketed_dwell_times | related_same_day_brand | related_same_month_brand | popularity_by_hour | popularity_by_day | device_type | carrier_name | county_fips
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 226-222@5x3-t3j-ch5 | sg:290a6c59dd3f4b28a1c086a510e40944 | None | None | None | Black Gold Coffee Company | None | None | Restaurants and Other Eating Places | Snack and Nonalcoholic Beverage Bars | ... | 11416 | 20.0 | {"<5":3,"5-10":30,"11-20":12,"21-60":14,"61-12... | {"CENEX":10,"Starbucks":9,"Safeway Pharmacy":9... | {"McDonald's":44,"Walmart":37,"Starbucks":37,"... | [9,9,9,9,9,12,11,9,15,20,15,22,16,12,11,11,8,7... | {"Monday":13,"Tuesday":20,"Wednesday":8,"Thurs... | {"android":25,"ios":14} | {"AT&T":5,"Sprint":4,"T-Mobile":9,"Verizon":14} | 53033 |
1 | 224-222@5x4-49y-8gk | sg:33d6c187ce6e419cbcb09f0b47ed6236 | None | None | None | Bite Box | None | None | Restaurants and Other Eating Places | Snack and Nonalcoholic Beverage Bars | ... | 2737 | 41.0 | {"<5":4,"5-10":28,"11-20":12,"21-60":30,"61-12... | {"Safeway Pharmacy":6,"Ace Hardware":5,"ARCO":... | {"Starbucks":41,"Shell Oil":33,"Safeway Pharma... | [8,10,10,9,8,12,13,19,22,25,30,34,39,32,21,20,... | {"Monday":17,"Tuesday":25,"Wednesday":15,"Thur... | {"android":20,"ios":51} | {"AT&T":14,"Sprint":1,"T-Mobile":14,"Verizon":35} | 53033 |
2 | 228-222@5x2-t6c-qfz | sg:442c1606a0d54d2aa017ea5a89ea8d78 | zzw-223@5x2-t6c-qfz | sg:a6e078c59b154cd59d8ac9e1f6f6311c | SG_BRAND_f116acfe9147494063e58da666d1d57e | Starbucks | Starbucks | 26209-243989 | Restaurants and Other Eating Places | Snack and Nonalcoholic Beverage Bars | ... | 5074 | 9.0 | {"<5":22,"5-10":165,"11-20":47,"21-60":50,"61-... | {"Walmart":7,"Safeway Fuel Station":4,"McDonal... | {"Walmart":55,"McDonald's":45,"Costco":35,"She... | [9,7,6,6,6,10,17,22,26,47,61,51,39,35,39,48,26... | {"Monday":52,"Tuesday":60,"Wednesday":38,"Thur... | {"android":100,"ios":160} | {"AT&T":64,"Sprint":9,"T-Mobile":62,"Verizon":98} | 53061 |
3 | zzw-22b@5x4-4yr-gtv | sg:5b5977ecc8814452bb5630d0b1b56ae1 | None | None | None | Cozy Bubble Tea | None | None | Restaurants and Other Eating Places | Snack and Nonalcoholic Beverage Bars | ... | None | 90.0 | {"<5":0,"5-10":0,"11-20":0,"21-60":1,"61-120":... | {"Hallmark Cards":20,"76":20} | {"Costco Gasoline":60,"Costco":60,"Starbucks":... | [1,1,1,1,1,1,1,1,1,1,1,0,1,0,0,0,0,0,0,1,2,2,2,2] | {"Monday":0,"Tuesday":1,"Wednesday":1,"Thursda... | {"android":4,"ios":4} | {"AT&T":1,"T-Mobile":2,"Verizon":1} | 53033 |
4 | zzw-224@5x4-4bc-d5f | sg:6af014f98e9f471b85b8e3eacb58b2f7 | None | None | SG_BRAND_f116acfe9147494063e58da666d1d57e | Starbucks | Starbucks | 3278-4859 | Restaurants and Other Eating Places | Snack and Nonalcoholic Beverage Bars | ... | 9319 | 45.0 | {"<5":36,"5-10":201,"11-20":119,"21-60":175,"6... | {"Potbelly Sandwich Works":1,"Kung Fu Tea":1,"... | {"Chevron":18,"Costco":15,"Shell Oil":13,"McDo... | [202,186,187,192,187,196,235,254,272,267,259,2... | {"Monday":181,"Tuesday":171,"Wednesday":127,"T... | {"android":136,"ios":289} | {"AT&T":41,"Sprint":2,"T-Mobile":69,"Verizon":91} | 53033 |
5 rows × 51 columns
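Before moving on, a quick schema check (illustrative, not part of the original workshop) confirms the date, JSON, and numeric columns parsed as intended:

# date_range_start/date_range_end should be dates, visitor_home_cbgs a map<string,int>,
# and distance_from_home an integer after the cast above
sample.select("date_range_start", "date_range_end", "visitor_home_cbgs", "distance_from_home").printSchema()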
Exploratory Data Analysis and Visualization
Visualize the coffee shops
from pyspark.sql.window import Window
import geopandas as gpd
import folium
w = Window().partitionBy("placekey").orderBy(f.col("date_range_start").desc())
cafes_latest = (
sample
# as our data improves, addresses or geocodes for a given location may change over time
# use a window function to keep only the most recent appearance of the given cafe
.withColumn("row_num", f.row_number().over(w)).filter(f.col("row_num") == 1)
# select the columns we need for mapping
.select(
"placekey",
"location_name",
"brands",
"street_address",
"city",
"region",
"postal_code",
"latitude",
"longitude",
"open_hours",
)
)
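A one-line check (illustrative) that the window dedup behaves as intended:

# after keeping only row_num == 1, there should be exactly one row per placekey,
# matching the distinct-placekey count printed in the load step
assert cafes_latest.count() == sample.select("placekey").distinct().count()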
# create a geopandas geodataframe
cafes_gdf = cafes_latest.toPandas()
cafes_gdf = gpd.GeoDataFrame(
cafes_gdf,
geometry=gpd.points_from_xy(cafes_gdf["longitude"], cafes_gdf["latitude"]),
crs="EPSG:4326",
)
def map_cafes(gdf):
# map bounds
sw = [gdf.unary_union.bounds[1], gdf.unary_union.bounds[0]]
ne = [gdf.unary_union.bounds[3], gdf.unary_union.bounds[2]]
folium_bounds = [sw, ne]
# map
    # center on the midpoint of the bounds (avoids centroid-on-geographic-CRS warnings);
    # fit_bounds() below sets the final view anyway
    x = (sw[1] + ne[1]) / 2
    y = (sw[0] + ne[0]) / 2
map_ = folium.Map(location=[y, x], tiles="OpenStreetMap")
for i, point in gdf.iterrows():
tooltip = f"placekey: {point['placekey']}<br>location_name: {point['location_name']}<br>brands: {point['brands']}<br>street_address: {point['street_address']}<br>city: {point['city']}<br>region: {point['region']}<br>postal_code: {point['postal_code']}<br>open_hours: {point['open_hours']}"
folium.Circle(
[point["geometry"].y, point["geometry"].x],
radius=40,
fill_color="blue",
color="blue",
fill_opacity=1,
tooltip=tooltip,
).add_to(map_)
map_.fit_bounds(folium_bounds)
return map_
map_ = map_cafes(cafes_gdf)
map_
Analysis
How far are people willing to travel for coffee?
# the `distance_from_home` column tells us the median distance (as the crow flies), in meters, between the coffee shop and the visitors' homes
# which coffee shop's visitors had the highest average median distance traveled since Jan 2018?
# outlier values in this column distort the histogram
# these outliers are likely due to a combination of (1) coffee shops in downtown areas that receive high numbers of out-of-town visitors and (2) quirks in the underlying GPS data
furthest_traveled = (
sample.groupBy("placekey", "location_name", "street_address")
.agg(f.mean("distance_from_home").alias("avg_median_dist_from_home"))
.orderBy("avg_median_dist_from_home", ascending=False)
)
display(furthest_traveled)
furthest_traveled.filter(f.col("avg_median_dist_from_home").isNotNull()).withColumn(
"avg_median_dist_from_home", f.col("avg_median_dist_from_home") / 1000.0
).show(truncate=False)
furthest_traveled = furthest_traveled.drop("street_address")
DataFrame[placekey: string, location_name: string, street_address: string, avg_median_dist_from_home: double]
+-------------------+--------------------------+-----------------------------------+-------------------------+ |placekey |location_name |street_address |avg_median_dist_from_home| +-------------------+--------------------------+-----------------------------------+-------------------------+ |zzy-222@5x4-4b4-p7q|Starbucks |1700 Seventh Ave Nordstrom Building|1124.5566666666668 | |25x-222@5x4-4b5-dvz|Storyville Coffee Company |94 Pike St Ste 34 |757.5968378378378 | |zzw-22f@5x4-4b5-p7q|Starbucks |1912 Pike Pl |750.9135581395349 | |223-222@5x4-4b5-dvz|Ghost Alley Espresso |1499 Post Aly |727.4028888888889 | |224-224@5x4-4b4-xt9|The Bar |110 6th Ave N |667.3632222222222 | |zzw-222@5x4-4b5-pvz|Cafe Opla |2200 Alaskan Way Ste 120 |661.2868000000001 | |22c-222@5x4-4b4-s5z|Starbucks |1124 Pike St |654.5950666666666 | |222-222@5x4-4b5-dvz|Starbucks |102 Pike St |633.7193111111111 | |22q-222@5x4-4b5-m8v|Limoncello Belltown |2326 1st Ave |485.0474516129032 | |22j-222@5x4-4b5-dvz|The Crumpet Shop |1503 1st Ave |431.5123333333333 | |zzw-22c@5x4-4b7-bkz|Starbucks |305 Harrison St Ste 220 |406.287 | |zzy-222@5xd-hjq-yn5|Starbucks |1300 Station Drive |369.72424137931034 | |zzw-22b@5x4-4b5-pgk|World Spice Merchants |1509 Western Ave |360.2277209302325 | |zzw-22f@5x4-4b5-pgk|Procopio Gelateria |1501 Western Ave Ste 300 |344.466 | |223-222@5x4-4b5-hqz|Starbucks |1101 Alaskan Way Ste 102 |317.55111111111114 | |zzw-22r@5x4-4b5-gkz|Cafe ABoDegas |1303 6th Ave |316.76696000000004 | |22g-222@5x4-4b5-dsq|Bottega Italiana |1425 1st Ave |308.5182222222222 | |zzw-22d@5x4-4b7-bkz|Eltana |305 Harrison St |294.919 | |zzw-224@5x4-4b5-f75|MarketSpice |85A Pike St |263.49886363636364 | |22b-222@5x4-4vs-qzz|Cherry Street Coffee House|700 1st Ave |210.19386666666668 | +-------------------+--------------------------+-----------------------------------+-------------------------+ only showing top 20 rows
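One way to tame the outliers noted above before plotting (a sketch, assuming matplotlib is available in the notebook environment): clip the distribution at a high quantile computed with approxQuantile, then histogram the remainder.

# clip the long right tail at the (approximate) 99th percentile; approxQuantile ignores nulls
p99 = furthest_traveled.approxQuantile("avg_median_dist_from_home", [0.99], 0.01)[0]
(
    furthest_traveled.filter(f.col("avg_median_dist_from_home") <= p99)
    .toPandas()["avg_median_dist_from_home"]
    .hist(bins=50)
)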
# most coffee shops' visitors' homes are <10km away
display(furthest_traveled.filter(f.col("avg_median_dist_from_home") < 10000))
print(
    furthest_traveled.filter(f.col("avg_median_dist_from_home") < 10000).count(),
    " coffee shops have visitors' homes <10 km away, out of ",
    furthest_traveled.count(),
    "coffee shops.",
)
DataFrame[placekey: string, location_name: string, avg_median_dist_from_home: double]
1071  coffee shops have visitors' homes <10 km away, out of  1688 coffee shops.
# but `distance_from_home` takes into account ALL visitors to the coffee shop, which as we saw above can distort values. When selecting a site for a coffee shop, we likely care more about how far visitors traveled from within Seattle
# we can compute this using Sedona
import sedona
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
SedonaRegistrator.registerAll(spark)
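A quick way to verify the registration worked (an illustrative check, mirroring the Decimal casts used later in this notebook):

# ST_Point/ST_AsText come from Sedona's function registry, not vanilla Spark SQL
spark.sql(
    "SELECT ST_AsText(ST_Point(CAST(-122.335 AS Decimal(24, 20)), CAST(47.608 AS Decimal(24, 20)))) AS pike_place"
).show(truncate=False)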
# load the census block groups for Washington State
# filter it down to our three counties of interest in Seattle
WA_cbgs = (
spark.read.option("header", "true")
.option("escape", '"')
.csv("file:///media/hdd1/code/sigspatial-2021-cafe-analysis/data/wa_cbg.csv")
.filter(f.col("GEOID").rlike("^(53033|53053|53061)"))
)
WA_cbgs.head()
Row(GEOID='530330033001', geometry='POLYGON ((-122.376771 47.679527, -122.376172 47.67953199999999, -122.375716 47.679533, -122.375137 47.679534, -122.374659 47.679531, -122.374092 47.679536, -122.373605 47.679537, -122.373003 47.679538, -122.37255 47.679539, -122.371964 47.679541, -122.371494 47.679539, -122.370899 47.679543, -122.370439 47.679543, -122.369828 47.679544, -122.369383 47.679545, -122.368747 47.67954599999999, -122.368328 47.679547, -122.367673 47.679548, -122.367273 47.679548, -122.366609 47.67954899999999, -122.366038 47.679551, -122.365979 47.679399, -122.365958 47.677762, -122.365957 47.676293, -122.36602 47.67616899999999, -122.366025 47.675984, -122.366197 47.675983, -122.36656 47.675982, -122.367624 47.67598, -122.368688 47.67598599999999, -122.36874 47.67598599999999, -122.369067 47.675987, -122.369779 47.675991, -122.370845 47.675993, -122.371399 47.675997, -122.371946 47.675999, -122.373158 47.675993, -122.373574 47.675991, -122.37401 47.67599, -122.37558 47.675988, -122.375645 47.675989, -122.375908 47.675992, -122.376117 47.675992, -122.376277 47.675993, -122.376513 47.675996, -122.376652 47.676203, -122.376708 47.67631799999999, -122.376749 47.676443, -122.376767 47.676553, -122.376765 47.677709, -122.376765 47.6778, -122.376771 47.679527))')
# transform the geometry column into a Geometry-type
WA_cbgs = (
WA_cbgs.withColumn("cbg_geometry", f.expr("ST_GeomFromWkt(geometry)"))
# we'll just use the CBG centroid
.withColumn("cbg_geometry", f.expr("ST_Centroid(cbg_geometry)"))
    # since we'll be doing a distance calculation, let's also use a projected CRS - epsg:3857
    # (note: web-mercator distances are inflated at Seattle's latitude; see the quick check after the CBG preview below)
.withColumn(
"cbg_geometry",
f.expr(
"ST_Transform(ST_FlipCoordinates(cbg_geometry), 'epsg:4326','epsg:3857', false)"
),
) # ST_FlipCoordinates() necessary due to this bug: https://issues.apache.org/jira/browse/SEDONA-39
.withColumnRenamed("GEOID", "cbg")
.withColumnRenamed("geometry", "cbg_polygon_geometry")
)
WA_cbgs.limit(10).toPandas().head()
 | cbg | cbg_polygon_geometry | cbg_geometry
---|---|---|---
0 | 530330033001 | POLYGON ((-122.376771 47.679527, -122.376172 4... | POINT (-13622316.63058369 6053413.257523819) |
1 | 530530729031 | POLYGON ((-122.643503 47.134887, -122.643502 4... | POINT (-13647989.55994223 5963208.486137308) |
2 | 530530730015 | POLYGON ((-122.567775 46.970114, -122.566807 4... | POINT (-13641020.87626943 5941126.596682728) |
3 | 530530730013 | POLYGON ((-122.533868 46.996149, -122.532109 4... | POINT (-13637080.70343767 5938242.363543665) |
4 | 530530730014 | POLYGON ((-122.484146 46.938446, -122.484146 4... | POINT (-13632564.01655458 5936624.370012998) |
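One caveat worth flagging (our note, not part of the original workshop): epsg:3857 is web-mercator, so planar distances at Seattle's latitude (~47.6°N) overstate ground distances by roughly 1/cos(47.6°) ≈ 1.5x. Relative comparisons between cafes remain consistent, but for accurate absolute meters a local CRS such as epsg:32610 (UTM zone 10N) would be a better fit. A quick sketch of the inflation, reusing the transform pattern above:

# two points 0.01 degrees of longitude apart at 47.6N are ~750 m apart on the ground,
# but ~1,113 m apart in the epsg:3857 plane (mercator does not shrink x by cos(lat))
spark.sql("""
SELECT ST_Distance(
    ST_Transform(ST_FlipCoordinates(ST_Point(CAST(-122.33 AS Decimal(24, 20)), CAST(47.6 AS Decimal(24, 20)))), 'epsg:4326', 'epsg:3857', false),
    ST_Transform(ST_FlipCoordinates(ST_Point(CAST(-122.32 AS Decimal(24, 20)), CAST(47.6 AS Decimal(24, 20)))), 'epsg:4326', 'epsg:3857', false)
) AS mercator_meters
""").show()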
# Next let's prep our sample data
sample_seattle_visitors = (
sample.select("placekey", f.explode("visitor_home_cbgs"))
.withColumnRenamed("key", "cbg")
.withColumnRenamed("value", "visitors")
    # filter out CBGs with low visitor counts (small home-CBG counts are
    # privacy-protected in SafeGraph Patterns, so values of 4 and below are unreliable)
.filter(f.col("visitors") > 4)
# filter down to only the visitors from Seattle CBGs
.filter(f.col("cbg").rlike("^(53033|53053|53061)"))
# aggregate up all the visitors over time from each CBG to each Cafe
.groupBy("placekey", "cbg")
.agg(f.sum("visitors").alias("visitors"))
# join back with most up-to-date POI information
.join(cafes_latest.select("placekey", "latitude", "longitude"), "placekey")
# transform geometry column
.withColumn(
"cafe_geometry",
f.expr(
"ST_Point(CAST(longitude AS Decimal(24, 20)), CAST(latitude AS Decimal(24, 20)))"
),
)
.withColumn(
"cafe_geometry",
f.expr(
"ST_Transform(ST_FlipCoordinates(cafe_geometry), 'epsg:4326','epsg:3857', false)"
),
)
# join with CBG geometries
.join(WA_cbgs, "cbg")
)
sample_seattle_visitors.limit(10).toPandas().head()
 | cbg | placekey | visitors | latitude | longitude | cafe_geometry | cbg_polygon_geometry | cbg_geometry
---|---|---|---|---|---|---|---|---
0 | 530330002006 | zzw-222@5x4-4d5-mrk | 77 | 47.742573 | -122.349879 | POINT (-13619926.22889864 6064134.508228292) | POLYGON ((-122.308696 47.733904, -122.308045 4... | POINT (-13614894.7487456 6061924.520392932) |
1 | 530330002006 | 223-222@5x4-48d-f4v | 5 | 47.701524 | -122.323637 | POINT (-13617004.98282124 6057341.933516146) | POLYGON ((-122.308696 47.733904, -122.308045 4... | POINT (-13614894.7487456 6061924.520392932) |
2 | 530330002006 | zzw-226@5x4-48g-t5f | 5 | 47.680164 | -122.323755 | POINT (-13617018.11852115 6053809.507537933) | POLYGON ((-122.308696 47.733904, -122.308045 4... | POINT (-13614894.7487456 6061924.520392932) |
3 | 530330002006 | 223-222@5x4-4nz-pqf | 5 | 47.71778 | -122.313042 | POINT (-13615825.55281628 6060031.251671663) | POLYGON ((-122.308696 47.733904, -122.308045 4... | POINT (-13614894.7487456 6061924.520392932) |
4 | 530330002006 | 225-222@5x4-4d6-789 | 10 | 47.724882 | -122.343856 | POINT (-13619255.75160559 6061206.437371126) | POLYGON ((-122.308696 47.733904, -122.308045 4... | POINT (-13614894.7487456 6061924.520392932) |
distance_traveled_SEA = (
sample_seattle_visitors
# calculate the distance from home in meters
.withColumn(
"distance_from_home", f.expr("ST_Distance(cafe_geometry, cbg_geometry)")
)
)
distance_traveled_SEA.createOrReplaceTempView("distance_traveled_SEA")
q = """
SELECT *
FROM (
SELECT DISTINCT
placekey,
cbg,
visitors,
distance_from_home,
posexplode(split(repeat(",", visitors), ","))
FROM distance_traveled_SEA
)
WHERE pos > 0
"""
weighted_median_tmp = spark.sql(q)
grp_window = Window.partitionBy("placekey")
median_percentile = f.expr("percentile_approx(distance_from_home, 0.5)")
weighted_median_tmp.limit(10).toPandas().head()
 | placekey | cbg | visitors | distance_from_home | pos | col
---|---|---|---|---|---|---
0 | zzw-222@5x4-4d5-mrk | 530330002006 | 77 | 5495.437996 | 1 | |
1 | zzw-222@5x4-4d5-mrk | 530330002006 | 77 | 5495.437996 | 2 | |
2 | zzw-222@5x4-4d5-mrk | 530330002006 | 77 | 5495.437996 | 3 | |
3 | zzw-222@5x4-4d5-mrk | 530330002006 | 77 | 5495.437996 | 4 | |
4 | zzw-222@5x4-4d5-mrk | 530330002006 | 77 | 5495.437996 | 5 |
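A note on the trick in the query above: repeat(",", visitors) builds a string of visitors commas, split() turns it into visitors + 1 empty tokens (pos 0 through visitors), and the WHERE pos > 0 filter leaves exactly one row per visitor. Duplicating each CBG row by its visitor count makes percentile_approx behave like a visitor-weighted median. A toy demonstration (illustrative):

# a CBG with 3 visitors contributes three rows to the percentile; one visitor, one row
toy = spark.createDataFrame([("cbg_a", 3), ("cbg_b", 1)], ["cbg", "visitors"])
toy.createOrReplaceTempView("toy")
spark.sql("""
SELECT cbg, visitors, pos
FROM (SELECT cbg, visitors, posexplode(split(repeat(',', visitors), ',')) FROM toy)
WHERE pos > 0
""").show()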
median_dist_traveled_SEA = weighted_median_tmp.groupBy("placekey").agg(
median_percentile.alias("median_dist_traveled_SEA")
)
median_dist_traveled_SEA.filter(f.col("median_dist_traveled_SEA") < 25000).limit(
10
).toPandas().head()
 | placekey | median_dist_traveled_SEA
---|---|---
0 | zzw-225@5x4-8xs-h3q | 5373.004786 |
1 | zzw-222@5x4-49x-jqf | 1337.200428 |
2 | 224-222@5x4-4mg-9vf | 2707.661156 |
3 | 222-222@5x4-4fr-mrk | 9124.893780 |
4 | zzw-223@5x4-8pj-b49 | 2364.791999 |
total_visits = sample.groupBy("placekey").agg(
f.sum("raw_visit_counts").alias("total_visits"),
f.sum("raw_visitor_counts").alias("total_visitors"),
)
distance_traveled_final = (
furthest_traveled.join(median_dist_traveled_SEA, "placekey")
.join(cafes_latest, ["placekey", "location_name"])
.withColumn(
"distance_traveled_diff",
f.col("avg_median_dist_from_home") - f.col("median_dist_traveled_SEA"),
)
# keep only the cafes with a meaningful sample - at least 1000 visits since 2018
.join(total_visits, "placekey")
.filter(f.col("total_visits") > 1000)
.orderBy("distance_traveled_diff", ascending=False)
)
distance_traveled_final.limit(10).toPandas().head()
 | placekey | location_name | avg_median_dist_from_home | median_dist_traveled_SEA | brands | street_address | city | region | postal_code | latitude | longitude | open_hours | distance_traveled_diff | total_visits | total_visitors
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | zzw-22f@5x4-4b5-p7q | Starbucks | 750913.558140 | 23663.589737 | Starbucks | 1912 Pike Pl | Seattle | WA | 98101 | 47.6101 | -122.342482 | { "Mon": [["6:30", "20:30"]], "Tue": [["6:30",... | 727249.968402 | 5195.0 | 4938.0 |
1 | 223-222@5x4-4b5-dvz | Ghost Alley Espresso | 727402.888889 | 26369.553927 | None | 1499 Post Aly | Seattle | WA | 98101 | 47.608608 | -122.340546 | { "Mon": [["7:30", "17:30"]], "Tue": [["7:30",... | 701033.334962 | 17286.0 | 16489.0 |
2 | 224-224@5x4-4b4-xt9 | The Bar | 667363.222222 | 290.137631 | None | 110 6th Ave N | Seattle | WA | 98109 | 47.618842 | -122.344648 | None | 667073.084592 | 15564.0 | 7781.0 |
3 | 22c-222@5x4-4b4-s5z | Starbucks | 654595.066667 | 1881.813510 | Starbucks | 1124 Pike St | Seattle | WA | 98101 | 47.613994 | -122.328201 | { "Mon": [["7:00", "21:00"]], "Tue": [["7:00",... | 652713.253157 | 52085.0 | 43177.0 |
4 | zzw-222@5x4-4b5-pvz | Cafe Opla | 661286.800000 | 24168.052053 | None | 2200 Alaskan Way Ste 120 | Seattle | WA | 98121 | 47.611012 | -122.34752 | None | 637118.747947 | 14462.0 | 9118.0 |
distance_traveled_final.select("placekey").distinct().count()
1429
most_tourists = distance_traveled_final.limit(500).withColumn(
"visitor_type", f.lit("tourist")
)
most_locals = (
distance_traveled_final.orderBy("distance_traveled_diff")
.limit(500)
.withColumn("visitor_type", f.lit("local"))
)
visitor_type = most_tourists.unionByName(most_locals)
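A quick tally (illustrative) confirms the union holds 500 cafes of each label:

# expect 500 "tourist" and 500 "local" rows after the unionByName
visitor_type.groupBy("visitor_type").count().show()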
# create a geopandas geodataframe
visitor_type_gdf = visitor_type.toPandas()
visitor_type_gdf = gpd.GeoDataFrame(
visitor_type_gdf,
geometry=gpd.points_from_xy(
visitor_type_gdf["longitude"], visitor_type_gdf["latitude"]
),
crs="EPSG:4326",
)
def map_cafe_visitor_type(gdf):
# map bounds
sw = [gdf.unary_union.bounds[1], gdf.unary_union.bounds[0]]
ne = [gdf.unary_union.bounds[3], gdf.unary_union.bounds[2]]
folium_bounds = [sw, ne]
# map
    # center on the midpoint of the bounds (avoids centroid-on-geographic-CRS warnings);
    # fit_bounds() below sets the final view anyway
    x = (sw[1] + ne[1]) / 2
    y = (sw[0] + ne[0]) / 2
map_ = folium.Map(location=[y, x], tiles="OpenStreetMap")
for i, point in gdf.iterrows():
tooltip = f"placekey: {point['placekey']}<br>location_name: {point['location_name']}<br>brands: {point['brands']}<br>street_address: {point['street_address']}<br>city: {point['city']}<br>region: {point['region']}<br>postal_code: {point['postal_code']}<br>visitor_type: {point['visitor_type']}<br>avg_median_dist_from_home: {point['avg_median_dist_from_home']}"
folium.Circle(
[point["geometry"].y, point["geometry"].x],
radius=40,
fill_color="blue" if point["visitor_type"] == "tourist" else "red",
color="blue" if point["visitor_type"] == "tourist" else "red",
fill_opacity=1,
tooltip=tooltip,
).add_to(map_)
map_.fit_bounds(folium_bounds)
return map_
# blue = touristy, red = locals
map_cafe_visitor_type(visitor_type_gdf)
What location will receive the most visibility?
In other words, in what neighborhood do coffee shops get the most visits generally?
WA_neighbs = (
spark.read.option("header", "true")
.option("escape", '"')
.csv(
"file:///media/hdd1/code/sigspatial-2021-cafe-analysis/data/seattle_neighborhoods.csv"
)
# transform the geometry column into a Geometry-type
.withColumn("geometry", f.expr("ST_GeomFromWkt(geometry)"))
)
WA_neighbs.createOrReplaceTempView("WA_neighbs")
WA_neighbs.limit(10).toPandas().head()
 | S_HOOD | L_HOOD | geometry
---|---|---|---
0 | OOO | POLYGON ((-122.2739789529401 47.69522647266365... | |
1 | OOO | POLYGON ((-122.2875597861967 47.64522740482133... | |
2 | OOO | POLYGON ((-122.3952908582123 47.6651350445393,... | |
3 | OOO | POLYGON ((-122.3983207858678 47.66608770690774... | |
4 | OOO | POLYGON ((-122.2885127664106 47.65630022774357... |
cafes_geo = cafes_latest.withColumn(
"cafe_geometry",
f.expr(
"ST_Point(CAST(longitude AS Decimal(24,20)), CAST(latitude AS Decimal(24,20)))"
),
).select("placekey", "cafe_geometry")
cafes_geo.createOrReplaceTempView("cafes_geo")
# perform a spatial join
q = """
SELECT cafes_geo.placekey, WA_neighbs.S_HOOD as neighborhood, WA_neighbs.geometry
FROM WA_neighbs, cafes_geo
WHERE ST_Intersects(WA_neighbs.geometry, cafes_geo.cafe_geometry)
"""
cafe_neighb_join = spark.sql(q)
cafe_neighb_join.limit(10).toPandas().head()
 | placekey | neighborhood | geometry
---|---|---|---
0 | 222-222@5x4-4fr-mrk | Adams | POLYGON ((-122.3763365647225 47.67591769896643... |
1 | zzw-222@5x4-49x-jqf | South Lake Union | POLYGON ((-122.3296783220331 47.63972718752359... |
2 | 223-222@5x4-487-3t9 | Maple Leaf | POLYGON ((-122.3300076901854 47.70863525163715... |
3 | zzw-224@5x4-4vs-jsq | International District | POLYGON ((-122.3202379357072 47.59582723132589... |
4 | 223-222@5x4-4ft-3bk | Sunset Hill | POLYGON ((-122.402107196151 47.69768191273809,... |
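One caveat worth checking (a sketch, not in the original analysis): the inner spatial join keeps only cafes whose point falls inside some Seattle neighborhood polygon, so cafes in the Pierce and Snohomish county portions of the sample silently drop out. A left anti-join quantifies the loss:

# cafes with no matching neighborhood polygon were dropped by the inner spatial join
unmatched = cafes_latest.join(cafe_neighb_join.select("placekey"), "placekey", "left_anti")
print(unmatched.count(), "cafes fall outside the Seattle neighborhood polygons")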
# add the visit and visitor counts and aggregate up to the neighborhood
neighborhood_agg = (
cafe_neighb_join.join(total_visits, "placekey")
.groupBy("neighborhood", f.col("geometry").cast("string").alias("geometry"))
.agg(
f.sum("total_visits").alias("total_visits"),
f.sum("total_visitors").alias("total_visitors"),
)
)
neighbs_gdf = neighborhood_agg.toPandas()
neighbs_gdf = gpd.GeoDataFrame(
neighbs_gdf,
geometry=gpd.GeoSeries.from_wkt(neighbs_gdf["geometry"]),
crs="EPSG:4326",
)
def map_neighbs(gdf):
# map bounds
sw = [gdf.unary_union.bounds[1], gdf.unary_union.bounds[0]]
ne = [gdf.unary_union.bounds[3], gdf.unary_union.bounds[2]]
folium_bounds = [sw, ne]
# map
    # center on the midpoint of the bounds (avoids centroid-on-geographic-CRS warnings);
    # fit_bounds() below sets the final view anyway
    x = (sw[1] + ne[1]) / 2
    y = (sw[0] + ne[0]) / 2
map_ = folium.Map(location=[y, x], tiles="OpenStreetMap")
gdf["percentile"] = pd.qcut(gdf["total_visits"], 100, labels=False) / 100
folium.GeoJson(
gdf[
["neighborhood", "total_visits", "total_visitors", "percentile", "geometry"]
],
style_function=lambda x: {
"weight": 0,
"color": "blue",
"fillOpacity": x["properties"]["percentile"],
},
tooltip=folium.features.GeoJsonTooltip(
fields=["neighborhood", "total_visits", "total_visitors", "percentile"]
),
).add_to(map_)
map_.fit_bounds(folium_bounds)
return map_
map_neighbs(neighbs_gdf)
What home location has the most coffee-shop-goers?
home_loc_most_cafe_visitors = (
sample.select(f.explode("visitor_home_cbgs"))
.withColumnRenamed("key", "cbg")
.withColumnRenamed("value", "visitors")
.groupBy("cbg")
.agg(f.sum("visitors").alias("visitors"))
.orderBy("visitors", ascending=False)
)
# 40k visitors to Seattle coffee shops since Jan 2018 originated from CBG `530330082001`, just under twice as many as the next CBG.
home_loc_most_cafe_visitors.limit(10).toPandas().head()
 | cbg | visitors
---|---|---
0 | 530330082001 | 40286 |
1 | 530330326023 | 20529 |
2 | 530330072001 | 19432 |
3 | 530530731251 | 18486 |
4 | 530530702031 | 18131 |
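To put that figure in context, a quick sketch (illustrative) of the top CBG's share of all home-CBG-attributed visitors:

# share of the top home CBG among all visitor counts in the table
total = home_loc_most_cafe_visitors.agg(f.sum("visitors").alias("total")).first()["total"]
top = home_loc_most_cafe_visitors.first()
print(f"CBG {top['cbg']} accounts for {top['visitors'] / total:.1%} of home-attributed visitors")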
from shapely import wkt
# Map of the top 1000 CBGs in terms of visitors' origins
home_loc_gdf = (
home_loc_most_cafe_visitors.limit(1000)
.join(WA_cbgs.select("cbg", "cbg_polygon_geometry"), "cbg")
.toPandas()
)
home_loc_gdf = gpd.GeoDataFrame(
home_loc_gdf,
geometry=gpd.GeoSeries.from_wkt(home_loc_gdf["cbg_polygon_geometry"]),
crs="EPSG:4326",
)
def map_cbgs(gdf):
# map bounds
sw = [gdf.unary_union.bounds[1], gdf.unary_union.bounds[0]]
ne = [gdf.unary_union.bounds[3], gdf.unary_union.bounds[2]]
folium_bounds = [sw, ne]
# map
    # center on the midpoint of the bounds (avoids centroid-on-geographic-CRS warnings);
    # fit_bounds() below sets the final view anyway
    x = (sw[1] + ne[1]) / 2
    y = (sw[0] + ne[0]) / 2
map_ = folium.Map(location=[y, x], tiles="OpenStreetMap")
gdf["quantile"] = pd.qcut(gdf["visitors"], 100, labels=False) / 100
folium.GeoJson(
gdf[["cbg", "visitors", "geometry", "quantile"]],
style_function=lambda x: {
"weight": 0,
"color": "blue",
"fillOpacity": x["properties"]["quantile"],
},
tooltip=folium.features.GeoJsonTooltip(fields=["cbg", "visitors", "quantile"]),
).add_to(map_)
map_.fit_bounds(folium_bounds)
return map_
# top 1000 home_cbgs by number of people who visited coffee shops in Seattle
map_cbgs(home_loc_gdf)
Conclusion
Apache Sedona provides a familiar and straightforward spatial SQL API for performing distributed spatial queries.