28. Distributed Computing with Apache Sedona#
28.1. Introduction#
28.2. Learning Objectives#
28.3. Installing and Setting Up Apache Sedona#
28.3.1. Installation Requirements#
docker run -it -p 8888:8888 -p 4040:4040 -p 8080:8080 -p 8082:8081 -p 7077:7077 -p 8085:8085 -v $(pwd):/app/workspace giswqs/pygis:sedona
28.3.2. Core Imports and Configuration#
import leafmap
import geopandas as gpd
import pandas as pd
from sedona.spark import *
from pyspark.sql.functions import col, expr
28.3.3. Creating a Sedona-Enabled Spark Session#
config = (
SedonaContext.builder()
.appName("SedonaApp") # Application name for tracking in Spark UI
.config(
"spark.serializer", "org.apache.spark.serializer.KryoSerializer"
) # Faster serialization
.config(
"spark.kryo.registrator", "org.apache.sedona.core.serde.SedonaKryoRegistrator"
) # Spatial object serialization
.config(
"spark.jars.packages",
"org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.7.2,org.datasyslab:geotools-wrapper:1.7.2-28.5",
) # Core Sedona packages
.config(
"spark.jars.repositories",
"https://artifacts.unidata.ucar.edu/repository/unidata-all",
) # Additional repositories
.config(
"spark.hadoop.fs.s3a.connection.ssl.enabled", "true"
) # Enable SSL for S3 connections
.config(
"spark.hadoop.fs.s3a.path.style.access", "true"
) # Use path-style access for S3
.config(
"spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"
) # S3 filesystem implementation
.config(
"spark.hadoop.fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
) # Anonymous access to public S3 buckets
.getOrCreate()
)
# Create the Sedona context which adds spatial capabilities to the Spark session
sedona = SedonaContext.create(config)
28.4. Downloading Sample Data#
url = (
"https://github.com/opengeos/datasets/releases/download/book/sedona-sample-data.zip"
)
leafmap.download_file(url)
28.5. Core Concepts and Data Structures#
28.5.1. Understanding Spatial DataFrames#
28.5.2. Spatial Data Types#
28.5.3. Creating Spatial DataFrames#
# Create sample city data with coordinates for major US cities
cities_data = [
("New York", 40.7128, -74.0060),
("Los Angeles", 34.0522, -118.2437),
("Chicago", 41.8781, -87.6298),
("Houston", 29.7604, -95.3698),
("Phoenix", 33.4484, -112.0740),
]
# Convert to Spark DataFrame - this creates a distributed dataset
cities_df = sedona.createDataFrame(
data=cities_data, schema=["city", "latitude", "longitude"]
)
# Create geometry column using ST_Point function - this transforms coordinates into spatial objects
cities_spatial = cities_df.withColumn("geometry", expr("ST_Point(longitude, latitude)"))
# Display the resulting spatial DataFrame
cities_spatial.show(truncate=False)
28.5.4. Working with Real Geospatial Data#
# Load NYC boroughs GeoJSON using GeoPandas for initial processing
boroughs_gdf = gpd.read_file("nybb.geojson")
# Reproject from New York State Plane (EPSG:2263) to Albers Equal Area (EPSG:5070)
# This ensures accurate area calculations for our later analysis
boroughs_gdf = boroughs_gdf.to_crs("EPSG:5070")
boroughs_gdf
# Convert to Pandas DataFrame and extract WKT (Well-Known Text) representation of geometries
boroughs_pd = pd.DataFrame(boroughs_gdf.drop(columns="geometry"))
boroughs_pd["wkt_geometry"] = boroughs_gdf.geometry.to_wkt()
# Create Spark DataFrame from the Pandas DataFrame
boroughs_df = sedona.createDataFrame(boroughs_pd)
# Convert WKT strings back to spatial geometry objects using Sedona's ST_GeomFromWKT function
boroughs_spatial = boroughs_df.withColumn(
"geometry", expr("ST_GeomFromWKT(wkt_geometry)")
)
# Display the key columns of our spatial DataFrame
boroughs_spatial.select("BoroCode", "BoroName", "geometry").show()
28.6. Spatial Operations and Functions#
28.6.1. Basic Geometric Properties#
# Calculate geometric properties for each borough
boroughs_with_metrics = (
boroughs_spatial.withColumn(
"area_km2", expr("ROUND(ST_Area(geometry) / 1e6, 2)")
) # Convert from m² to km²
.withColumn(
"perimeter_km", expr("ROUND(ST_Perimeter(geometry) / 1000, 2)")
) # Convert from m to km
.withColumn("centroid", expr("ST_Centroid(geometry)")) # Calculate geometric center
)
# Display the calculated metrics
boroughs_with_metrics.select("BoroName", "area_km2", "perimeter_km", "centroid").show()
28.6.2. Distance Calculations#
# Transform city coordinates from geographic (EPSG:4326) to projected (EPSG:5070) coordinate system
# This is essential for accurate distance calculations in linear units (meters)
cities_projected = cities_spatial.withColumn(
"geometry", expr("ST_Transform(geometry, 'EPSG:4326', 'EPSG:5070')")
)
# View the transformed coordinates
cities_projected.show(truncate=False)
# Calculate distances from each city to Manhattan's centroid
# First, extract Manhattan's centroid as a reference point
manhattan_centroid = (
boroughs_spatial.filter(col("BoroName") == "Manhattan")
.select(expr("ST_Centroid(geometry)").alias("manhattan_center"))
.collect()[0][0] # collect() brings the result to the driver node
)
# Create a new DataFrame with distance calculations
cities_with_distances = (
cities_projected.withColumn(
"manhattan_center",
expr(
f"ST_GeomFromWKT('{manhattan_centroid.wkt}')"
), # Add Manhattan center as reference
)
.withColumn(
"distance_to_manhattan_meters",
expr("ST_Distance(geometry, manhattan_center)"), # Calculate distance in meters
)
.withColumn(
"distance_to_manhattan_km",
expr(
"ROUND(ST_Distance(geometry, manhattan_center) / 1000, 2)"
), # Convert to kilometers and round
)
)
# Display results ordered by distance (closest first)
cities_with_distances.select("city", "distance_to_manhattan_km").orderBy(
"distance_to_manhattan_km"
).show()
28.6.3. Spatial Relationships#
# Create a 5-kilometer buffer around Manhattan's boundary
manhattan_with_buffer = boroughs_spatial.filter(
col("BoroName") == "Manhattan"
).withColumn(
"buffer_5km", expr("ST_Buffer(geometry, 5000)") # 5000 meters = 5km buffer
)
# Perform spatial join to find cities within the buffer zone
cities_in_buffer = (
cities_projected.crossJoin(
manhattan_with_buffer.select("buffer_5km")
) # Cross join to compare all cities with the buffer
.withColumn(
"within_buffer", expr("ST_Within(geometry, buffer_5km)")
) # Test if city point is within buffer
.filter(col("within_buffer") == True) # Keep only cities that are within the buffer
)
# Display cities that fall within the 5km buffer of Manhattan
cities_in_buffer.select("city").show()
28.7. Spatial Joins and Indexing#
28.7.1. Understanding Spatial Join Types#
28.7.2. Performing Spatial Joins with World Cities#
world_cities_pd = pd.read_csv("world_cities.csv")
world_cities_pd.head()
# Convert Pandas DataFrame to Spark DataFrame for distributed processing
world_cities_df = sedona.createDataFrame(world_cities_pd)
# Create spatial point geometries from longitude and latitude columns
world_cities_spatial = world_cities_df.withColumn(
"geometry", expr("ST_Point(longitude, latitude)")
)
# Display sample data showing the spatial DataFrame structure
world_cities_spatial.show(10)
28.7.3. Spatial Join Example: Cities by Country#
# Create simplified country boundaries for demonstration
# In production, you would load actual country boundary data from sources like Natural Earth
sample_countries = [
(
"Australia",
"POLYGON ((112 -44, 112 -10, 154 -10, 154 -44, 112 -44))",
), # Simplified Australia bbox
(
"UK",
"POLYGON ((-8.65 49.9, -8.65 60.9, 1.75 60.9, 1.75 49.9, -8.65 49.9))",
), # Simplified UK bbox
]
# Create DataFrame with country boundaries
countries_df = sedona.createDataFrame(sample_countries, schema=["country", "wkt"])
countries_spatial = countries_df.withColumn("geometry", expr("ST_GeomFromWKT(wkt)"))
# Perform spatial join to find cities within country boundaries
# This operation tests each city point against each country polygon
cities_in_countries = world_cities_spatial.alias("cities").join(
countries_spatial.alias("countries"),
expr(
"ST_Within(cities.geometry, countries.geometry)"
), # Spatial predicate: city within country
"inner", # Only keep cities that are within a country boundary
)
cities_in_countries.show()
# Aggregate cities by country to get summary statistics
city_counts = cities_in_countries.groupBy("countries.country").count()
city_counts.show()
28.8. Advanced Spatial Analysis#
28.8.1. Spatial Aggregations#
# Perform spatial and statistical aggregations on North American cities
country_unions = (
world_cities_spatial.filter(
col("country").isin(
["USA", "CAN", "MEX"]
) # Filter for North American countries
)
.groupBy("country")
.agg(
expr("ST_Union_Aggr(geometry)").alias(
"union_geometry"
), # Create union of all city points per country
expr("COUNT(*)").alias("city_count"), # Count cities per country
expr("CAST(AVG(population) AS INT)").alias(
"avg_population"
), # Calculate average population per country
)
)
country_unions.show()
28.8.2. Spatial Clustering Analysis#
# Create a 2-degree geographic grid for clustering analysis
grid_analysis = (
world_cities_spatial.filter(
col("population") > 100000
) # Focus on major cities only
.withColumn(
"grid_x", expr("floor(longitude / 2) * 2")
) # Create 2-degree longitude grid cells
.withColumn(
"grid_y", expr("floor(latitude / 2) * 2")
) # Create 2-degree latitude grid cells
.groupBy("grid_x", "grid_y") # Group cities by grid cell
.agg(
expr("COUNT(*)").alias("city_count"), # Count cities in each grid cell
expr("SUM(population)").alias(
"total_population"
), # Sum population in each grid cell
expr("ST_Centroid(ST_Union_Aggr(geometry))").alias(
"grid_center"
), # Calculate center of city cluster
)
)
# Identify and display high-density urban clusters
high_density_grids = grid_analysis.filter(
col("city_count") >= 5
) # Grid cells with 5+ major cities
high_density_grids.orderBy(col("city_count").desc()).show()
28.9. Reading Vector Data#
28.9.1. Reading CSV#
# Read CSV file and create spatial geometries from coordinate columns
cities_df = (
sedona.read.option("header", True) # CSV has header row
.format("CSV")
.load("world_cities.csv")
.withColumn(
"geometry", expr("ST_Point(longitude, latitude)")
) # Create point geometries from coords
)
cities_df.show()
28.9.2. Reading GeoJSON#
# Read GeoJSON file - initial structure shows nested format
cables_df = sedona.read.format("geojson").load("cables.geojson")
cables_df.printSchema()
# Flatten GeoJSON structure for easier analysis
cables_df = (
sedona.read.format("geojson")
.load("cables.geojson")
.selectExpr("explode(features) as features") # Expand the features array
.select("features.*") # Select all fields from features
.withColumn("id", expr("properties['id']")) # Extract property fields
.withColumn("name", expr("properties['name']"))
.withColumn("color", expr("properties['color']"))
.drop("properties") # Remove nested properties object
.drop("type") # Remove type field
)
cables_df.show(5)
28.9.3. Reading Shapefiles#
# Read shapefile directly into spatial DataFrame
countries_df = sedona.read.format("shapefile").load("countries.shp")
countries_df.show()
28.9.4. Reading GeoPackage#
# Read specific table from GeoPackage file
countries_df = (
sedona.read.format("geopackage")
.option("tableName", "countries") # Specify the table/layer name
.load("countries.gpkg")
)
countries_df.show()
28.9.5. Reading GeoParquet#
# Read GeoParquet files - optimized for large datasets
states_df = sedona.read.format("geoparquet").load("us_states.parquet")
states_df.show()
# GeoParquet is excellent for point datasets like cities
us_cities_df = sedona.read.format("geoparquet").load("us_cities.parquet")
us_cities_df.show()
28.9.6. Reading Cloud-Stored Data#
# Read individual state wetlands data from S3 (DC wetlands - smaller dataset for demonstration)
url = "s3a://us-west-2.opendata.source.coop/giswqs/nwi/wetlands/DC_Wetlands.parquet"
df = sedona.read.format("parquet").load(url)
# Display basic information about the dataset
df.show()
# Convert WKB geometry to readable WKT format for inspection
df_wkt = df.withColumn("geometry", expr("ST_AsText(ST_GeomFromWKB(geometry))"))
df_wkt.show()
# Read all state wetlands data using wildcard pattern (75.8 GB total)
url = "s3a://us-west-2.opendata.source.coop/giswqs/nwi/wetlands/*.parquet"
df = sedona.read.format("parquet").load(url)
# Count total wetland features across all states
# This operation may take several minutes due to the dataset size
df.count()
28.10. Visualizing Vector Data#
28.10.1. Using KeplerGL#
# Configure the initial map view for global data
config = {
"mapState": {
"latitude": 20, # Center latitude
"longitude": 0, # Center longitude (prime meridian)
"zoom": 1, # Global zoom level
"pitch": 0, # Map tilt angle
"bearing": 0, # Map rotation
},
}
# Create initial map with cities data
m = SedonaKepler.create_map(cities_df, name="Cities", config=config)
# Add additional layer to the existing map
SedonaKepler.add_df(m, countries_df, name="Countries")
# Display the interactive map
m
28.10.2. Using PyDeck#
# Create 3D geometry visualization for country boundaries
m = SedonaPyDeck.create_geometry_map(countries_df)
m
# Create scatterplot visualization for cities with custom point size
m = SedonaPyDeck.create_scatterplot_map(cities_df, radius_min_pixels=3)
m
28.11. Writing Vector Data#
# Load the wetlands data for processing
url = "s3a://us-west-2.opendata.source.coop/giswqs/nwi/wetlands/DC_Wetlands.parquet"
df = sedona.read.format("parquet").load(url)
df.show()
# Convert WKB geometry to Sedona geometry objects for spatial operations
df = df.withColumn("geometry", expr("ST_GeomFromWKB(geometry)"))
# Write to GeoParquet format with optimized partitioning
# Repartitioning to 10 partitions creates 10 output files for better performance
df.repartition(10).write.format("geoparquet").mode("overwrite").save("DE_Wetlands")
# Write to GeoJSON format for compatibility with web applications
# GeoJSON is widely supported but less efficient for large datasets
df.write.format("geojson").mode("overwrite").save("DE_Wetlands_GeoJSON")
28.12. Reading Raster Data#
28.12.1. Reading NetCDF#
# Load NetCDF file as binary data
df = sedona.read.format("binaryFile").load("wind_global.nc")
# Extract metadata information to understand the file structure
record_info_df = df.selectExpr("RS_NetCDFInfo(content) as record_info")
# Display metadata about variables, dimensions, and attributes
record_info = record_info_df.first()["record_info"]
print(record_info)
# Extract the u_wind variable as a raster using lon/lat coordinates
df = df.withColumn("raster", expr("RS_FromNetCDF(content, 'u_wind', 'lon', 'lat')"))
df.show()
28.12.2. Reading GeoTIFF#
dem_df = sedona.read.format("binaryFile").load("dem_90m.tif")
dem_df = dem_df.withColumn("raster", expr("RS_FromGeoTiff(content)"))
dem_df.show()
# Create a temporary view to enable SQL operations on the raster data
dem_df.createOrReplaceTempView("dem_df")
# Tile the raster into 256x256 pixel chunks for distributed processing
tiles_df = dem_df.selectExpr("RS_TileExplode(raster, 256, 256) as (x, y, raster)")
# Display the tiled dataset - each row represents one tile
tiles_df.show()
28.13. Visualizing Raster Data#
# Convert raster data to image format for visualization
htmlDF = dem_df.selectExpr("RS_AsImage(raster, 1000) as raster_image")
SedonaUtils.display_image(htmlDF)
28.14. Raster Map Algebra#
dem_df.printSchema()
dem_df.createOrReplaceTempView("dem_df")
# Create binary mask for areas above 2000m elevation using map algebra
mountain = sedona.sql(
"""
SELECT
RS_MapAlgebra(
raster,
'D',
'out = (rast[0] > 2000) ? 1 : 0;'
) AS mountain
FROM dem_df
"""
)
# Display the results of the map algebra operation
mountain.show()
# Visualize the binary mountain mask
htmlDF = mountain.selectExpr("RS_AsImage(mountain, 1000) as raster_image")
SedonaUtils.display_image(htmlDF)
# Save the mountain classification raster as a GeoTIFF file
mountain.withColumn("raster_binary", expr("RS_AsGeoTiff(mountain)")).write.format(
"raster"
).option("rasterField", "raster_binary").option("fileExtension", ".tiff").mode(
"overwrite"
).save(
"mountain"
)
28.15. Raster Zonal Statistics#
# Display the elevation raster structure for reference
dem_df.show()
# Define a polygon representing a study area in the Sierra Nevada mountains
polygon = "POLYGON((-119.715885 37.995326, -118.452104 37.995326, -118.452104 37.373679, -119.715885 37.373679, -119.715885 37.995326))"
# Calculate average elevation within the study area polygon
mount_elev = sedona.sql(
f"""
select
RS_ZonalStats(dem_df.raster, ST_Transform(
ST_GeomFromText('{polygon}'), 'EPSG:4326', 'EPSG:3857'
), 1, 'avg', false, false) as avg_elev
from dem_df
"""
)
# Display the calculated average elevation
mount_elev.show()
28.16. Writing Raster Data#
# Write the digital elevation model to GeoTIFF with compression
dem_df.withColumn(
"raster_binary", expr("RS_AsGeoTiff(raster, 'LZW', '0.75')")
).write.format("raster").option("rasterField", "raster_binary").option(
"pathField", "path"
).option(
"fileExtension", ".tiff"
).mode(
"overwrite"
).save(
"dem"
)
# Load NetCDF file containing global wind data
netcdf_df = sedona.read.format("binaryFile").load("wind_global.nc")
# Extract and display metadata to understand the data structure
record_info_df = netcdf_df.selectExpr("RS_NetCDFInfo(content) as record_info")
# Display metadata information
record_info = record_info_df.first()["record_info"]
print(record_info)
# Extract the u_wind component as a raster for analysis
netcdf_df = netcdf_df.withColumn(
"raster", expr("RS_FromNetCDF(content, 'u_wind', 'lon', 'lat')")
)
netcdf_df.show()
28.17. Integration with GeoPandas#
28.17.1. From Sedona DataFrame to GeoPandas GeoDataFrame#
# Load and create spatial data in Sedona format
cities_df = (
sedona.read.option("header", True)
.format("CSV")
.load("world_cities.csv")
.withColumn("geometry", expr("ST_Point(longitude, latitude)"))
)
cities_df.show()
# Convert Sedona DataFrame to GeoPandas GeoDataFrame
df = cities_df.toPandas() # Convert to Pandas DataFrame
gdf = gpd.GeoDataFrame(df, geometry="geometry")
gdf.crs = "EPSG:4326" # Set coordinate reference system
gdf.explore()
28.17.2. From Sedona DataFrame To GeoArrow#
# Convert Sedona DataFrame to Apache Arrow format for efficient interchange
arrow = dataframe_to_arrow(cities_df)
arrow
28.17.3. From GeoPandas GeoDataFrame to Sedona DataFrame#
# Load geospatial data using GeoPandas
gdf = gpd.read_file("world_cities.geojson")
gdf.head()
# Check the coordinate reference system
gdf.crs
# Convert GeoPandas GeoDataFrame directly to Sedona DataFrame
# Sedona automatically handles the geometry conversion
df = sedona.createDataFrame(gdf)
df.show()
28.17.4. Advanced Integration: Converting Through GeoArrow#
# Convert Sedona DataFrame to Arrow format
arrow = dataframe_to_arrow(df)
# Convert Arrow format back to GeoPandas
gdf = gpd.GeoDataFrame.from_arrow(arrow)
# Display the reconstructed GeoPandas DataFrame
gdf.head()
# Convert back to Sedona for further distributed processing
df = sedona.createDataFrame(gdf)
df.show()
28.17.5. Custom Conversion Functions#
# Custom function to convert Sedona DataFrame to GeoPandas with error handling
def sedona_to_geopandas(sedona_df, geometry_col="geometry"):
"""Convert Sedona DataFrame to GeoPandas DataFrame with robust error handling"""
# Convert spatial geometries to WKT format for transfer
with_wkt = sedona_df.withColumn("wkt_geom", expr(f"ST_AsText({geometry_col})"))
# Convert to Pandas DataFrame
pandas_df = with_wkt.toPandas()
# Convert WKT strings to Shapely geometries
from shapely import wkt
pandas_df["geometry"] = pandas_df["wkt_geom"].apply(wkt.loads)
# Create GeoPandas GeoDataFrame
gdf = gpd.GeoDataFrame(pandas_df.drop("wkt_geom", axis=1), geometry="geometry")
return gdf
# Example usage: convert filtered dataset to GeoPandas
major_cities = world_cities_spatial.filter(col("population") > 1000000)
major_cities_gdf = sedona_to_geopandas(major_cities)
print(f"Converted {len(major_cities_gdf)} major cities to GeoPandas")
# Display the converted major cities dataset
major_cities_gdf
28.18. Real-World Use Cases#
28.18.1. Use Case 1: Global Urban Density Analysis#
# Analyze urban density patterns for cities with population > 500,000
urban_analysis = (
world_cities_spatial.filter(col("population") > 500000) # Focus on major cities
.withColumn(
"urban_density",
col("population")
/ expr("ST_Area(ST_Buffer(geometry, 10000))"), # People per m² in 10km radius
)
.withColumn(
"city_category",
expr(
"""
CASE
WHEN population > 5000000 THEN 'Megacity' -- Cities > 5M people
WHEN population > 1000000 THEN 'Large City' -- Cities 1-5M people
ELSE 'Medium City' -- Cities 500K-1M people
END
"""
),
)
)
# Calculate average density by city category
urban_analysis.groupBy("city_category").agg(
expr("count(*)").alias("count"), # Number of cities in each category
expr("avg(urban_density)").alias("avg_density"), # Average density per category
).show()
28.18.2. Use Case 2: Transit Accessibility Assessment#
# Analyze potential transit coverage for cities with population > 100,000
transit_analysis = (
world_cities_spatial.filter(col("population") > 100000)
.withColumn(
"transit_coverage",
expr("ST_Buffer(geometry, 1000)"), # 1km radius transit catchment area
)
.withColumn(
"coverage_area_km2",
expr("ST_Area(ST_Buffer(geometry, 1000)) / 1000000"), # Convert m² to km²
)
)
# Calculate global transit accessibility metrics
accessibility_metrics = transit_analysis.agg(
expr("avg(coverage_area_km2)").alias(
"avg_coverage_area"
), # Average coverage area per city
expr("sum(coverage_area_km2)").alias(
"total_coverage_area"
), # Total potential coverage globally
)
accessibility_metrics.show()