EMR Labs — Three-Engine COVID-19 Data Pipeline
An end-to-end educational lab system that builds the same COVID-19 analytics pipeline three different ways on Amazon EMR 7.2.0: with Apache Hive + Tez, Apache Spark, and Apache Flink. Each flavour shares the same AWS infrastructure but demonstrates the distinct programming model, strengths, and operational patterns of its engine.
Table of Contents
- Repository Structure
- Shared Infrastructure
- Lab Flavours Overview
- Engine Comparison
- Getting Started
- Prerequisites
- Data Sources
- Shared Scripts Reference
1. Repository Structure
emr-labs/
├── README.md                          ← this file
├── terraform/
│   ├── envs/dev/
│   │   ├── main.tf
│   │   ├── outputs.tf
│   │   ├── variables.tf
│   │   ├── versions.tf
│   │   └── terraform.tfvars
│   └── modules/
│       ├── s3/
│       └── iam/
├── shared/
│   ├── configs/
│   │   └── emr-instance-fleet.json    ← EC2 instance fleet spec (all flavours)
│   └── scripts/
│       ├── get_tf_outputs.sh          ← exports Terraform outputs as env vars
│       └── wait_for_step.sh           ← polls EMR step status
├── emr-hive-tez-labs/                 ← Hive + Tez flavour
├── emr-spark-labs/                    ← Apache Spark (PySpark) flavour
└── emr-flink-labs/                    ← Apache Flink (SQL + PyFlink) flavour
Each flavour follows the same internal layout:
emr-<engine>-labs/
├── README.md
├── shared/configs/<engine>-config.json ← EMR application configurations
└── labs/
    ├── lab-00-foundation/              ← data ingestion from public S3
    ├── lab-01-*                        ← create external tables / register sources
    ├── lab-02-*                        ← catalog inspection
    ├── lab-03-*                        ← enrichment join + Parquet output
    ├── lab-04-*                        ← engine-specific advanced feature
    ├── lab-05-athena/                  ← Athena federated queries (all 3 flavours)
    └── lab-06-oozie-*/                 ← Oozie scheduling
2. Shared Infrastructure
2.1 Terraform
All three flavours share a single Terraform deployment in terraform/envs/dev/.
cd terraform/envs/dev
terraform init
terraform apply -var-file="terraform.tfvars"
Terraform creates:
| Resource | Name pattern | Purpose |
|---|---|---|
| S3 bucket | emr-labs-raw-* | Raw CSV data copied from public sources |
| S3 bucket | emr-labs-intermediate-* | Processed Parquet / ORC output |
| S3 bucket | emr-labs-logs-* | EMR logs, Athena results, staging |
| Glue database | emr_labs_db | Shared Hive-compatible metastore |
| IAM role | EMR_DefaultRole | EMR service role |
| IAM role | EMR_EC2_DefaultRole | EC2 instance profile |
2.2 EC2 Instance Fleet
All clusters use shared/configs/emr-instance-fleet.json:
- Master: 1 × m5.xlarge On-Demand
- Core: 2 × m5.xlarge Spot, capacity-optimized allocation strategy, 10-minute provisioning timeout (then switches to On-Demand)
2.3 get_tf_outputs.sh
Sourced by every lab’s run_lab.sh. Exports:
RAW_BUCKET INTERMEDIATE_BUCKET LOGS_BUCKET
GLUE_DB EMR_SERVICE_ROLE EMR_INSTANCE_PROFILE
source ../../shared/scripts/get_tf_outputs.sh
echo "$RAW_BUCKET"
2.4 wait_for_step.sh
Polls all EMR steps on a cluster until every step reaches a terminal state.
./shared/scripts/wait_for_step.sh <CLUSTER_ID> [<REGION>]
3. Lab Flavours Overview
emr-hive-tez-labs — Apache Hive + Tez
Best for: SQL-native teams, existing Hive workloads, simple batch ETL, HiveQL familiarity.
| Lab | Focus |
|---|---|
| lab-00 | Copy public COVID CSV from s3://covid19-lake to raw bucket |
| lab-01 | External tables with OpenCSVSerde; run HiveQL analytics |
| lab-02 | Catalog inspection (SHOW TABLES, DESCRIBE, MSCK REPAIR) |
| lab-03 | CTAS enrichment join → Parquet + ORC output |
| lab-04 | ACID transactions, INSERT/UPDATE/DELETE, compaction, time-travel |
| lab-05 | Athena federated queries against Glue catalog |
| lab-06 | Oozie coordinator with <hive> action, daily scheduling |
emr-spark-labs — Apache Spark (PySpark)
Best for: Large-scale ML pipelines, DataFrame/Dataset API, Iceberg ACID tables, Catalyst optimizer, complex transformations.
| Lab | Focus |
|---|---|
| lab-00 | Copy public data (same shell script pattern as Hive) |
| lab-01 | SparkSession + enableHiveSupport(), DataFrame and Spark SQL analytics |
| lab-02 | Catalog API: spark.catalog.listTables, DESCRIBE FORMATTED, explain() plans |
| lab-03 | DataFrame join → Parquet (snappy) + ORC (zlib, bloom filter), validation |
| lab-04 | Apache Iceberg: ACID DELETE/UPDATE, schema evolution, TIMESTAMP AS OF time-travel, snapshots |
| lab-05 | Athena queries against Spark-written Parquet |
| lab-06 | Oozie <spark> action (oozie:spark-action:0.2), coordinator |
emr-flink-labs — Apache Flink (SQL + PyFlink)
Best for: Stream processing, event-time windowing, exactly-once semantics, low-latency incremental pipelines.
| Lab | Focus |
|---|---|
| lab-00 | Copy public data |
| lab-01 | Flink SQL: CREATE CATALOG (Glue/Hive), CREATE TABLE with connector=filesystem, tableau results |
| lab-02 | Catalog inspection in Flink SQL (SHOW CATALOGS, DESCRIBE, SHOW CREATE TABLE) |
| lab-03 | Enrichment join in Flink SQL → Parquet sink, validate output |
| lab-04 | Windowing: TUMBLE (7-day), HOP (30d/7d slide), SESSION (14d gap), PyFlink DataStream API |
| lab-05 | Athena queries against Flink-written Parquet |
| lab-06 | Oozie with <shell> action wrapping Flink SQL Client, coordinator |
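To make the lab-04 window sizes concrete, here is a minimal plain-Python sketch of how a report date maps to a 7-day tumbling window and to the 30-day/7-day hopping windows. It assumes windows aligned to the Unix epoch with no offset; the function names `tumble_window` and `hop_windows` are hypothetical and are not part of Flink or of the lab code. Session windows are not shown because they depend on the gaps between neighbouring events.

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def tumble_window(day: date, size_days: int = 7):
    """Return (start, end) of the tumbling window containing `day` (end exclusive)."""
    offset = (day - EPOCH).days % size_days
    start = day - timedelta(days=offset)
    return start, start + timedelta(days=size_days)


def hop_windows(day: date, size_days: int = 30, slide_days: int = 7):
    """Return every (start, end) hopping window that contains `day` (end exclusive)."""
    # Latest window start that is not after `day`.
    start = day - timedelta(days=(day - EPOCH).days % slide_days)
    windows = []
    # A window [start, start + size) contains `day` while start > day - size.
    while start > day - timedelta(days=size_days):
        windows.append((start, start + timedelta(days=size_days)))
        start -= timedelta(days=slide_days)
    return windows


if __name__ == "__main__":
    report_date = date(2020, 3, 15)
    print(tumble_window(report_date))
    for window in hop_windows(report_date):
        print(window)
```

Each date falls into exactly one tumbling window but into several overlapping hopping windows, which is why HOP aggregates count the same record more than once.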
4. Engine Comparison
| Dimension | Hive + Tez | Spark | Flink |
|---|---|---|---|
| Paradigm | SQL-first batch | Micro-batch / batch | True stream + batch |
| Language | HiveQL | PySpark / Scala | Flink SQL / Java / Python |
| Latency | Minutes | Seconds–minutes | Milliseconds–seconds |
| State management | None | In-memory RDD | RocksDB, exactly-once |
| ACID | Native (ORC ACID) | Iceberg / Delta | Iceberg (via connector) |
| Windowing | None (GROUP BY DATE) | Window functions (SQL) | Tumbling / Sliding / Session TVF |
| Catalog | Glue AWSGlueDataCatalogHiveClientFactory | Same factory via spark-hive-site | Glue via Flink Hive connector |
| Oozie action | <hive> | <spark> | <shell> (no native Flink action) |
| Best for | Legacy SQL, simple ETL | Complex transforms, ML, large joins | Real-time, event-time, stateful |
| EMR release | emr-7.2.0 | emr-7.2.0 | emr-7.2.0 |
When to choose each engine
Choose Hive + Tez when:
- Your team writes SQL natively and wants to avoid writing JVM or Python code
- You have an existing HiveQL workload to migrate
- The pipeline is simple SELECT / INSERT OVERWRITE / CTAS with daily cadence
- You need native ORC ACID without a separate table format dependency
Choose Spark when:
- You need Python or Scala beyond SQL
- Your pipeline involves ML (MLlib, PyTorch via SparkTorch) or graph processing
- You want Apache Iceberg for cross-engine ACID and schema evolution
- Data volume is very large and you need Catalyst’s advanced query optimisation (AQE, DPP)
- You need DataFrame-level transformations that are complex to express in SQL
Choose Flink when:
- Latency requirements are sub-minute (streaming ingestion, real-time dashboards)
- Event-time ordering and watermarks are essential to correctness
- You need stateful operations (CEP, exactly-once aggregation, session detection)
- The pipeline is incremental: process new records as they arrive, not full daily reloads
- You want to run the same code in both batch (backfill) and streaming (live) modes
5. Getting Started
Quick Start (all three flavours)
# 1. Provision shared infrastructure
cd terraform/envs/dev
terraform init && terraform apply -auto-approve
# 2. Copy public data to your S3 raw bucket (run once)
./emr-hive-tez-labs/labs/lab-00-foundation/scripts/copy_public_data.sh
# (or emr-spark-labs / emr-flink-labs — all do the same copy)
# 3. Run any flavour's lab series
cd emr-spark-labs/labs/lab-01-register-tables/scripts
./run_lab.sh
Setting Optional Environment Variables
export AWS_DEFAULT_REGION=us-east-1 # default us-east-1
export KEY_PAIR=my-ec2-keypair # adds KeyName to EC2 attributes (for SSH)
export ATHENA_WORKGROUP=primary # default primary (lab-05 only)
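The defaulting behaviour of these variables can be sketched as follows. This is an illustration only: the function `lab_settings` and the shape of the returned dictionary are hypothetical, and the actual `run_lab.sh` scripts are Bash.

```python
import os


def lab_settings(env=None):
    """Resolve the optional variables, applying the documented defaults."""
    env = os.environ if env is None else env
    settings = {
        "region": env.get("AWS_DEFAULT_REGION", "us-east-1"),
        "athena_workgroup": env.get("ATHENA_WORKGROUP", "primary"),
        "ec2_attributes": {},
    }
    key_pair = env.get("KEY_PAIR")
    if key_pair:
        # KeyName is only added when a key pair is set (enables SSH).
        settings["ec2_attributes"]["KeyName"] = key_pair
    return settings


if __name__ == "__main__":
    print(lab_settings({}))
    print(lab_settings({"KEY_PAIR": "my-ec2-keypair"}))
```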
Lab Execution Order
Run labs in numeric order within a flavour. Each lab builds on the previous:
lab-00 → lab-01   → lab-02    → lab-03 → lab-04     → lab-05   → lab-06
(data)   (tables)   (catalog)   (join)   (advanced)   (athena)   (schedule)
Lab-05 and lab-06 of all three flavours can be run independently after lab-03 is complete, because Athena reads the Parquet written by lab-03 and Oozie simply schedules the same pipeline.
6. Prerequisites
| Requirement | Version | Notes |
|---|---|---|
| AWS CLI | ≥ 2.15 | Configured with a profile that has EMR, S3, Glue, Athena IAM permissions |
| Terraform | ≥ 1.6 | HashiCorp OSS or Terraform Cloud |
| Bash | ≥ 4.0 | macOS ships with 3.x; install with brew install bash |
| jq | any | Used by get_tf_outputs.sh to parse Terraform output |
IAM permissions required (attach to the user/role that runs these scripts):
{
  "Effect": "Allow",
  "Action": [
    "elasticmapreduce:*",
    "s3:*",
    "glue:*",
    "athena:*",
    "iam:PassRole"
  ],
  "Resource": "*"
}
For production, scope Resource to specific ARNs.
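As one possible way to scope the S3 part of the statement, the sketch below builds a statement limited to named buckets. The helper `scoped_s3_statement`, the example bucket names, and the reduced action list are all assumptions for illustration; adjust them to the buckets Terraform actually created and to the actions your labs need.

```python
import json


def scoped_s3_statement(bucket_names):
    """Build an IAM statement restricted to the given buckets and their objects."""
    resources = []
    for name in bucket_names:
        resources.append(f"arn:aws:s3:::{name}")    # bucket-level actions, e.g. ListBucket
        resources.append(f"arn:aws:s3:::{name}/*")  # object-level actions
    return {
        "Effect": "Allow",
        # Assumed minimal action set; not taken from the lab scripts.
        "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"],
        "Resource": resources,
    }


if __name__ == "__main__":
    # Hypothetical bucket names following the emr-labs-* pattern.
    buckets = ["emr-labs-raw-example", "emr-labs-intermediate-example"]
    print(json.dumps(scoped_s3_statement(buckets), indent=2))
```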
7. Data Sources
All labs use publicly available COVID-19 data hosted on the AWS COVID-19 Data Lake (s3://covid19-lake, us-east-1).
NY Times US Counties
s3://covid19-lake/rearc-covid-19-nyt-data-in-usa/csv/us-counties/
Schema:
| Column | Type | Description |
|---|---|---|
| date | STRING | Report date (YYYY-MM-DD) |
| county | STRING | County name |
| state | STRING | US state name |
| fips | STRING | 5-digit FIPS code |
| cases | BIGINT | Cumulative confirmed cases |
| deaths | BIGINT | Cumulative deaths |
Definitive Healthcare Hospital Beds
s3://covid19-lake/rearc-usa-hospital-beds/csv/
Schema (selected columns):
| Column | Type | Description |
|---|---|---|
| county_fips_code | STRING | 5-digit FIPS (join key) |
| county_name | STRING | County name |
| state_name | STRING | State |
| num_staffed_beds | DOUBLE | Staffed beds |
| num_icu_beds | DOUBLE | ICU beds |
| num_licensed_beds | DOUBLE | Total licensed beds |
Join key: nytimes_counties.fips = hospital_beds.county_fips_code
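The join semantics can be illustrated without a cluster. The sketch below is plain Python, not the labs' HiveQL, PySpark, or Flink SQL. It keeps FIPS codes as strings so leading zeros survive, and it sums beds per county before joining, which is an assumption made here to avoid duplicating case rows when a county has several hospitals. The sample rows are made up and are not real figures.

```python
import csv
import io

# Made-up rows that follow the two schemas above.
COUNTIES_CSV = """date,county,state,fips,cases,deaths
2020-04-01,Example A,Alabama,01001,10,1
2020-04-01,Example B,Alabama,01003,20,0
2020-04-01,Unknown,Alabama,,5,0
"""

BEDS_CSV = """county_fips_code,county_name,state_name,num_staffed_beds,num_icu_beds,num_licensed_beds
01001,Example A,Alabama,30.0,4.0,50.0
01001,Example A,Alabama,20.0,2.0,35.0
"""


def read_rows(text):
    """Parse CSV text into a list of dicts; every value stays a string."""
    return list(csv.DictReader(io.StringIO(text)))


def enrich(counties, beds):
    """Inner-join county case rows to per-county bed totals on the FIPS code."""
    beds_by_fips = {}
    for row in beds:
        totals = beds_by_fips.setdefault(
            row["county_fips_code"], {"staffed_beds": 0.0, "icu_beds": 0.0}
        )
        totals["staffed_beds"] += float(row["num_staffed_beds"] or 0)
        totals["icu_beds"] += float(row["num_icu_beds"] or 0)

    joined = []
    for row in counties:
        fips = row["fips"]  # kept as a string: "01001" must not become 1001
        if fips in beds_by_fips:
            joined.append({
                "date": row["date"],
                "fips": fips,
                "cases": int(row["cases"]),
                "deaths": int(row["deaths"]),
                **beds_by_fips[fips],
            })
    return joined


if __name__ == "__main__":
    for record in enrich(read_rows(COUNTIES_CSV), read_rows(BEDS_CSV)):
        print(record)
```

Rows with an empty FIPS code, or with a code that has no hospital entry, drop out of the inner join.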
8. Shared Scripts Reference
shared/scripts/get_tf_outputs.sh
source shared/scripts/get_tf_outputs.sh
# Exports: RAW_BUCKET, INTERMEDIATE_BUCKET, LOGS_BUCKET,
# GLUE_DB, EMR_SERVICE_ROLE, EMR_INSTANCE_PROFILE
Reads terraform output -json from terraform/envs/dev/. Caches nothing — always reads live state.
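For readers who want to see the mapping spelled out, here is a rough Python equivalent of what the script does with the JSON. The actual script is Bash and uses jq. The Terraform output names in `OUTPUT_TO_ENV` and the helper `tf_outputs_to_env` are assumptions; the real names are defined in terraform/envs/dev/outputs.tf.

```python
import json

# Hypothetical mapping from Terraform output names to exported variables.
OUTPUT_TO_ENV = {
    "raw_bucket": "RAW_BUCKET",
    "intermediate_bucket": "INTERMEDIATE_BUCKET",
    "logs_bucket": "LOGS_BUCKET",
    "glue_db": "GLUE_DB",
    "emr_service_role": "EMR_SERVICE_ROLE",
    "emr_instance_profile": "EMR_INSTANCE_PROFILE",
}


def tf_outputs_to_env(tf_json: str) -> dict:
    """Turn the text of `terraform output -json` into {ENV_NAME: value}."""
    outputs = json.loads(tf_json)
    env = {}
    for tf_name, env_name in OUTPUT_TO_ENV.items():
        if tf_name not in outputs:
            raise KeyError(f"missing Terraform output: {tf_name}")
        # Each output is an object with "value", "type" and "sensitive" keys.
        env[env_name] = str(outputs[tf_name]["value"])
    return env


if __name__ == "__main__":
    sample = json.dumps({
        name: {"sensitive": False, "type": "string", "value": f"example-{name}"}
        for name in OUTPUT_TO_ENV
    })
    for key, value in tf_outputs_to_env(sample).items():
        print(f"export {key}={value}")
```

Failing loudly on a missing output makes it obvious when `terraform apply` has not been run yet.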
shared/scripts/wait_for_step.sh
./shared/scripts/wait_for_step.sh <CLUSTER_ID> [<REGION>]
Polls every 30 seconds. Exits 0 when all steps are COMPLETED. Exits 1 if any step reaches FAILED or CANCELLED.
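The exit-code rule above can be sketched as a small polling loop. This is an illustration in Python, not the Bash script itself: `steps_outcome` and `wait_for_steps` are hypothetical names, and `fetch_states` stands in for whatever call returns the current step states (for example a wrapper around `aws emr list-steps`). Treating an empty step list as still running is a choice made here; the real script may behave differently.

```python
import time
from typing import Callable, Iterable

FAILURE_STATES = {"FAILED", "CANCELLED"}


def steps_outcome(states: Iterable[str]) -> str:
    """Classify a snapshot of step states as 'failed', 'done' or 'running'."""
    states = list(states)
    if any(state in FAILURE_STATES for state in states):
        return "failed"
    if states and all(state == "COMPLETED" for state in states):
        return "done"
    # Includes the empty list: steps may not have been registered yet.
    return "running"


def wait_for_steps(fetch_states: Callable[[], Iterable[str]],
                   poll_seconds: float = 30.0,
                   sleep: Callable[[float], None] = time.sleep) -> int:
    """Poll until the steps finish; return 0 on success and 1 on failure."""
    while True:
        outcome = steps_outcome(fetch_states())
        if outcome == "done":
            return 0
        if outcome == "failed":
            return 1
        sleep(poll_seconds)


if __name__ == "__main__":
    # Simulated snapshots instead of real EMR calls.
    snapshots = iter([["PENDING", "RUNNING"], ["COMPLETED", "COMPLETED"]])
    print(wait_for_steps(lambda: next(snapshots), sleep=lambda _: None))
```

Passing `sleep` as a parameter keeps the loop testable without waiting 30 seconds per poll.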
shared/configs/emr-instance-fleet.json
Instance fleet spec used by all create-cluster calls:
[
  {
    "Name": "Master",
    "InstanceFleetType": "MASTER",
    "TargetOnDemandCapacity": 1,
    "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}]
  },
  {
    "Name": "Core",
    "InstanceFleetType": "CORE",
    "TargetSpotCapacity": 2,
    "LaunchSpecifications": {
      "SpotSpecification": {
        "TimeoutDurationMinutes": 10,
        "TimeoutAction": "SWITCH_TO_ON_DEMAND",
        "AllocationStrategy": "capacity-optimized"
      }
    },
    "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}]
  }
]
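A quick way to sanity-check the spec before passing it to create-cluster is to load it and summarise each fleet. The sketch below embeds a copy of the JSON above; the helpers `summarize_fleet` and `total_instances` are hypothetical. The instance count assumes the default weighted capacity of 1 per instance, since the spec sets no weights.

```python
import json

FLEET_SPEC = json.loads("""
[
  {"Name": "Master", "InstanceFleetType": "MASTER", "TargetOnDemandCapacity": 1,
   "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}]},
  {"Name": "Core", "InstanceFleetType": "CORE", "TargetSpotCapacity": 2,
   "LaunchSpecifications": {"SpotSpecification": {
       "TimeoutDurationMinutes": 10,
       "TimeoutAction": "SWITCH_TO_ON_DEMAND",
       "AllocationStrategy": "capacity-optimized"}},
   "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}]}
]
""")


def summarize_fleet(fleet: dict) -> dict:
    """Extract the capacity targets and Spot fallback of one fleet."""
    spot = fleet.get("LaunchSpecifications", {}).get("SpotSpecification", {})
    return {
        "fleet": fleet["InstanceFleetType"],
        "on_demand": fleet.get("TargetOnDemandCapacity", 0),
        "spot": fleet.get("TargetSpotCapacity", 0),
        "instance_types": [c["InstanceType"] for c in fleet["InstanceTypeConfigs"]],
        "spot_fallback": spot.get("TimeoutAction"),
    }


def total_instances(spec: list) -> int:
    """Total target capacity, equal to the instance count when weights are 1."""
    return sum(f.get("TargetOnDemandCapacity", 0) + f.get("TargetSpotCapacity", 0)
               for f in spec)


if __name__ == "__main__":
    for fleet in FLEET_SPEC:
        print(summarize_fleet(fleet))
    print("instances:", total_instances(FLEET_SPEC))
```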
Cost Awareness
All clusters use --auto-termination-policy 'IdleTimeout=3600' except lab-06 (Oozie), which is persistent. Always terminate Oozie clusters after the lab:
aws emr terminate-clusters --cluster-ids <CLUSTER_ID> --region us-east-1
Typical cost per lab run: < $0.50 (1× m5.xlarge On-Demand master plus 2× m5.xlarge Spot core, ~15 minutes).
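The estimate can be reproduced with simple arithmetic: per-node hourly charge (EC2 price plus the EMR fee) times node count times runtime. In the sketch below every price is a placeholder assumption, not a quoted rate; Spot prices in particular change over time, so check current pricing for your region. The names `NodeGroup` and `cluster_cost` are hypothetical. S3, Athena, and data-transfer charges are not included.

```python
from dataclasses import dataclass


@dataclass
class NodeGroup:
    count: int
    ec2_price_per_hour: float  # EC2 charge per instance (On-Demand or Spot)
    emr_fee_per_hour: float    # EMR surcharge per instance


def cluster_cost(groups, minutes: float) -> float:
    """Estimated compute cost in USD for a cluster running `minutes`."""
    hours = minutes / 60.0
    return sum(g.count * (g.ec2_price_per_hour + g.emr_fee_per_hour) * hours
               for g in groups)


# Placeholder prices for illustration only.
master = NodeGroup(count=1, ec2_price_per_hour=0.192, emr_fee_per_hour=0.048)
core = NodeGroup(count=2, ec2_price_per_hour=0.07, emr_fee_per_hour=0.048)

if __name__ == "__main__":
    print(f"${cluster_cost([master, core], minutes=15):.3f}")
```

With these assumed prices the 15-minute run stays well under the $0.50 figure; a cluster left idle until the one-hour auto-termination would cost roughly four times as much.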