EMR Spark Labs — COVID-19 Analytics with PySpark + Iceberg

Seven hands-on labs that build a full COVID-19 analytics pipeline using Apache Spark on Amazon EMR 7.2.0. You will use the PySpark DataFrame API and Spark SQL, ingest raw CSV from the AWS COVID-19 Data Lake, enrich it with hospital capacity data, write optimised Parquet/ORC output, manage an Apache Iceberg ACID table, query via Athena, and schedule the pipeline with Oozie.


Table of Contents

  1. Architecture
  2. Prerequisites
  3. Repository Layout
  4. EMR Configuration
  5. Lab 00 — Data Foundation
  6. Lab 01 — Register Tables
  7. Lab 02 — Catalog Inspection
  8. Lab 03 — Enrichment Join
  9. Lab 04 — Apache Iceberg
  10. Lab 05 — Athena Queries
  11. Lab 06 — Oozie Scheduling
  12. Key Concepts
  13. Troubleshooting

1. Architecture

s3://covid19-lake  (public)
        │
        │  lab-00: sync CSV
        ▼
s3://emr-labs-raw/covid/
        │
        │  lab-01: SparkSession + Glue catalog (external tables)
        ▼
AWS Glue Data Catalog (emr_labs_db)
  ├─ nytimes_counties   ← CSV external table
  └─ hospital_beds      ← CSV external table
        │
        │  lab-03: DataFrame enrichment join
        ▼
s3://emr-labs-intermediate/covid/enriched/
  ├─ parquet/  (snappy, partitioned by state)
  └─ orc/      (zlib, bloom filter on fips_code)
        │
        │  lab-04: Iceberg ACID
        ▼
s3://emr-labs-intermediate/covid/iceberg/
  └─ enriched_covid_iceberg/  (Iceberg v2 table)
        │
   lab-05: Athena    lab-06: Oozie

2. Prerequisites

| Requirement | Notes |
|---|---|
| Terraform applied | terraform/envs/dev/ provisioned |
| AWS CLI ≥ 2.15 | Profile with EMR + S3 + Glue + Athena permissions |
| Bash ≥ 4.0 | brew install bash on macOS |
| jq | For get_tf_outputs.sh |
| KEY_PAIR env var (optional) | EC2 key pair name for SSH access to master node |

Export region once:

export AWS_DEFAULT_REGION=us-east-1

3. Repository Layout

emr-spark-labs/
├── README.md
├── shared/
│   └── configs/
│       └── spark-glue-config.json       ← Spark + Glue EMR classifications
└── labs/
    ├── lab-00-foundation/
    │   └── scripts/copy_public_data.sh
    ├── lab-01-register-tables/
    │   ├── pyspark/01_register_tables.py
    │   └── scripts/run_lab.sh
    ├── lab-02-catalog-inspection/
    │   ├── pyspark/01_inspect_catalog.py
    │   └── scripts/run_lab.sh
    ├── lab-03-enrichment/
    │   ├── pyspark/
    │   │   ├── 01_ctas_enriched.py
    │   │   └── 02_validate_output.py
    │   └── scripts/run_lab.sh
    ├── lab-04-iceberg/
    │   ├── pyspark/01_iceberg_basics.py
    │   └── scripts/run_lab.sh
    ├── lab-05-athena/
    │   ├── sql/
    │   │   ├── 01_create_view.sql
    │   │   ├── 02_top_states.sql
    │   │   └── 03_county_trend.sql
    │   └── scripts/run_lab.sh
    └── lab-06-oozie/
        ├── oozie/
        │   ├── workflow.xml
        │   ├── coordinator.xml
        │   └── job.properties
        └── scripts/
            ├── run_lab.sh
            └── submit_oozie.sh

4. EMR Configuration

shared/configs/spark-glue-config.json

Applied to every cluster via --configurations file://....

Key settings:

| Classification | Property | Value | Purpose |
|---|---|---|---|
| spark-defaults | spark.sql.extensions | IcebergSparkSessionExtensions | Enable Iceberg DDL/DML |
| spark-defaults | spark.sql.catalog.glue_catalog | org.apache.iceberg.spark.SparkCatalog | Iceberg catalog alias |
| spark-defaults | spark.sql.catalog.glue_catalog.catalog-impl | org.apache.iceberg.aws.glue.GlueCatalog | Glue as Iceberg metastore |
| spark-defaults | spark.sql.catalogImplementation | hive | Enables Hive catalog for external tables |
| spark-defaults | spark.hadoop.hive.metastore.client.factory.class | com.amazonaws.glue... | AWS Glue metastore factory |
| spark-defaults | spark.dynamicAllocation.enabled | true | Auto-scale executors 1–4 |
| spark-defaults | spark.sql.adaptive.enabled | true | Adaptive query execution |
| spark-hive-site | hive.metastore.client.factory.class | same factory | Hive-compatible access |
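
As a quick sanity check before launching a cluster, the property keys in the table above can be verified with a few lines of Python. This is only a sketch: `find_missing_properties` and the inline `sample` config are hypothetical, and the values are placeholders rather than the contents of the real spark-glue-config.json.

```python
import json

# Property keys taken from the table above, grouped by EMR classification.
REQUIRED = {
    "spark-defaults": [
        "spark.sql.extensions",
        "spark.sql.catalog.glue_catalog",
        "spark.sql.catalog.glue_catalog.catalog-impl",
        "spark.sql.catalogImplementation",
    ],
    "spark-hive-site": ["hive.metastore.client.factory.class"],
}


def find_missing_properties(config_text):
    """Return 'classification/property' strings absent from an EMR config."""
    # An EMR configurations file is a JSON list of
    # {"Classification": ..., "Properties": {...}} objects.
    by_class = {
        entry["Classification"]: entry.get("Properties", {})
        for entry in json.loads(config_text)
    }
    missing = []
    for classification, keys in REQUIRED.items():
        props = by_class.get(classification, {})
        missing.extend(f"{classification}/{k}" for k in keys if k not in props)
    return missing


# Placeholder config with the spark-hive-site section deliberately omitted.
sample = json.dumps([
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.sql.extensions": "placeholder",
            "spark.sql.catalog.glue_catalog": "placeholder",
            "spark.sql.catalog.glue_catalog.catalog-impl": "placeholder",
            "spark.sql.catalogImplementation": "hive",
        },
    }
])
print(find_missing_properties(sample))
```

Running it against the real file would amount to passing the file's text instead of `sample`; an empty list means every key from the table is present.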

5. Lab 00 — Data Foundation

Goal: Copy COVID-19 CSV data from the public AWS Data Lake to your private raw S3 bucket.

./labs/lab-00-foundation/scripts/copy_public_data.sh

What it does:

  1. Sources get_tf_outputs.sh to get $RAW_BUCKET
  2. aws s3 sync s3://covid19-lake/rearc-covid-19-nyt-data-in-usa/csv/us-counties/ s3://$RAW_BUCKET/covid/nytimes/
  3. aws s3 sync s3://covid19-lake/rearc-usa-hospital-beds/csv/ s3://$RAW_BUCKET/covid/hospital-beds/

Output layout:

s3://<raw-bucket>/covid/
├── nytimes/        ← ~3MB CSV, ~400k rows
└── hospital-beds/  ← ~1MB CSV, ~7000 rows

Run time: ~30 seconds (no EMR cluster needed).
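
For readers who want to preview what the script will copy, the two sync commands can be rendered as a dry run. This is a sketch under assumptions: `build_sync_commands` is a hypothetical helper, `my-raw-bucket` is a placeholder, and nothing here calls AWS; it only prints the command lines listed above.

```python
import shlex

PUBLIC_LAKE = "s3://covid19-lake"

# Source prefix in the public lake -> destination prefix in the raw bucket.
DATASETS = {
    "rearc-covid-19-nyt-data-in-usa/csv/us-counties/": "covid/nytimes/",
    "rearc-usa-hospital-beds/csv/": "covid/hospital-beds/",
}


def build_sync_commands(raw_bucket):
    """Return one `aws s3 sync` argument list per dataset."""
    return [
        ["aws", "s3", "sync", f"{PUBLIC_LAKE}/{src}", f"s3://{raw_bucket}/{dst}"]
        for src, dst in DATASETS.items()
    ]


for argv in build_sync_commands("my-raw-bucket"):  # placeholder bucket name
    print(shlex.join(argv))
```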


6. Lab 01 — Register Tables

Goal: Create Glue-backed external tables and run PySpark analytics.

./labs/lab-01-register-tables/scripts/run_lab.sh

Cluster config: Spark + Hadoop + Hive, emr-7.2.0, m5.xlarge fleet.

What 01_register_tables.py does

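from pyspark.sql import SparkSession

# enableHiveSupport() routes catalog calls through the Hive metastore client
spark = SparkSession.builder \
    .appName("Lab01-RegisterTables") \
    .enableHiveSupport() \
    .getOrCreate()

# db holds the Glue database name (emr_labs_db in the architecture diagram)
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")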
spark.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS nytimes_counties (
        report_date STRING, county STRING, state STRING,
        fips_code STRING, cumulative_cases BIGINT, cumulative_deaths BIGINT
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    ...
    LOCATION 's3://<raw>/covid/nytimes/'
    TBLPROPERTIES ('skip.header.line.count'='1')
""")

After table creation the script runs:

  • DataFrame API: spark.table("nytimes_counties").groupBy("state").agg(sum("cumulative_cases")).orderBy(desc(...)).show(10)
  • Spark SQL: top-10 counties by CFR, hospital capacity by state

Key Concepts — SparkSession and enableHiveSupport()

enableHiveSupport() tells Spark to use the Hive metastore client configured in spark-hive-site. On EMR this routes to AWS Glue. Tables created via spark.sql("CREATE TABLE") appear in Glue and are visible to Athena, Hive, and Flink.

Key Concepts — OpenCSVSerde

CSV files with quoted fields require org.apache.hadoop.hive.serde2.OpenCSVSerde rather than the default LazySimpleSerDe. All columns are read as STRING; cast in queries.
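
The same two points (quoted fields need a real CSV parser, and every value arrives as a string) can be seen with Python's standard `csv` module. This is an analogy rather than the SerDe itself; the sample row is made up, and the header names are an assumption about the source file.

```python
import csv
import io

# One made-up row whose county value contains a comma inside quotes.
raw = (
    "date,county,state,fips,cases,deaths\n"
    '2020-04-01,"Example, County",Example State,00000,20,1\n'
)

naive = raw.splitlines()[1].split(",")           # splits the quoted county in two
parsed = next(csv.DictReader(io.StringIO(raw)))  # honours the quotes

print(len(naive))                  # 7 fields instead of the expected 6
print(parsed["county"])            # Example, County
print(type(parsed["cases"]).__name__)  # str
cases = int(parsed["cases"])       # cast explicitly before doing arithmetic
```

In Spark SQL the equivalent of the last line is an explicit CAST on the column in the query.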


7. Lab 02 — Catalog Inspection

Goal: Explore the Glue Data Catalog programmatically and understand query plans.

./labs/lab-02-catalog-inspection/scripts/run_lab.sh

What 01_inspect_catalog.py does

# List all tables in the database
for t in spark.catalog.listTables(db):
    print(t.name, t.tableType, t.isTemporary)

# List columns
for col in spark.catalog.listColumns("nytimes_counties", db):
    print(col.name, col.dataType)

Then runs DESCRIBE FORMATTED nytimes_counties to show Glue metadata, partition info, and SerDe properties.

Predicate pushdown demonstration:

df = spark.table("nytimes_counties").filter("state = 'New York'")
df.explain(True)   # shows pushed-down filter in FileScan plan

Key Concepts — Catalyst Optimizer

Spark’s Catalyst optimizer transforms a logical plan through a series of rules:

  1. Analysis — resolves column names against the catalog
  2. Logical optimisation — constant folding, predicate pushdown, column pruning
  3. Physical planning — selects join strategy (broadcast vs sort-merge), partitioning
  4. Code generation — produces JVM bytecode (Whole-Stage CodeGen)

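df.explain(True) prints the parsed, analyzed, and optimized logical plans followed by the physical plan. For file-based scans such as Parquet or ORC, look for PushedFilters in the FileScan physical operator to confirm predicate pushdown is active.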

Key Concepts — Dynamic Partition Pruning (DPP)

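Dynamic partition pruning is controlled by spark.sql.optimizer.dynamicPartitionPruning.enabled (true by default in Spark 3) and works independently of AQE. When a partitioned table is joined on its partition key to a table with a selective filter, Spark injects a runtime filter that prunes irrelevant partitions at scan time. This is visible in the physical plan as DynamicPruningExpression.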


8. Lab 03 — Enrichment Join

Goal: Join NY Times county data with hospital bed capacity, write Parquet and ORC, validate output.

./labs/lab-03-enrichment/scripts/run_lab.sh

Two EMR steps:

  1. 01_ctas_enriched.py — join and write
  2. 02_validate_output.py — read back and validate

What 01_ctas_enriched.py does

nyt  = spark.table("nytimes_counties")
beds = spark.table("hospital_beds")

enriched = nyt.join(beds, nyt.fips_code == beds.county_fips_code, "left") \
    .select(
        nyt.report_date, nyt.state, nyt.county, nyt.fips_code,
        nyt.cumulative_cases, nyt.cumulative_deaths,
        (nyt.cumulative_deaths / nyt.cumulative_cases * 100).alias("case_fatality_rate"),
        beds.num_staffed_beds, beds.num_icu_beds,
        (nyt.cumulative_cases / beds.num_staffed_beds).alias("cases_per_staffed_bed")
    )

# Parquet output — partitioned by state
enriched.write.format("parquet") \
    .option("compression", "snappy") \
    .partitionBy("state") \
    .mode("overwrite") \
    .save(f"s3://{intermediate}/covid/enriched/parquet/")

# ORC output — bloom filter on fips_code for fast point lookups
enriched.write.format("orc") \
    .option("compression", "zlib") \
    .option("orc.bloom.filter.columns", "fips_code") \
    .partitionBy("state") \
    .mode("overwrite") \
    .save(f"s3://{intermediate}/covid/enriched/orc/")

What 02_validate_output.py does

  • Row count, null counts for key columns
  • Top-5 states by total cases
  • Case fatality rate range check (should be 0–100%)
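
The range check in the last bullet can be sketched in plain Python. This is not the validation script itself: `case_fatality_rate` and `out_of_range` are hypothetical helpers, and the rows are made up.

```python
def case_fatality_rate(cases, deaths):
    """Deaths as a percentage of cases; None when there are no cases."""
    if not cases:
        return None  # avoid dividing by zero; plays the role of a NULL
    return deaths / cases * 100


def out_of_range(rows):
    """Return the rows whose CFR falls outside 0-100 %."""
    bad = []
    for row in rows:
        cfr = case_fatality_rate(row["cumulative_cases"], row["cumulative_deaths"])
        if cfr is not None and not 0 <= cfr <= 100:
            bad.append(row)
    return bad


rows = [  # made-up rows, not real data
    {"county": "A", "cumulative_cases": 200, "cumulative_deaths": 4},
    {"county": "B", "cumulative_cases": 0, "cumulative_deaths": 0},
    {"county": "C", "cumulative_cases": 5, "cumulative_deaths": 6},  # deaths > cases
]
print([r["county"] for r in out_of_range(rows)])  # ['C']
```

Rows with zero cases are skipped rather than flagged, since their rate is undefined.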

Key Concepts — Parquet vs ORC

| | Parquet | ORC |
|---|---|---|
| Origin | Twitter / Cloudera | Hive |
| Column encoding | Dictionary, RLE, delta | Dictionary, RLE, delta, run-length |
| Bloom filter | Yes (Parquet 2.0) | Yes (orc.bloom.filter.columns) |
| Predicate pushdown | Row-group min/max + bloom | Stripe + row-group + bloom |
| Iceberg support | Native | Supported but Parquet preferred |
| Athena support | ✓ | ✓ |

Use Parquet as the default. Use ORC when you have legacy Hive workloads or need ORC-specific compression (ZLIB tends to be smaller than Snappy for text-heavy data).
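
To make the Bloom filter row in the table more concrete, here is a toy filter in plain Python. It is an illustration of the idea only, not how ORC or Parquet implement it; the class, its sizes, and the sample FIPS codes are assumptions.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter: may report false positives, never false negatives."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key):
        # Derive num_hashes bit positions by salting a SHA-256 hash.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key):
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


# Imagine one filter per stripe, built from the fips_code values it holds.
stripe_filter = BloomFilter()
for fips in ["36061", "36047", "36081"]:  # sample codes
    stripe_filter.add(fips)

print(stripe_filter.might_contain("36061"))  # True: this stripe must be read
```

A reader doing a point lookup can skip any stripe whose filter answers False, which is why the filter helps most on high-cardinality columns such as fips_code.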

Key Concepts — Partition by State

Writing partitionBy("state") creates one S3 prefix per state:

s3://.../enriched/parquet/state=Alabama/part-00000.snappy.parquet
s3://.../enriched/parquet/state=Alaska/part-00000.snappy.parquet
...

Athena and Spark queries with WHERE state = 'New York' skip all other prefixes entirely (partition pruning). This is the primary cost and performance lever for large datasets.
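
Partition pruning boils down to choosing which prefixes to list. The sketch below shows the idea in plain Python; `partition_prefix` and `prune` are hypothetical helpers and the bucket name is a placeholder.

```python
def partition_prefix(base, state):
    """Hive-style partition prefix of the kind shown above."""
    return f"{base.rstrip('/')}/state={state}/"


def prune(prefixes, state):
    """Keep only the prefixes a `WHERE state = ...` filter needs to scan."""
    wanted = f"/state={state}/"
    return [p for p in prefixes if p.endswith(wanted)]


base = "s3://my-intermediate-bucket/covid/enriched/parquet"  # placeholder bucket
prefixes = [partition_prefix(base, s) for s in ["Alabama", "Alaska", "New York"]]
print(prune(prefixes, "New York"))
```

Only one of the three prefixes survives, so the engine never opens the files under the other two.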


9. Lab 04 — Apache Iceberg

Goal: Create an Iceberg v2 table in Glue, perform ACID operations, evolve the schema, and read historical snapshots.

./labs/lab-04-iceberg/scripts/run_lab.sh

What 01_iceberg_basics.py does

Create Iceberg table

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS glue_catalog.{db}.enriched_covid_iceberg (
        report_date     STRING,
        state           STRING,
        county          STRING,
        cumulative_cases BIGINT,
        cumulative_deaths BIGINT,
        case_fatality_rate DOUBLE
    )
    USING iceberg
    PARTITIONED BY (state)
    TBLPROPERTIES ('format-version' = '2')
""")

ACID DELETE

spark.sql(f"""
    DELETE FROM glue_catalog.{db}.enriched_covid_iceberg
    WHERE cumulative_cases < 10
""")

Schema evolution — ADD COLUMN

spark.sql(f"""
    ALTER TABLE glue_catalog.{db}.enriched_covid_iceberg
    ADD COLUMN hospital_beds_ratio DOUBLE
""")

UPDATE

spark.sql(f"""
    UPDATE glue_catalog.{db}.enriched_covid_iceberg
    SET hospital_beds_ratio = cumulative_cases / 100.0
    WHERE state = 'New York'
""")

Time-travel

ts = spark.sql(f"""
    SELECT committed_at FROM glue_catalog.{db}.enriched_covid_iceberg.snapshots
    ORDER BY committed_at LIMIT 1
""").collect()[0][0]

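# as-of-timestamp expects milliseconds since the Unix epoch, not a timestamp string
ts_ms = round(ts.timestamp() * 1000)
spark.read.option("as-of-timestamp", str(ts_ms)) \
    .table(f"glue_catalog.{db}.enriched_covid_iceberg") \
    .show()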

Expire old snapshots

spark.sql(f"""
    CALL glue_catalog.system.expire_snapshots(
        table => '{db}.enriched_covid_iceberg',
        older_than => TIMESTAMP '{cutoff}',
        retain_last => 2
    )
""")

Key Concepts — Apache Iceberg

Iceberg v2 adds row-level deletes on top of v1’s snapshot isolation:

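| Feature | Iceberg v1 | Iceberg v2 |
|---|---|---|
| Snapshot isolation | ✓ | ✓ |
| Schema evolution | ✓ | ✓ |
| Partition evolution | ✓ | ✓ |
| Row-level DELETE | copy-on-write only | ✓ (merge-on-read) |
| Row-level UPDATE | copy-on-write only | ✓ |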

Snapshot model: every write creates a new snapshot. The metadata file (metadata.json) points to the current snapshot. Old data files are kept until expire_snapshots removes them. Time-travel reads any prior snapshot.
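
The snapshot model can be pictured with a toy in-memory table. This is a simplified sketch, not Iceberg's implementation: the `Snapshot` and `Table` classes are hypothetical, timestamps are plain integers, and `expire` assumes `retain_last` is at least 1.

```python
from dataclasses import dataclass, field


@dataclass
class Snapshot:
    snapshot_id: int
    committed_at: int  # epoch milliseconds
    files: frozenset   # data files visible in this snapshot


@dataclass
class Table:
    snapshots: list = field(default_factory=list)
    next_id: int = 1

    def commit(self, committed_at, files):
        """Every write appends a new snapshot; older ones stay readable."""
        snap = Snapshot(self.next_id, committed_at, frozenset(files))
        self.next_id += 1
        self.snapshots.append(snap)
        return snap

    def as_of(self, timestamp_ms):
        """Time travel: latest snapshot committed at or before the timestamp."""
        eligible = [s for s in self.snapshots if s.committed_at <= timestamp_ms]
        if not eligible:
            raise ValueError("no snapshot at or before that timestamp")
        return eligible[-1]

    def expire(self, older_than_ms, retain_last):
        """Drop snapshots older than the cutoff, always keeping the newest N."""
        keep_tail = self.snapshots[-retain_last:]
        self.snapshots = [
            s for s in self.snapshots
            if s.committed_at >= older_than_ms or s in keep_tail
        ]


table = Table()
table.commit(1000, {"a.parquet"})
table.commit(2000, {"a.parquet", "b.parquet"})
table.commit(3000, {"b.parquet"})           # e.g. after a DELETE
print(sorted(table.as_of(2500).files))       # ['a.parquet', 'b.parquet']
table.expire(older_than_ms=3000, retain_last=2)
print([s.snapshot_id for s in table.snapshots])  # [2, 3]
```

After the expiry, a read as of timestamp 1500 would fail, which mirrors why time travel only reaches back as far as the retained snapshots.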

Glue catalog integration: glue_catalog is a Spark catalog alias backed by GlueCatalog. Iceberg stores its metadata in Glue as custom table properties pointing to the S3 metadata location.

Key Concepts — Adaptive Query Execution (AQE)

AQE (spark.sql.adaptive.enabled=true) re-optimises the query plan at runtime using statistics collected during execution:

  • Skew join optimisation: splits oversized partitions into smaller tasks
  • Coalesce post-shuffle partitions: reduces small-file output by merging shuffle outputs
  • Dynamic join strategy switching: converts a sort-merge join into a broadcast join when runtime statistics show one side is small enough

AQE is safe to enable for all workloads; it only activates when statistics are available.
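
The coalescing step can be illustrated with a greedy merge of adjacent partitions. This is a simplified sketch and not Spark's actual algorithm; `coalesce_partitions` is a hypothetical function, the sizes are made up, and the 64 MB target is chosen to echo spark.sql.adaptive.advisoryPartitionSizeInBytes.

```python
def coalesce_partitions(sizes_mb, target_mb):
    """Greedily merge adjacent shuffle partitions up to a target size."""
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes_mb):
        # Start a new group when adding this partition would overshoot.
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Made-up post-shuffle partition sizes in MB.
sizes = [5, 8, 3, 60, 2, 4, 70, 1]
print(coalesce_partitions(sizes, target_mb=64))
# [[0, 1, 2], [3, 4], [5], [6], [7]]
```

Eight shuffle partitions become five tasks; an oversized partition (index 6) is left alone here, which is the case skew join optimisation addresses separately.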


10. Lab 05 — Athena Queries

Goal: Run Athena SQL against Spark-written Parquet data registered in Glue.

./labs/lab-05-athena/scripts/run_lab.sh

No EMR cluster needed — Athena is serverless.

SQL files

01_create_view.sql — creates v_enriched_summary view in Glue:

CREATE OR REPLACE VIEW v_enriched_summary AS
SELECT state, county, report_date,
       cumulative_cases, cumulative_deaths,
       ROUND(case_fatality_rate, 4) AS cfr
FROM enriched_covid_parquet
WHERE cumulative_cases > 0;

02_top_states.sql — top 10 states by total cases + CFR:

SELECT state,
       MAX(cumulative_cases)  AS total_cases,
       MAX(cumulative_deaths) AS total_deaths,
       ROUND(AVG(case_fatality_rate), 4) AS avg_cfr
FROM v_enriched_summary
GROUP BY state
ORDER BY total_cases DESC
LIMIT 10;

03_county_trend.sql — weekly trend for a specific county:

SELECT report_date, county, state,
       cumulative_cases,
       cumulative_cases - LAG(cumulative_cases, 7) OVER (
           PARTITION BY county ORDER BY report_date
       ) AS weekly_new_cases
FROM v_enriched_summary
WHERE state = 'New York'
ORDER BY county, report_date;
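
The LAG(…, 7) arithmetic in 03_county_trend.sql can be checked by hand with a small Python equivalent. It is a sketch with made-up numbers; `weekly_new_cases` is a hypothetical helper, and it assumes one row per day with no gaps, because LAG counts rows rather than calendar days.

```python
def weekly_new_cases(cumulative, lag=7):
    """Difference between each value and the one `lag` rows earlier.

    The first `lag` rows have no predecessor, so they yield None,
    which corresponds to NULL in the SQL result.
    """
    return [
        None if i < lag else value - cumulative[i - lag]
        for i, value in enumerate(cumulative)
    ]


# Ten days of made-up cumulative counts for one county, ordered by date.
series = [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
print(weekly_new_cases(series))
# [None, None, None, None, None, None, None, 28, 35, 42]
```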

Key Concepts — Athena + Glue Integration

Athena is a serverless SQL query engine built on Presto (engine version 2) and Trino (engine version 3). It reads directly from S3 using metadata stored in Glue, so no data movement is required. Tables registered by Spark (with enableHiveSupport()) are immediately visible to Athena.

Cost: $5 per TB scanned. Partition pruning and columnar formats (Parquet, ORC) dramatically reduce bytes scanned.

Workgroup: controls output location, per-query data limits, and cost controls. Default workgroup (primary) writes results to s3://<logs-bucket>/athena-results/.
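
A back-of-the-envelope calculation shows why pruning matters at the quoted price. This is a rough sketch: the table size is hypothetical, a decimal terabyte is assumed, partitions are assumed to be evenly sized, and any per-query minimum or rounding is ignored.

```python
TB = 10**12              # assumption: decimal terabyte
PRICE_PER_TB_USD = 5.0   # the figure quoted above


def athena_scan_cost(bytes_scanned):
    """Estimated query cost in USD for a given number of bytes scanned."""
    return bytes_scanned / TB * PRICE_PER_TB_USD


full_scan = 200 * 10**9   # hypothetical 200 GB table
pruned = full_scan / 50   # one state partition out of 50

print(f"{athena_scan_cost(full_scan):.2f}")  # 1.00
print(f"{athena_scan_cost(pruned):.2f}")     # 0.02
```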


11. Lab 06 — Oozie Scheduling

Goal: Schedule the Spark pipeline as a daily Oozie coordinator job.

# Step 1: launch persistent cluster
./labs/lab-06-oozie/scripts/run_lab.sh

# Step 2: submit coordinator
./labs/lab-06-oozie/scripts/submit_oozie.sh <CLUSTER_ID>

# After the lab: terminate the cluster
aws emr terminate-clusters --cluster-ids <CLUSTER_ID>

Oozie Workflow

The workflow has three <spark> actions in sequence:

<action name="register-tables">
  <spark xmlns="uri:oozie:spark-action:0.2">
    <master>yarn</master>
    <mode>cluster</mode>
    <name>Lab06-RegisterTables</name>
    <class>org.apache.spark.deploy.SparkSubmit</class>
    <jar>${scriptsS3}/01_register_tables.py</jar>
    <spark-opts>--conf spark.yarn.appMasterEnv.RAW_BUCKET=${rawBucket} ...</spark-opts>
  </spark>
  <ok to="enrich"/>
  <error to="kill"/>
</action>

Coordinator

Runs daily at 06:00 UTC:

<coordinator-app name="covid-spark-daily"
  frequency="${coord:days(1)}"
  start="${startTime}" end="${endTime}" timezone="UTC">
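
To see which runs a daily frequency produces for a given window, the nominal times can be enumerated in Python. This is a sketch under assumptions: the dates are placeholders for ${startTime} and ${endTime}, `daily_nominal_times` is a hypothetical helper, and the end time is treated as exclusive.

```python
from datetime import datetime, timedelta, timezone


def daily_nominal_times(start, end):
    """Yield one nominal run time per day in [start, end)."""
    current = start
    while current < end:
        yield current
        current += timedelta(days=1)


start = datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)  # placeholder dates
end = datetime(2024, 1, 4, 6, 0, tzinfo=timezone.utc)
for t in daily_nominal_times(start, end):
    print(t.strftime("%Y-%m-%dT%H:%MZ"))
```

The 06:00 UTC run time comes entirely from the start value: the frequency only says how far apart consecutive runs are.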

Key Concepts — Oozie Spark Action

The <spark> action (namespace uri:oozie:spark-action:0.2) submits a Spark job via YARN directly from within Oozie. It avoids the overhead of a shell script and properly captures stdout/stderr into the Oozie console.

Environment variables are passed via spark-opts: --conf spark.yarn.appMasterEnv.MY_VAR=value makes MY_VAR available as os.environ["MY_VAR"] in the Python script running on YARN.
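
Both halves of this hand-off can be sketched in Python: rendering the --conf flags and reading the variable inside the job. `require_env` and `app_master_env_opts` are hypothetical helpers, and the bucket name is a placeholder.

```python
import os


def require_env(name):
    """Read a required variable, failing fast with a clear message."""
    try:
        return os.environ[name]
    except KeyError:
        raise SystemExit(f"missing environment variable: {name}") from None


def app_master_env_opts(variables):
    """Render --conf flags that expose variables to the YARN application master."""
    return " ".join(
        f"--conf spark.yarn.appMasterEnv.{key}={value}"
        for key, value in variables.items()
    )


print(app_master_env_opts({"RAW_BUCKET": "my-raw-bucket"}))
os.environ.setdefault("RAW_BUCKET", "my-raw-bucket")  # stands in for what YARN sets
print(require_env("RAW_BUCKET"))
```

Failing fast on a missing variable tends to give a clearer Oozie action log than a KeyError raised deep inside the job.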


12. Key Concepts

PySpark DataFrame API vs Spark SQL

Both compile to the same Catalyst logical plan. Choose based on ergonomics:

from pyspark.sql.functions import col, desc, max  # note: max shadows the built-in

# DataFrame API: good for chained transformations
df = spark.table("nytimes_counties") \
    .filter(col("state") == "New York") \
    .groupBy("county") \
    .agg(max("cumulative_cases").alias("peak_cases")) \
    .orderBy(desc("peak_cases"))

# Spark SQL — good for complex joins and window functions
spark.sql("""
    SELECT county, MAX(cumulative_cases) AS peak_cases
    FROM nytimes_counties
    WHERE state = 'New York'
    GROUP BY county
    ORDER BY peak_cases DESC
""")

Mixed usage is fine: spark.sql(...) returns a DataFrame, and any DataFrame can be queried from SQL after registering it with df.createOrReplaceTempView("name").

Spark Driver vs Executor Memory

In spark-glue-config.json:

"spark.executor.memory": "3g",
"spark.driver.memory": "3g"

On m5.xlarge (4 vCPU, 16 GB RAM): 3g executor + 1g overhead = 4g per executor. With 2 core nodes you get 2 executors × 3g. Dynamic allocation can add more if available.

Small Files Problem

partitionBy("state") writes one file per Spark partition per state. With 50 states and 200 partitions you get up to 10,000 files. Mitigate with:

df.repartition(10, "state").write.partitionBy("state")...

Or use coalesce() for smaller datasets:

df.coalesce(1).write.partitionBy("state")...
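
The file counts can be reproduced with a small simulation in plain Python. It is a sketch, not Spark: the state names are placeholders, and CRC32 stands in for Spark's own hash partitioning.

```python
import zlib
from itertools import cycle


def count_output_files(assignments):
    """Each distinct (task, state) pair becomes one output file."""
    return len(set(assignments))


states = [f"state-{i:02d}" for i in range(50)]   # placeholder state names
rows = [s for s in states for _ in range(400)]   # 400 rows per state

# Without repartitioning: each state's rows are spread over all 200 tasks.
spread = list(zip(cycle(range(200)), rows))

# repartition(10, "state"): all rows of a state hash to a single task.
hashed = [(zlib.crc32(s.encode()) % 10, s) for s in rows]

print(count_output_files(spread))  # 10000
print(count_output_files(hashed))  # 50
```

Repartitioning by the partition column caps the output at one file per state, however many tasks run.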

13. Troubleshooting

Cluster stuck in WAITING

Check EMR step logs in s3://<logs-bucket>/emr-logs/<cluster-id>/steps/. Common causes:

  • spark-submit command not found: verify the cluster was created with Spark in its application list (--applications Name=Spark) on release emr-7.2.0
  • PySpark script has an import error: check stdout log in S3

Iceberg: Table not found

The glue_catalog alias is defined in spark-glue-config.json via spark.sql.catalog.glue_catalog. If you get “catalog not found”, check:

aws emr describe-cluster --cluster-id <ID> --query 'Cluster.Configurations'

Confirm the spark-defaults classification is present.

Athena: HIVE_PARTITION_SCHEMA_MISMATCH

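This happens when the schema recorded for a partition in Glue no longer matches the table schema, for example after the Parquet output was rewritten with different column types. MSCK REPAIR TABLE enriched_covid_parquet only adds missing partitions and does not rewrite existing partition metadata, so drop the affected partitions (or drop and recreate the table) and then run it to re-add them.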

Oozie: Action FAILED: spark action error

View the action log in the Oozie UI (http://<master>:11000/oozie) or:

oozie job -oozie http://localhost:11000/oozie -log <JOB_ID>

OOM errors

If you see java.lang.OutOfMemoryError: GC overhead limit exceeded:

  1. Increase spark.executor.memory in spark-glue-config.json or pass --conf at runtime
  2. Use df.persist(StorageLevel.DISK_ONLY) for large DataFrames re-used multiple times
  3. Check for data skew with df.groupBy("state").count().orderBy(desc("count")).show()
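
The skew check in step 3 can be turned into a single number. This is a plain-Python sketch with a made-up key distribution; `skew_ratio` is a hypothetical helper, and what counts as too skewed is a judgment call.

```python
from collections import Counter
from statistics import median


def skew_ratio(keys):
    """Largest group size divided by the median group size."""
    counts = Counter(keys)
    return max(counts.values()) / median(counts.values())


# Made-up key distribution in which one state dominates.
keys = ["New York"] * 900 + ["Alaska"] * 50 + ["Vermont"] * 50
print(skew_ratio(keys))  # 18.0
```

A ratio near 1 means the groups are balanced; a large ratio suggests that one task will hold far more data than the others.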