EMR Spark Labs — COVID-19 Analytics with PySpark + Iceberg
Seven hands-on labs that build a full COVID-19 analytics pipeline using Apache Spark on Amazon EMR 7.2.0. You will use the PySpark DataFrame API and Spark SQL, ingest raw CSV from the AWS COVID-19 Data Lake, enrich it with hospital capacity data, write optimised Parquet/ORC output, manage an Apache Iceberg ACID table, query via Athena, and schedule the pipeline with Oozie.
Table of Contents
- Architecture
- Prerequisites
- Repository Layout
- EMR Configuration
- Lab 00 — Data Foundation
- Lab 01 — Register Tables
- Lab 02 — Catalog Inspection
- Lab 03 — Enrichment Join
- Lab 04 — Apache Iceberg
- Lab 05 — Athena Queries
- Lab 06 — Oozie Scheduling
- Key Concepts
- Troubleshooting
1. Architecture
s3://covid19-lake (public)
│
│ lab-00: sync CSV
▼
s3://emr-labs-raw/covid/
│
│ lab-01: SparkSession + Glue catalog (external tables)
▼
AWS Glue Data Catalog (emr_labs_db)
├─ nytimes_counties ← CSV external table
└─ hospital_beds ← CSV external table
│
│ lab-03: DataFrame enrichment join
▼
s3://emr-labs-intermediate/covid/enriched/
├─ parquet/ (snappy, partitioned by state)
└─ orc/ (zlib, bloom filter on fips_code)
│
│ lab-04: Iceberg ACID
▼
s3://emr-labs-intermediate/covid/iceberg/
└─ enriched_covid_iceberg/ (Iceberg v2 table)
│
├─ lab-05: Athena
└─ lab-06: Oozie
2. Prerequisites
| Requirement | Notes |
|---|---|
| Terraform applied | terraform/envs/dev/ provisioned |
| AWS CLI ≥ 2.15 | Profile with EMR + S3 + Glue + Athena permissions |
| Bash ≥ 4.0 | brew install bash on macOS |
| jq | For get_tf_outputs.sh |
| KEY_PAIR env var (optional) | EC2 key pair name for SSH access to master node |
Export region once:
export AWS_DEFAULT_REGION=us-east-1
3. Repository Layout
emr-spark-labs/
├── README.md
├── shared/
│ └── configs/
│ └── spark-glue-config.json ← Spark + Glue EMR classifications
└── labs/
├── lab-00-foundation/
│ └── scripts/copy_public_data.sh
├── lab-01-register-tables/
│ ├── pyspark/01_register_tables.py
│ └── scripts/run_lab.sh
├── lab-02-catalog-inspection/
│ ├── pyspark/01_inspect_catalog.py
│ └── scripts/run_lab.sh
├── lab-03-enrichment/
│ ├── pyspark/
│ │ ├── 01_ctas_enriched.py
│ │ └── 02_validate_output.py
│ └── scripts/run_lab.sh
├── lab-04-iceberg/
│ ├── pyspark/01_iceberg_basics.py
│ └── scripts/run_lab.sh
├── lab-05-athena/
│ ├── sql/
│ │ ├── 01_create_view.sql
│ │ ├── 02_top_states.sql
│ │ └── 03_county_trend.sql
│ └── scripts/run_lab.sh
└── lab-06-oozie/
├── oozie/
│ ├── workflow.xml
│ ├── coordinator.xml
│ └── job.properties
└── scripts/
├── run_lab.sh
└── submit_oozie.sh
4. EMR Configuration
shared/configs/spark-glue-config.json
Applied to every cluster via --configurations file://....
Key settings:
| Classification | Property | Value | Purpose |
|---|---|---|---|
| spark-defaults | spark.sql.extensions | IcebergSparkSessionExtensions | Enable Iceberg DDL/DML |
| spark-defaults | spark.sql.catalog.glue_catalog | org.apache.iceberg.spark.SparkCatalog | Iceberg catalog alias |
| spark-defaults | spark.sql.catalog.glue_catalog.catalog-impl | org.apache.iceberg.aws.glue.GlueCatalog | Glue as Iceberg metastore |
| spark-defaults | spark.sql.catalogImplementation | hive | Enables Hive catalog for external tables |
| spark-defaults | spark.hadoop.hive.metastore.client.factory.class | com.amazonaws.glue... | AWS Glue metastore factory |
| spark-defaults | spark.dynamicAllocation.enabled | true | Auto-scale executors 1–4 |
| spark-defaults | spark.sql.adaptive.enabled | true | Adaptive query execution |
| spark-hive-site | hive.metastore.client.factory.class | same factory | Hive-compatible access |
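This is a minimal sketch for regenerating the spark-defaults part of this table from Python. EMR's --configurations file is a JSON list of objects, each with Classification and Properties keys, and the property values are strings.

The sketch makes some assumptions and leaves some entries out:
- The fully qualified class name org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions is assumed from the Iceberg runtime.
- The two Glue metastore factory entries are omitted because the table elides their class name.
- Any other properties in the real spark-glue-config.json are not reproduced.

```python
import json


def build_configurations() -> list:
    """Return an EMR --configurations payload: a list of classification objects.

    Only the fully specified spark-defaults properties from the table are included;
    the Glue metastore factory entries are left out because their class name is elided.
    """
    spark_defaults = {
        # Assumed fully qualified name of the Iceberg SQL extensions class
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.glue_catalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.glue_catalog.catalog-impl":
            "org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalogImplementation": "hive",
        "spark.dynamicAllocation.enabled": "true",  # EMR expects string values
        "spark.sql.adaptive.enabled": "true",
    }
    return [{"Classification": "spark-defaults", "Properties": spark_defaults}]


# Print the JSON so it can be saved and passed as --configurations file://...
print(json.dumps(build_configurations(), indent=2))
```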
5. Lab 00 — Data Foundation
Goal: Copy COVID-19 CSV data from the public AWS Data Lake to your private raw S3 bucket.
./labs/lab-00-foundation/scripts/copy_public_data.sh
What it does:
- Sources get_tf_outputs.sh to get $RAW_BUCKET
- aws s3 sync s3://covid19-lake/rearc-covid-19-nyt-data-in-usa/csv/us-counties/ s3://$RAW_BUCKET/covid/nytimes/
- aws s3 sync s3://covid19-lake/rearc-usa-hospital-beds/csv/ s3://$RAW_BUCKET/covid/hospital-beds/
Output layout:
s3://<raw-bucket>/covid/
├── nytimes/ ← ~3MB CSV, ~400k rows
└── hospital-beds/ ← ~1MB CSV, ~7000 rows
Run time: ~30 seconds (no EMR cluster needed).
6. Lab 01 — Register Tables
Goal: Create Glue-backed external tables and run PySpark analytics.
./labs/lab-01-register-tables/scripts/run_lab.sh
Cluster config: Spark + Hadoop + Hive, emr-7.2.0, m5.xlarge fleet.
What 01_register_tables.py does
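from pyspark.sql import SparkSession
db = "emr_labs_db"  # Glue database from the architecture diagram
spark = SparkSession.builder \
.appName("Lab01-RegisterTables") \
.enableHiveSupport() \
.getOrCreate()
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
spark.sql(f"USE {db}")  # the unqualified table name below resolves to this database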
spark.sql("""
CREATE EXTERNAL TABLE IF NOT EXISTS nytimes_counties (
report_date STRING, county STRING, state STRING,
fips_code STRING, cumulative_cases BIGINT, cumulative_deaths BIGINT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
...
LOCATION 's3://<raw>/covid/nytimes/'
TBLPROPERTIES ('skip.header.line.count'='1')
""")
After table creation the script runs:
- DataFrame API: spark.table("nytimes_counties").groupBy("state").agg(sum("cumulative_cases")).orderBy(desc(...)).show(10)
- Spark SQL: top-10 counties by CFR, hospital capacity by state
Key Concepts — SparkSession and enableHiveSupport()
enableHiveSupport() tells Spark to use the Hive metastore client configured in spark-hive-site. On EMR this routes to AWS Glue. Tables created via spark.sql("CREATE TABLE") appear in Glue and are visible to Athena, Hive, and Flink.
Key Concepts — OpenCSVSerde
CSV files with quoted fields require org.apache.hadoop.hive.serde2.OpenCSVSerde rather than the default LazySimpleSerDe. OpenCSVSerde reads every column as STRING, even columns declared BIGINT in the DDL, so cast numeric columns explicitly in queries.
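The sketch below shows, in plain Python, why quoted fields need a quote-aware parser. It parses one hypothetical NYT-style row in two ways. The first splits on commas, which is roughly what a delimiter-only SerDe does. The second uses the standard csv module, which is quote-aware like OpenCSVSerde. Every parsed value comes back as a string and has to be cast.

```python
import csv
import io

# Hypothetical CSV row: the county field is quoted because it contains a comma.
LINE = '2020-03-01,"Kings, Brooklyn",New York,36047,5,0'

# Delimiter-only splitting breaks the quoted field into two pieces.
naive_fields = LINE.split(",")

# A quote-aware reader keeps "Kings, Brooklyn" as a single field.
parsed_fields = next(csv.reader(io.StringIO(LINE)))

# All parsed values are strings, so numeric columns must be cast explicitly.
cumulative_cases = int(parsed_fields[4])
cumulative_deaths = int(parsed_fields[5])

print(len(naive_fields), len(parsed_fields))  # 7 6
print(parsed_fields[1], cumulative_cases)      # Kings, Brooklyn 5
```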
7. Lab 02 — Catalog Inspection
Goal: Explore the Glue Data Catalog programmatically and understand query plans.
./labs/lab-02-catalog-inspection/scripts/run_lab.sh
What 01_inspect_catalog.py does
# `spark` and `db` as in Lab 01
# List all tables in the database
for t in spark.catalog.listTables(db):
    print(t.name, t.tableType, t.isTemporary)
# List columns
for c in spark.catalog.listColumns("nytimes_counties", db):
    print(c.name, c.dataType)
Then runs DESCRIBE FORMATTED nytimes_counties to show Glue metadata, partition info, and SerDe properties.
Predicate pushdown demonstration:
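df = spark.table("nytimes_counties").filter("state = 'New York'")
# Data-source tables (e.g. Parquet/ORC) list the predicate under PushedFilters in FileScan;
# a Hive SerDe table such as this OpenCSVSerde one is read via HiveTableScan with a separate Filter
df.explain(True)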
Key Concepts — Catalyst Optimizer
Spark’s Catalyst optimizer transforms a logical plan through a series of rules:
- Analysis — resolves column names against the catalog
- Logical optimisation — constant folding, predicate pushdown, column pruning
- Physical planning — selects join strategy (broadcast vs sort-merge), partitioning
- Code generation — produces JVM bytecode (Whole-Stage CodeGen)
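df.explain(True) prints four plans: the parsed, analyzed and optimized logical plans, plus the physical plan. Generated code is not included; use df.explain(mode="codegen") to see it. Look for PushedFilters in the FileScan physical operator to confirm predicate pushdown is active.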
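The pushdown step can be made concrete with a toy plan rewriter in plain Python. This is not Catalyst's API: Scan, Project, Filter and push_down are hypothetical stand-ins. The rule does two things:
- It moves a filter below a projection when the filtered column survives that projection.
- It records the predicate on the scan, as PushedFilters does, but keeps the Filter above the scan because a source may apply pushed predicates only partially.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Scan:
    table: str
    pushed_filters: Tuple[Tuple[str, str], ...] = ()


@dataclass(frozen=True)
class Project:
    columns: Tuple[str, ...]
    child: "Plan"


@dataclass(frozen=True)
class Filter:
    column: str
    value: str
    child: "Plan"


Plan = Union[Scan, Project, Filter]


def push_down(plan: Plan) -> Plan:
    """Apply a toy predicate-pushdown rule in a single bottom-up pass."""
    if isinstance(plan, Project):
        return Project(plan.columns, push_down(plan.child))
    if isinstance(plan, Filter):
        child = push_down(plan.child)
        if isinstance(child, Project) and plan.column in child.columns:
            # Evaluate the filter before the projection instead of after it.
            return Project(child.columns,
                           push_down(Filter(plan.column, plan.value, child.child)))
        if isinstance(child, Scan):
            # Hand the predicate to the scan (like PushedFilters) but keep the Filter.
            pushed = child.pushed_filters + ((plan.column, plan.value),)
            return Filter(plan.column, plan.value, Scan(child.table, pushed))
        return Filter(plan.column, plan.value, child)
    return plan


logical = Filter("state", "New York",
                 Project(("county", "state"), Scan("nytimes_counties")))
print(push_down(logical))
```

Catalyst groups many such rules into batches and reapplies each batch until the plan stops changing. This sketch applies one rule in a single pass.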
Key Concepts — Dynamic Partition Pruning (DPP)
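Dynamic partition pruning is controlled by spark.sql.optimizer.dynamicPartitionPruning.enabled (true by default in Spark 3.x), not by AQE. When a table partitioned on the join key is joined to a filtered table, Spark derives the matching key values at runtime and prunes irrelevant partitions at scan time. This is visible in the physical plan as DynamicPruningExpression.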
8. Lab 03 — Enrichment Join
Goal: Join NY Times county data with hospital bed capacity, write Parquet and ORC, validate output.
./labs/lab-03-enrichment/scripts/run_lab.sh
Two EMR steps:
- 01_ctas_enriched.py: join and write
- 02_validate_output.py: read back and validate
What 01_ctas_enriched.py does
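# `spark` is the Hive-enabled session from Lab 01; `intermediate` is the
# intermediate bucket name.
nyt = spark.table("nytimes_counties")
beds = spark.table("hospital_beds")
# Left join: counties without a matching hospital_beds row keep NULL bed columns,
# and a county that matches several bed rows appears once per match.
enriched = nyt.join(beds, nyt.fips_code == beds.county_fips_code, "left") \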
.select(
nyt.report_date, nyt.state, nyt.county, nyt.fips_code,
nyt.cumulative_cases, nyt.cumulative_deaths,
(nyt.cumulative_deaths / nyt.cumulative_cases * 100).alias("case_fatality_rate"),
beds.num_staffed_beds, beds.num_icu_beds,
(nyt.cumulative_cases / beds.num_staffed_beds).alias("cases_per_staffed_bed")
)
# Parquet output — partitioned by state
enriched.write.format("parquet") \
.option("compression", "snappy") \
.partitionBy("state") \
.mode("overwrite") \
.save(f"s3://{intermediate}/covid/enriched/parquet/")
# ORC output — bloom filter on fips_code for fast point lookups
enriched.write.format("orc") \
.option("compression", "zlib") \
.option("orc.bloom.filter.columns", "fips_code") \
.partitionBy("state") \
.mode("overwrite") \
.save(f"s3://{intermediate}/covid/enriched/orc/")
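The join semantics can be checked without a cluster. This plain-Python sketch mirrors the left join and the two derived columns on hypothetical rows. safe_div is a hypothetical helper that returns None in the cases where Spark's default (non-ANSI) arithmetic returns NULL: division by zero, or a NULL operand.

```python
from collections import defaultdict
from typing import Dict, List, Optional


def safe_div(num: Optional[float], den: Optional[float]) -> Optional[float]:
    """num / den, or None when either side is missing or den is zero (Spark yields NULL)."""
    if num is None or den is None or den == 0:
        return None
    return num / den


def enrich(counties: List[Dict], beds: List[Dict]) -> List[Dict]:
    """Left-join county rows to bed rows on FIPS code and derive the two ratios."""
    beds_by_fips: Dict[str, List[Dict]] = defaultdict(list)
    for b in beds:
        beds_by_fips[b["county_fips_code"]].append(b)
    out = []
    for c in counties:
        # No match -> one output row with NULL bed columns; k matches -> k rows.
        for b in beds_by_fips.get(c["fips_code"]) or [{}]:
            ratio = safe_div(c["cumulative_deaths"], c["cumulative_cases"])
            out.append({
                "state": c["state"],
                "county": c["county"],
                "fips_code": c["fips_code"],
                "case_fatality_rate": None if ratio is None else ratio * 100,
                "num_staffed_beds": b.get("num_staffed_beds"),
                "cases_per_staffed_bed": safe_div(c["cumulative_cases"],
                                                  b.get("num_staffed_beds")),
            })
    return out


# Hypothetical sample rows (not taken from the real datasets).
counties = [
    {"state": "New York", "county": "Kings", "fips_code": "36047",
     "cumulative_cases": 200, "cumulative_deaths": 10},
    {"state": "New York", "county": "Hamilton", "fips_code": "36041",
     "cumulative_cases": 0, "cumulative_deaths": 0},
]
beds = [{"county_fips_code": "36047", "num_staffed_beds": 100}]
for row in enrich(counties, beds):
    print(row)
```

If hospital_beds holds several rows per FIPS code (for example, one per hospital), each county row is repeated once per match, just as in the Spark join. Aggregating beds by county before joining avoids that.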
What 02_validate_output.py does
- Row count, null counts for key columns
- Top-5 states by total cases
- Case fatality rate range check (should be 0–100%)
Key Concepts — Parquet vs ORC
| | Parquet | ORC |
|---|---|---|
| Origin | Twitter / Cloudera | Hive |
| Column encoding | Dictionary, RLE/bit-packing hybrid, delta | Dictionary, RLE v2 (incl. delta, bit-packing) |
| Bloom filter | Yes (parquet-mr 1.12+) | Yes (orc.bloom.filter.columns) |
| Predicate pushdown | Row-group min/max + bloom | Stripe + row-group + bloom |
| Iceberg support | Native | Supported but Parquet preferred |
| Athena support | ✓ | ✓ |
Use Parquet as the default. Use ORC when you have legacy Hive workloads or need ORC-specific compression (ZLIB tends to be smaller than Snappy for text-heavy data).
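Row-group skipping in Parquet and stripe skipping in ORC both rest on per-chunk min/max statistics. The sketch below is a plain-Python model, not a reader for either format. Each hypothetical chunk carries the min and max of one column, and the reader skips any chunk whose range cannot contain the predicate value.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ChunkStats:
    """Min/max statistics for one column in one row group (Parquet) or stripe (ORC)."""
    chunk_id: int
    min_value: str
    max_value: str


def chunks_to_read(stats: List[ChunkStats], value: str) -> List[int]:
    """Return the chunks that may contain `column == value`; all others are skipped."""
    return [s.chunk_id for s in stats if s.min_value <= value <= s.max_value]


# Hypothetical fips_code statistics for three chunks of one file.
stats = [
    ChunkStats(0, "01001", "13999"),
    ChunkStats(1, "15001", "36999"),
    ChunkStats(2, "37001", "56045"),
]
print(chunks_to_read(stats, "36047"))  # [1]
```

Min/max pruning works best when the data is sorted or clustered on the filtered column. On unsorted data each chunk's range tends to span most values, and that is where bloom filters such as orc.bloom.filter.columns help.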
Key Concepts — Partition by State
Writing partitionBy("state") creates one S3 prefix per state:
s3://.../enriched/parquet/state=Alabama/part-00000.snappy.parquet
s3://.../enriched/parquet/state=Alaska/part-00000.snappy.parquet
...
Athena and Spark queries with WHERE state = 'New York' skip all other prefixes entirely (partition pruning). This is the primary cost and performance lever for large datasets.
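Partition pruning on this layout can be modelled by parsing the key=value directory segments and keeping only matching prefixes. This is a simplified sketch. partition_path and partition_values are hypothetical helpers, and the escaping that real writers apply to special characters in partition values is ignored.

```python
from typing import Dict


def partition_path(base: str, state: str) -> str:
    """Hive-style prefix for one state, as laid out by partitionBy("state")."""
    return f"{base.rstrip('/')}/state={state}/"


def partition_values(path: str) -> Dict[str, str]:
    """Parse key=value directory segments from a Hive-style path."""
    return dict(seg.split("=", 1) for seg in path.strip("/").split("/") if "=" in seg)


BASE = "s3://emr-labs-intermediate/covid/enriched/parquet"
prefixes = [partition_path(BASE, s) for s in ("Alabama", "Alaska", "New York")]

# WHERE state = 'New York': only the matching prefix is listed and read.
to_scan = [p for p in prefixes if partition_values(p).get("state") == "New York"]
print(to_scan)
```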
9. Lab 04 — Apache Iceberg
Goal: Create an Iceberg v2 table in Glue, perform ACID operations, evolve the schema, and read historical snapshots.
./labs/lab-04-iceberg/scripts/run_lab.sh
What 01_iceberg_basics.py does
Create Iceberg table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS glue_catalog.{db}.enriched_covid_iceberg (
report_date STRING,
state STRING,
county STRING,
cumulative_cases BIGINT,
cumulative_deaths BIGINT,
case_fatality_rate DOUBLE
)
USING iceberg
PARTITIONED BY (state)
TBLPROPERTIES ('format-version' = '2')
""")
ACID DELETE
spark.sql(f"""
DELETE FROM glue_catalog.{db}.enriched_covid_iceberg
WHERE cumulative_cases < 10
""")
Schema evolution — ADD COLUMN
spark.sql(f"""
ALTER TABLE glue_catalog.{db}.enriched_covid_iceberg
ADD COLUMN hospital_beds_ratio DOUBLE
""")
UPDATE
spark.sql(f"""
UPDATE glue_catalog.{db}.enriched_covid_iceberg
SET hospital_beds_ratio = cumulative_cases / 100.0
WHERE state = 'New York'
""")
Time-travel
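# committed_at comes back as a Python datetime
ts = spark.sql(f"""
SELECT committed_at FROM glue_catalog.{db}.enriched_covid_iceberg.snapshots
ORDER BY committed_at LIMIT 1
""").collect()[0][0]
# as-of-timestamp expects milliseconds since the Unix epoch, not a datetime string
spark.read.option("as-of-timestamp", str(int(ts.timestamp() * 1000))) \
.table(f"glue_catalog.{db}.enriched_covid_iceberg") \
.show()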
Expire old snapshots
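# `cutoff` is a 'YYYY-MM-DD HH:MM:SS' string; snapshots older than it are expired,
# but retain_last => 2 always keeps the two most recent snapshots.
spark.sql(f"""
CALL glue_catalog.system.expire_snapshots(
table => '{db}.enriched_covid_iceberg',
older_than => TIMESTAMP '{cutoff}',
retain_last => 2
)
""")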
Key Concepts — Apache Iceberg
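Iceberg v2 adds row-level delete files on top of v1’s snapshot isolation. v1 tables still support DELETE and UPDATE, but only by rewriting the affected data files (copy-on-write). v2 can also write delete files that are merged at read time (merge-on-read):
| Feature | Iceberg v1 | Iceberg v2 |
|---|---|---|
| Snapshot isolation | ✓ | ✓ |
| Schema evolution | ✓ | ✓ |
| Partition evolution | ✓ | ✓ |
| Row-level DELETE / UPDATE (copy-on-write) | ✓ | ✓ |
| Row-level DELETE / UPDATE (merge-on-read) | ✗ | ✓ |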
Snapshot model: every write creates a new snapshot. The metadata file (metadata.json) points to the current snapshot. Old data files are kept until expire_snapshots removes them. Time-travel reads any prior snapshot.
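The snapshot model can be illustrated with a toy in-memory table. ToyTable and Snapshot are hypothetical classes, not Iceberg's API, and the model simplifies heavily: each snapshot here lists its data files directly, whereas Iceberg tracks them through manifest lists and manifests. The toy shows three behaviours:
- Commits append snapshots.
- A timestamp selects the last snapshot committed at or before it.
- Expiring snapshots is what frees files that no retained snapshot references.

```python
from dataclasses import dataclass, field
from typing import FrozenSet, List, Optional


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    committed_at_ms: int
    data_files: FrozenSet[str]


@dataclass
class ToyTable:
    """Toy snapshot log: each commit appends a snapshot; the last one is current."""
    snapshots: List[Snapshot] = field(default_factory=list)
    next_id: int = 1

    def commit(self, committed_at_ms: int, data_files: FrozenSet[str]) -> None:
        self.snapshots.append(Snapshot(self.next_id, committed_at_ms, data_files))
        self.next_id += 1

    def current(self) -> Optional[Snapshot]:
        return self.snapshots[-1] if self.snapshots else None

    def as_of(self, ts_ms: int) -> Optional[Snapshot]:
        """Time travel: the last snapshot committed at or before ts_ms."""
        earlier = [s for s in self.snapshots if s.committed_at_ms <= ts_ms]
        return earlier[-1] if earlier else None

    def expire(self, older_than_ms: int, retain_last: int) -> FrozenSet[str]:
        """Drop snapshots older than older_than_ms, always keeping the newest retain_last.

        Returns the data files no remaining snapshot references (safe to delete).
        """
        keep_from = max(len(self.snapshots) - retain_last, 0)
        kept = [s for i, s in enumerate(self.snapshots)
                if i >= keep_from or s.committed_at_ms >= older_than_ms]
        dropped = [s for s in self.snapshots if s not in kept]
        live = frozenset().union(*(s.data_files for s in kept))
        self.snapshots = kept
        return frozenset().union(*(s.data_files for s in dropped)) - live


table = ToyTable()
table.commit(1000, frozenset({"a.parquet", "b.parquet"}))                 # initial insert
table.commit(2000, frozenset({"a.parquet", "b2.parquet"}))                # DELETE rewrote b (copy-on-write)
table.commit(3000, frozenset({"a.parquet", "b2.parquet", "c.parquet"}))   # later write
print(table.as_of(1500).snapshot_id)                                      # 1
print(sorted(table.expire(older_than_ms=2500, retain_last=2)))            # ['b.parquet']
```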
Glue catalog integration: glue_catalog is a Spark catalog alias backed by GlueCatalog. Iceberg's metadata files live in S3; the Glue table entry stores a metadata_location table property that points to the current metadata file.
Key Concepts — Adaptive Query Execution (AQE)
AQE (spark.sql.adaptive.enabled=true) re-optimises the query plan at runtime using statistics collected during execution:
- Skew join optimisation: splits oversized partitions into smaller tasks
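- Coalesce post-shuffle partitions: merges small adjacent shuffle partitions into fewer, larger ones. This cuts the task count and, when the query writes right after a shuffle, the number of output files.
- Join strategy switching: converts a sort-merge join to a broadcast join when runtime statistics show one side is small enough
AQE is on by default since Spark 3.2 and is generally safe to leave enabled. It re-optimises only at shuffle and broadcast exchanges, so queries without exchanges run as originally planned.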
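Post-shuffle coalescing can be modelled as a greedy pass over adjacent partition sizes. This is a simplified sketch, not Spark's implementation. It merges neighbouring partitions until the next one would push a group past a target size; the target plays the role of spark.sql.adaptive.advisoryPartitionSizeInBytes (64 MB by default). Spark additionally weighs parallelism and a minimum partition size. coalesce_partitions and the sizes below are hypothetical.

```python
from typing import List


def coalesce_partitions(sizes: List[int], target: int) -> List[List[int]]:
    """Group adjacent shuffle partition indices so each group stays within `target` bytes.

    A single partition larger than `target` still forms its own group.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for idx, size in enumerate(sizes):
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)
    return groups


MB = 1024 * 1024
# Eight small shuffle partitions followed by one large one (hypothetical sizes).
sizes = [10 * MB] * 8 + [70 * MB]
print(coalesce_partitions(sizes, target=64 * MB))  # [[0, 1, 2, 3, 4, 5], [6, 7], [8]]
```

Nine shuffle partitions become three reduce tasks. The oversized partition is left alone here, since splitting it is the job of skew handling.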
10. Lab 05 — Athena Queries
Goal: Run Athena SQL against Spark-written Parquet data registered in Glue.
./labs/lab-05-athena/scripts/run_lab.sh
No EMR cluster needed — Athena is serverless.
SQL files
01_create_view.sql — creates v_enriched_summary view in Glue:
CREATE OR REPLACE VIEW v_enriched_summary AS
SELECT state, county, report_date,
cumulative_cases, cumulative_deaths,
ROUND(case_fatality_rate, 4) AS cfr
FROM enriched_covid_parquet
WHERE cumulative_cases > 0;
02_top_states.sql — top 10 states by total cases + CFR:
SELECT state,
SUM(cumulative_cases) AS total_cases,
SUM(cumulative_deaths) AS total_deaths,
ROUND(AVG(cfr), 4) AS avg_cfr
FROM v_enriched_summary
WHERE report_date = (SELECT MAX(report_date) FROM v_enriched_summary)
GROUP BY state
ORDER BY total_cases DESC
LIMIT 10;
03_county_trend.sql — weekly trend for a specific county:
SELECT report_date, county, state,
cumulative_cases,
cumulative_cases - LAG(cumulative_cases, 7) OVER (
PARTITION BY county ORDER BY report_date
) AS weekly_new_cases
FROM v_enriched_summary
WHERE state = 'New York'
ORDER BY county, report_date;
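The weekly_new_cases expression can be mirrored in plain Python to check its semantics on hypothetical rows. LAG(..., 7) looks back seven rows, not seven days. It therefore equals a weekly difference only when each county has exactly one row per day with no gaps. The first seven rows of each county get NULL (None here). Partitioning by county alone is safe only because the query filters to a single state. weekly_new_cases is a hypothetical helper.

```python
from typing import Dict, List, Optional, Tuple


def weekly_new_cases(rows: List[Tuple[str, str, int]]) -> List[Tuple[str, str, Optional[int]]]:
    """Mimic cumulative_cases - LAG(cumulative_cases, 7) OVER (PARTITION BY county ORDER BY report_date)."""
    by_county: Dict[str, List[Tuple[str, int]]] = {}
    for report_date, county, cases in rows:
        by_county.setdefault(county, []).append((report_date, cases))
    out = []
    for county in sorted(by_county):
        series = sorted(by_county[county])  # ISO date strings sort chronologically
        for i, (report_date, cases) in enumerate(series):
            lagged = series[i - 7][1] if i >= 7 else None  # LAG yields NULL for the first 7 rows
            out.append((county, report_date, None if lagged is None else cases - lagged))
    return out


# Hypothetical: ten consecutive days for one county, 10 new cases per day.
rows = [(f"2020-03-{d:02d}", "Kings", 10 * d) for d in range(1, 11)]
for county, report_date, weekly in weekly_new_cases(rows):
    print(county, report_date, weekly)
```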
Key Concepts — Athena + Glue Integration
Athena is a serverless query engine based on Presto (engine version 3 is based on Trino). It reads directly from S3 using metadata stored in Glue, so no data movement is required. Tables registered by Spark (with enableHiveSupport()) are immediately visible to Athena.
Cost: $5 per TB scanned. Partition pruning and columnar formats (Parquet, ORC) dramatically reduce bytes scanned.
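The pricing arithmetic can be sketched as a small estimate function. It assumes 1 TB = 10**12 bytes and ignores Athena's per-query minimum charge. athena_scan_cost_usd and the scan sizes are hypothetical.

```python
def athena_scan_cost_usd(bytes_scanned: int, usd_per_tb: float = 5.0) -> float:
    """Estimated charge for one query: bytes scanned, in TB, times the per-TB price."""
    # Assumes 1 TB = 10**12 bytes and ignores the per-query minimum charge.
    return bytes_scanned / 10**12 * usd_per_tb


# Hypothetical: a full CSV scan of 200 GB versus 4 GB left after pruning and columnar reads.
print(round(athena_scan_cost_usd(200 * 10**9), 2))  # 1.0
print(round(athena_scan_cost_usd(4 * 10**9), 2))    # 0.02
```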
Workgroup: controls output location, per-query data limits, and cost controls. Default workgroup (primary) writes results to s3://<logs-bucket>/athena-results/.
11. Lab 06 — Oozie Scheduling
Goal: Schedule the Spark pipeline as a daily Oozie coordinator job.
# Step 1: launch persistent cluster
./labs/lab-06-oozie/scripts/run_lab.sh
# Step 2: submit coordinator
./labs/lab-06-oozie/scripts/submit_oozie.sh <CLUSTER_ID>
# After the lab: terminate the cluster
aws emr terminate-clusters --cluster-ids <CLUSTER_ID>
Oozie Workflow
The workflow has three <spark> actions in sequence:
<action name="register-tables">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Lab06-RegisterTables</name>
<class>org.apache.spark.deploy.SparkSubmit</class>
<jar>${scriptsS3}/01_register_tables.py</jar>
<spark-opts>--conf spark.yarn.appMasterEnv.RAW_BUCKET=${rawBucket} ...</spark-opts>
</spark>
<ok to="enrich"/>
<error to="kill"/>
</action>
Coordinator
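Runs daily at 06:00 UTC. Oozie materializes one action per ${coord:days(1)} interval starting at ${startTime}, so the 06:00 time of day comes from the startTime value (for example 2024-01-01T06:00Z):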
<coordinator-app name="covid-spark-daily"
frequency="${coord:days(1)}"
start="${startTime}" end="${endTime}" timezone="UTC">
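The following plain-Python sketch lists the nominal times such a daily coordinator produces. It assumes the end time is exclusive and uses a fixed UTC offset, so there is no DST handling. nominal_times is a hypothetical helper, and the start and end values are example settings, not values from job.properties.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def nominal_times(start: datetime, end: datetime, every: timedelta) -> List[datetime]:
    """start, start + every, ... for every time strictly before end (end assumed exclusive)."""
    times = []
    t = start
    while t < end:
        times.append(t)
        t += every
    return times


start = datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)
end = datetime(2024, 1, 4, 6, 0, tzinfo=timezone.utc)
print([t.strftime("%Y-%m-%dT%H:%MZ") for t in nominal_times(start, end, timedelta(days=1))])
```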
Key Concepts — Oozie Spark Action
The <spark> action (namespace uri:oozie:spark-action:0.2) submits a Spark job via YARN directly from within Oozie. It avoids the overhead of a shell script and properly captures stdout/stderr into the Oozie console.
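Environment variables are passed via spark-opts: --conf spark.yarn.appMasterEnv.MY_VAR=value makes MY_VAR available as os.environ["MY_VAR"] in the driver, which runs inside the YARN ApplicationMaster in cluster mode. Code running on executors needs --conf spark.executorEnv.MY_VAR=value instead.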
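Here is a minimal sketch of the script side, assuming the script reads RAW_BUCKET as passed in the workflow above. require_env is a hypothetical helper. It fails fast with a message naming the missing --conf, instead of raising a bare KeyError later.

```python
import os


def require_env(name: str) -> str:
    """Read a value passed with --conf spark.yarn.appMasterEnv.<name>=...; fail fast if missing."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"environment variable {name} is not set; "
            f"pass --conf spark.yarn.appMasterEnv.{name}=... in spark-opts"
        )
    return value
```

In the driver script this would be called as raw_bucket = require_env("RAW_BUCKET") before building any S3 paths.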
12. Key Concepts
PySpark DataFrame API vs Spark SQL
Both compile to the same Catalyst logical plan. Choose based on ergonomics:
# DataFrame API — good for chained transformations
from pyspark.sql import functions as F
df = spark.table("nytimes_counties") \
.filter(F.col("state") == "New York") \
.groupBy("county") \
.agg(F.max("cumulative_cases").alias("peak_cases")) \
.orderBy(F.desc("peak_cases"))
# Spark SQL — good for complex joins and window functions
spark.sql("""
SELECT county, MAX(cumulative_cases) AS peak_cases
FROM nytimes_counties
WHERE state = 'New York'
GROUP BY county
ORDER BY peak_cases DESC
""")
Mixed usage is fine: spark.sql(...) returns a DataFrame, and any DataFrame can be queried from SQL after df.createOrReplaceTempView("name").
Spark Driver vs Executor Memory
In spark-glue-config.json:
"spark.executor.memory": "3g",
"spark.driver.memory": "3g"
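On m5.xlarge (4 vCPU, 16 GB RAM), each executor runs in a YARN container sized for its 3g heap plus memory overhead. The 4g-per-executor figure assumes 1g of overhead. Spark's default overhead is max(384 MiB, 10% of spark.executor.memory), which is 384 MiB for a 3g executor, unless spark.executor.memoryOverhead is set. With 2 core nodes you get 2 executors × 3g. Dynamic allocation can add more if available.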
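The container arithmetic can be written out as a small function. It assumes Spark's documented default overhead: max(384 MiB, spark.executor.memoryOverheadFactor × executor memory), with a factor of 0.10, unless an explicit spark.executor.memoryOverhead is given. yarn_container_mib is a hypothetical helper, and YARN may round the request up to its allocation increment.

```python
from typing import Optional

MIN_OVERHEAD_MIB = 384   # Spark's floor for executor memory overhead
OVERHEAD_FACTOR = 0.10   # default spark.executor.memoryOverheadFactor


def yarn_container_mib(executor_memory_mib: int, overhead_mib: Optional[int] = None) -> int:
    """Memory YARN must grant for one executor: heap plus overhead (before YARN rounding)."""
    if overhead_mib is None:
        overhead_mib = max(MIN_OVERHEAD_MIB, int(executor_memory_mib * OVERHEAD_FACTOR))
    return executor_memory_mib + overhead_mib


print(yarn_container_mib(3072))        # 3456: default overhead (384 MiB) on a 3g heap
print(yarn_container_mib(3072, 1024))  # 4096: explicit 1g overhead, the 4g figure above
```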
Small Files Problem
partitionBy("state") writes one file per Spark partition per state. With 50 states and 200 partitions you get up to 10,000 files. Mitigate with:
df.repartition(10, "state").write.partitionBy("state")...
Or use coalesce() for smaller datasets:
df.coalesce(1).write.partitionBy("state")...
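These file counts can be reproduced with a small simulation in plain Python. It is a model of the write, not Spark, and the data are hypothetical. The two assigners stand in for Spark's partitioning:
- round_robin stands in for a layout where every Spark partition holds rows from every state, as after a shuffle on another key.
- hash_by_state models repartition(n, "state"), using zlib.crc32 in place of Spark's Murmur3 hash.

```python
import zlib
from typing import Callable, List, Tuple

Row = Tuple[str, int]                 # (state, row number)
Assigner = Callable[[int, Row], int]  # (row index, row) -> Spark partition id


def round_robin(n: int) -> Assigner:
    """Spread rows evenly over n partitions regardless of state."""
    return lambda i, row: i % n


def hash_by_state(n: int) -> Assigner:
    """Send every row of a state to the same partition (crc32 stands in for Murmur3)."""
    return lambda i, row: zlib.crc32(row[0].encode()) % n


def count_output_files(rows: List[Row], assign: Assigner) -> int:
    """partitionBy("state") writes one file per distinct (Spark partition, state) pair."""
    return len({(assign(i, row), row[0]) for i, row in enumerate(rows)})


# Hypothetical data: 50 states x 1,000 rows, stored state by state.
rows = [(f"state_{s:02d}", n) for s in range(50) for n in range(1000)]
print(count_output_files(rows, round_robin(200)))   # 10000
print(count_output_files(rows, hash_by_state(10)))  # 50
print(count_output_files(rows, lambda i, row: 0))   # 50, like coalesce(1)
```

The trade-off is parallelism. Hashing by state puts each state's rows in a single task, so one very large state becomes one slow task and one large file.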
13. Troubleshooting
Cluster stuck in WAITING
Check EMR step logs in s3://<logs-bucket>/emr-logs/<cluster-id>/steps/. Common causes:
- SparkSubmit command not found: verify the cluster was created with --release-label emr-7.2.0 and Spark in its application list
- PySpark script has an import error: check the stdout log in S3
Iceberg: Table not found
The glue_catalog alias is defined in spark-glue-config.json via spark.sql.catalog.glue_catalog. If you get “catalog not found”, check:
aws emr describe-cluster --cluster-id <ID> --query 'Cluster.Configurations'
Confirm the spark-defaults classification is present.
Athena: HIVE_PARTITION_SCHEMA_MISMATCH
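This error means a partition's schema in Glue no longer matches the table schema, typically after the Parquet output schema changed. MSCK REPAIR TABLE only adds missing partitions and leaves existing partition metadata untouched. To fix it, drop and recreate the table, then run MSCK REPAIR TABLE enriched_covid_parquet to re-register its partitions.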
Oozie: Action FAILED: spark action error
View the action log in the Oozie UI (http://<master>:11000/oozie) or:
oozie job -oozie http://localhost:11000/oozie -log <JOB_ID>
OOM errors
If you see java.lang.OutOfMemoryError: GC overhead limit exceeded:
- Increase spark.executor.memory in spark-glue-config.json or pass --conf at runtime
- Use df.persist(StorageLevel.DISK_ONLY) (from pyspark import StorageLevel) for large DataFrames re-used multiple times
- Check for data skew with df.groupBy("state").count().orderBy(desc("count")).show()