EMR Labs — Three-Engine COVID-19 Data Pipeline

An end-to-end educational lab system that builds the same COVID-19 analytics pipeline three different ways on Amazon EMR 7.2.0: with Apache Hive + Tez, Apache Spark, and Apache Flink. Each flavour shares the same AWS infrastructure but demonstrates the distinct programming model, strengths, and operational patterns of its engine.


Table of Contents

  1. Repository Structure
  2. Shared Infrastructure
  3. Lab Flavours Overview
  4. Engine Comparison
  5. Getting Started
  6. Prerequisites
  7. Data Sources
  8. Shared Scripts Reference

1. Repository Structure

emr-labs/
├── README.md                          ← this file
├── terraform/
│   ├── envs/dev/
│   │   ├── main.tf
│   │   ├── outputs.tf
│   │   ├── variables.tf
│   │   ├── versions.tf
│   │   └── terraform.tfvars
│   └── modules/
│       ├── s3/
│       └── iam/
├── shared/
│   ├── configs/
│   │   └── emr-instance-fleet.json    ← EC2 instance fleet spec (all flavours)
│   └── scripts/
│       ├── get_tf_outputs.sh          ← exports Terraform outputs as env vars
│       └── wait_for_step.sh           ← polls EMR step status
├── emr-hive-tez-labs/                 ← Hive + Tez flavour
├── emr-spark-labs/                    ← Apache Spark (PySpark) flavour
└── emr-flink-labs/                    ← Apache Flink (SQL + PyFlink) flavour

Each flavour follows the same internal layout:

emr-<engine>-labs/
├── README.md
├── shared/configs/<engine>-config.json   ← EMR application configurations
└── labs/
    ├── lab-00-foundation/      ← data ingestion from public S3
    ├── lab-01-*                ← create external tables / register sources
    ├── lab-02-*                ← catalog inspection
    ├── lab-03-*                ← enrichment join + Parquet output
    ├── lab-04-*                ← engine-specific advanced feature
    ├── lab-05-athena/          ← Athena queries against the Glue catalog (all 3 flavours)
    └── lab-06-oozie-*/         ← Oozie scheduling

2. Shared Infrastructure

2.1 Terraform

All three flavours share a single Terraform deployment in terraform/envs/dev/.

cd terraform/envs/dev
terraform init
terraform apply -var-file="terraform.tfvars"

Terraform creates:

Resource       | Name pattern             | Purpose
S3 bucket      | emr-labs-raw-*           | Raw CSV data copied from public sources
S3 bucket      | emr-labs-intermediate-*  | Processed Parquet / ORC output
S3 bucket      | emr-labs-logs-*          | EMR logs, Athena results, staging
Glue database  | emr_labs_db              | Shared Hive-compatible metastore
IAM role       | EMR_DefaultRole          | EMR service role
IAM role       | EMR_EC2_DefaultRole      | EC2 instance profile

2.2 EC2 Instance Fleet

All clusters use shared/configs/emr-instance-fleet.json:

  • Master: 1 × m5.xlarge On-Demand
  • Core: 2 × m5.xlarge Spot with the capacity-optimized allocation strategy. If Spot capacity is not provisioned within 10 minutes, the fleet switches to On-Demand.

2.3 get_tf_outputs.sh

Sourced by every lab’s run_lab.sh. Exports:

RAW_BUCKET          INTERMEDIATE_BUCKET    LOGS_BUCKET
GLUE_DB             EMR_SERVICE_ROLE       EMR_INSTANCE_PROFILE

source ../../shared/scripts/get_tf_outputs.sh
echo "$RAW_BUCKET"

2.4 wait_for_step.sh

Polls all EMR steps on a cluster until every step reaches a terminal state.

./shared/scripts/wait_for_step.sh <CLUSTER_ID> [<REGION>]

3. Lab Flavours Overview

emr-hive-tez-labs — Apache Hive + Tez

Best for: SQL-native teams, existing Hive workloads, simple batch ETL, HiveQL familiarity.

Lab    | Focus
lab-00 | Copy public COVID CSV from s3://covid19-lake to the raw bucket
lab-01 | External tables with OpenCSVSerde; run HiveQL analytics
lab-02 | Catalog inspection (SHOW TABLES, DESCRIBE, MSCK REPAIR)
lab-03 | CTAS enrichment join → Parquet + ORC output
lab-04 | ACID transactions, INSERT/UPDATE/DELETE, compaction, time-travel
lab-05 | Athena queries against the Glue catalog
lab-06 | Oozie coordinator with <hive> action, daily scheduling

emr-spark-labs — Apache Spark (PySpark)

Best for: Large-scale ML pipelines, DataFrame/Dataset API, Iceberg ACID tables, Catalyst optimizer, complex transformations.

Lab    | Focus
lab-00 | Copy public data (same shell script pattern as Hive)
lab-01 | SparkSession + enableHiveSupport(), DataFrame and Spark SQL analytics
lab-02 | Catalog API: spark.catalog.listTables, DESCRIBE FORMATTED, explain() plans
lab-03 | DataFrame join → Parquet (snappy) + ORC (zlib, bloom filter), validation
lab-04 | Apache Iceberg: ACID DELETE/UPDATE, schema evolution, TIMESTAMP AS OF time-travel, snapshots
lab-05 | Athena queries against Spark-written Parquet
lab-06 | Oozie <spark> action (oozie:spark-action:0.2), coordinator

emr-flink-labs — Apache Flink (SQL + PyFlink)

Best for: Stream processing, event-time windowing, exactly-once semantics, low-latency incremental pipelines.

Lab    | Focus
lab-00 | Copy public data
lab-01 | Flink SQL: CREATE CATALOG (Glue/Hive), CREATE TABLE with connector=filesystem, tableau results
lab-02 | Catalog inspection in Flink SQL (SHOW CATALOGS, DESCRIBE, SHOW CREATE TABLE)
lab-03 | Enrichment join in Flink SQL → Parquet sink, validate output
lab-04 | Windowing: TUMBLE (7-day), HOP (30d/7d slide), SESSION (14d gap), PyFlink DataStream API
lab-05 | Athena queries against Flink-written Parquet
lab-06 | Oozie with <shell> action wrapping Flink SQL Client, coordinator

4. Engine Comparison

Dimension        | Hive + Tez                                  | Spark                               | Flink
Paradigm         | SQL-first batch                             | Micro-batch / batch                 | True stream + batch
Language         | HiveQL                                      | PySpark / Scala                     | Flink SQL / Java / Python
Latency          | Minutes                                     | Seconds–minutes                     | Milliseconds–seconds
State management | None                                        | In-memory RDD                       | RocksDB, exactly-once
ACID             | Native (ORC ACID)                           | Iceberg / Delta                     | Iceberg (via connector)
Windowing        | None (GROUP BY DATE)                        | Window functions (SQL)              | Tumbling / Sliding / Session TVF
Catalog          | Glue (AWSGlueDataCatalogHiveClientFactory)  | Same factory via spark-hive-site    | Glue via Flink Hive connector
Oozie action     | <hive>                                      | <spark>                             | <shell> (no native Flink action)
Best for         | Legacy SQL, simple ETL                      | Complex transforms, ML, large joins | Real-time, event-time, stateful
EMR release      | emr-7.2.0                                   | emr-7.2.0                           | emr-7.2.0

When to choose each engine

Choose Hive + Tez when:

  • Your team writes SQL natively and prefers not to write Java, Scala, or Python code
  • You have an existing HiveQL workload to migrate
  • The pipeline is simple SELECT / INSERT OVERWRITE / CTAS with daily cadence
  • You need native ORC ACID without a separate table format dependency

Choose Spark when:

  • You need Python or Scala beyond SQL
  • Your pipeline involves ML (MLlib, PyTorch via SparkTorch) or graph processing
  • You want Apache Iceberg for cross-engine ACID and schema evolution
  • Data volume is very large and you need Catalyst’s advanced query optimisation (AQE, DPP)
  • You need DataFrame-level transformations that are complex to express in SQL

Choose Flink when:

  • Latency requirements are sub-minute (streaming ingestion, real-time dashboards)
  • Event-time ordering and watermarks are essential to correctness
  • You need stateful operations (CEP, exactly-once aggregation, session detection)
  • The pipeline is incremental: process new records as they arrive, not full daily reloads
  • You want to run the same code in both batch (backfill) and streaming (live) modes

5. Getting Started

Quick Start (all three flavours)

# 1. Provision shared infrastructure
cd terraform/envs/dev
terraform init && terraform apply -auto-approve

# 2. Copy public data to your S3 raw bucket (run once, from the repository root)
cd ../../..
./emr-hive-tez-labs/labs/lab-00-foundation/scripts/copy_public_data.sh
# (or emr-spark-labs / emr-flink-labs — all do the same copy)

# 3. Run any flavour's lab series
cd emr-spark-labs/labs/lab-01-register-tables/scripts
./run_lab.sh

Setting Optional Environment Variables

export AWS_DEFAULT_REGION=us-east-1   # default us-east-1
export KEY_PAIR=my-ec2-keypair        # adds KeyName to EC2 attributes (for SSH)
export ATHENA_WORKGROUP=primary       # default primary (lab-05 only)
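The following is a minimal sketch of how a lab script might apply these optional variables, assuming the defaults listed above. The helper names resolve_lab_env and ec2_attributes are hypothetical and are not functions from the repository.

```shell
#!/usr/bin/env bash
# Hypothetical helpers (not from the repository) showing how the optional
# variables above could be resolved. Defaults match the documented ones.

# Sets LAB_REGION and LAB_WORKGROUP from the environment, with defaults.
resolve_lab_env() {
  LAB_REGION="${AWS_DEFAULT_REGION:-us-east-1}"
  LAB_WORKGROUP="${ATHENA_WORKGROUP:-primary}"   # only lab-05 uses this
}

# Prints a value for `aws emr create-cluster --ec2-attributes`.
# KeyName is appended only when KEY_PAIR is set and non-empty (enables SSH).
ec2_attributes() {
  local profile="$1"
  local attrs="InstanceProfile=${profile}"
  if [ -n "${KEY_PAIR:-}" ]; then
    attrs="${attrs},KeyName=${KEY_PAIR}"
  fi
  printf '%s\n' "$attrs"
}
```

The output of ec2_attributes is meant to be passed as the --ec2-attributes value of aws emr create-cluster. Leaving KEY_PAIR unset produces a cluster that has no SSH key attached.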

Lab Execution Order

Run labs in numeric order within a flavour. Each lab builds on the previous:

lab-00 → lab-01 → lab-02 → lab-03 → lab-04 → lab-05 → lab-06
(data)   (tables) (catalog) (join)   (advanced) (athena) (schedule)

In all three flavours, lab-05 and lab-06 depend only on lab-03, so they can run without lab-04: Athena reads the Parquet written by lab-03, and Oozie simply schedules the same pipeline.
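The following is a minimal sketch of running one flavour's labs in numeric order. It assumes each lab directory keeps its entry point at scripts/run_lab.sh, as lab-01 does in the Quick Start. The function name run_flavour_labs is hypothetical. Labs without an executable run_lab.sh are skipped (possibly lab-00, whose Quick Start entry point is copy_public_data.sh), and the run stops at the first failing lab. Note that lab-06 leaves a persistent Oozie cluster running.

```shell
#!/usr/bin/env bash
# Hypothetical runner (not part of the repository): runs each lab's
# scripts/run_lab.sh in numeric order and stops at the first failure.
# Usage: run_flavour_labs emr-spark-labs
run_flavour_labs() {
  local flavour_dir="$1" lab name
  # Glob results are sorted; the two-digit prefix keeps lab-00..lab-06 in order.
  for lab in "$flavour_dir"/labs/lab-[0-9][0-9]-*/; do
    [ -d "$lab" ] || continue                 # pattern matched nothing
    name=$(basename "$lab")
    if [ ! -x "${lab}scripts/run_lab.sh" ]; then
      echo "skip: $name has no executable scripts/run_lab.sh" >&2
      continue
    fi
    echo "==> $name"
    # Subshell: each lab runs from its own scripts/ directory, as in Quick Start.
    if ! ( cd "${lab}scripts" && ./run_lab.sh ); then
      echo "FAILED: $name" >&2
      return 1
    fi
  done
}
```

Running each lab in a subshell keeps any cd or exported variables inside that lab from leaking into the next one.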


6. Prerequisites

Requirement | Version | Notes
AWS CLI     | ≥ 2.15  | Configured with a profile that has EMR, S3, Glue, and Athena IAM permissions
Terraform   | ≥ 1.6   | HashiCorp OSS or Terraform Cloud
Bash        | ≥ 4.0   | macOS ships with 3.x; install a newer version with brew install bash
jq          | any     | Used by get_tf_outputs.sh to parse Terraform output

IAM permissions required (attach to the user/role that runs these scripts):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "elasticmapreduce:*",
        "s3:*",
        "glue:*",
        "athena:*",
        "iam:PassRole"
      ],
      "Resource": "*"
    }
  ]
}

For production, scope Resource to specific ARNs.


7. Data Sources

All labs use publicly available COVID-19 data hosted on the AWS COVID-19 Data Lake (s3://covid19-lake, us-east-1).

NY Times US Counties

s3://covid19-lake/rearc-covid-19-nyt-data-in-usa/csv/us-counties/

Schema:

Column | Type   | Description
date   | STRING | Report date (YYYY-MM-DD)
county | STRING | County name
state  | STRING | US state name
fips   | STRING | 5-digit FIPS code
cases  | BIGINT | Cumulative confirmed cases
deaths | BIGINT | Cumulative deaths

Definitive Healthcare Hospital Beds

s3://covid19-lake/rearc-usa-hospital-beds/csv/

Schema (selected columns):

Column            | Type   | Description
county_fips_code  | STRING | 5-digit FIPS (join key)
county_name       | STRING | County name
state_name        | STRING | State
num_staffed_beds  | DOUBLE | Licensed staffed beds
num_icu_beds      | DOUBLE | ICU beds
num_licensed_beds | DOUBLE | Total licensed beds

Join key: nytimes_counties.fips = hospital_beds.county_fips_code
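The following is a hedged sketch of running this join from the command line in lab-05 style. It assumes tables named nytimes_counties and hospital_beds exist in GLUE_DB. It also assumes Athena results go under a hypothetical athena-results/ prefix in the logs bucket. Only standard aws athena start-query-execution options are used, and the function name is illustrative.

```shell
#!/usr/bin/env bash
# Hypothetical lab-05-style helper (not from the repository). Table names come
# from the join key above; GLUE_DB and LOGS_BUCKET are exported by
# get_tf_outputs.sh. The athena-results/ prefix is an assumption.
run_enrichment_join() {
  # "date" is double-quoted as an identifier to avoid clashing with the DATE type
  local sql='SELECT c."date", c.state, c.county, c.cases, c.deaths,
       h.num_staffed_beds, h.num_icu_beds
FROM nytimes_counties c
JOIN hospital_beds h
  ON c.fips = h.county_fips_code
LIMIT 10'

  # Prints the QueryExecutionId of the submitted query
  aws athena start-query-execution \
    --query-string "$sql" \
    --work-group "${ATHENA_WORKGROUP:-primary}" \
    --query-execution-context Database="$GLUE_DB" \
    --result-configuration OutputLocation="s3://${LOGS_BUCKET}/athena-results/" \
    --region "${AWS_DEFAULT_REGION:-us-east-1}" \
    --query QueryExecutionId --output text
}
```

The query runs asynchronously. Once it has succeeded, rows can be fetched with aws athena get-query-results --query-execution-id <ID>.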


8. Shared Scripts Reference

shared/scripts/get_tf_outputs.sh

source shared/scripts/get_tf_outputs.sh
# Exports: RAW_BUCKET, INTERMEDIATE_BUCKET, LOGS_BUCKET,
#          GLUE_DB, EMR_SERVICE_ROLE, EMR_INSTANCE_PROFILE

Reads terraform output -json from terraform/envs/dev/. Nothing is cached: every run reads the current Terraform state.
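The following is a minimal sketch of an equivalent loader that swaps jq for terraform output -raw, with one call per value. It is not the repository's script. The Terraform output names (raw_bucket, logs_bucket, and so on) are hypothetical because this README does not list them, and the TF_DIR override is illustrative.

```shell
#!/usr/bin/env bash
# Hypothetical jq-free variant of get_tf_outputs.sh. The documented script
# parses `terraform output -json` with jq instead; the Terraform output names
# used here (raw_bucket, logs_bucket, ...) are assumptions.
TF_DIR="${TF_DIR:-terraform/envs/dev}"

# Prints one string output; -raw omits the JSON quoting.
tf_out() {
  terraform -chdir="$TF_DIR" output -raw "$1"
}

# Reads all six values; returns 1 before the export step if any lookup fails.
load_tf_outputs() {
  RAW_BUCKET="$(tf_out raw_bucket)" &&
    INTERMEDIATE_BUCKET="$(tf_out intermediate_bucket)" &&
    LOGS_BUCKET="$(tf_out logs_bucket)" &&
    GLUE_DB="$(tf_out glue_database)" &&
    EMR_SERVICE_ROLE="$(tf_out emr_service_role)" &&
    EMR_INSTANCE_PROFILE="$(tf_out emr_instance_profile)" || return 1
  export RAW_BUCKET INTERMEDIATE_BUCKET LOGS_BUCKET \
    GLUE_DB EMR_SERVICE_ROLE EMR_INSTANCE_PROFILE
}
```

This is a design trade-off. A single terraform output -json call is faster than six -raw calls, while -raw removes the jq dependency.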

shared/scripts/wait_for_step.sh

./shared/scripts/wait_for_step.sh <CLUSTER_ID> [<REGION>]

Polls every 30 seconds. Exits 0 when all steps are COMPLETED. Exits 1 if any step reaches FAILED or CANCELLED.
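The following is a hedged sketch of the polling contract above: a 30-second interval, exit 0 when all steps are COMPLETED, and exit 1 on FAILED or CANCELLED. It is not the repository's script. The function names and the POLL_SECONDS override are hypothetical. The sketch also treats INTERRUPTED as a failure, so the loop cannot wait forever on a step that will never complete.

```shell
#!/usr/bin/env bash
# Hypothetical sketch of the polling contract described above; it is not the
# repository's wait_for_step.sh. Requires the AWS CLI v2.

# Prints "done" if every state is COMPLETED, "failed" if any state is
# FAILED, CANCELLED or INTERRUPTED, otherwise "running".
classify_states() {
  local s all_done=yes
  for s in "$@"; do
    case "$s" in
      FAILED|CANCELLED|INTERRUPTED) echo failed; return 0 ;;
      COMPLETED) ;;
      *) all_done=no ;;            # PENDING, RUNNING, CANCEL_PENDING, ...
    esac
  done
  if [ "$all_done" = yes ]; then echo done; else echo running; fi
}

# Usage: wait_for_steps <CLUSTER_ID> [<REGION>]
wait_for_steps() {
  local cluster_id="$1"
  local region="${2:-${AWS_DEFAULT_REGION:-us-east-1}}"
  local interval="${POLL_SECONDS:-30}" states
  while :; do
    # --output text prints the states separated by whitespace
    states=$(aws emr list-steps --cluster-id "$cluster_id" --region "$region" \
      --query 'Steps[].Status.State' --output text) || return 1
    # $states is deliberately unquoted so each state becomes one argument
    case "$(classify_states $states)" in
      done)   return 0 ;;
      failed) return 1 ;;
    esac
    sleep "$interval"
  done
}
```

Keeping the decision logic in classify_states lets it be tested without calling AWS.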

shared/configs/emr-instance-fleet.json

Instance fleet spec used by all create-cluster calls:

[
  {
    "Name": "Master",
    "InstanceFleetType": "MASTER",
    "TargetOnDemandCapacity": 1,
    "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}]
  },
  {
    "Name": "Core",
    "InstanceFleetType": "CORE",
    "TargetSpotCapacity": 2,
    "LaunchSpecifications": {
      "SpotSpecification": {
        "TimeoutDurationMinutes": 10,
        "TimeoutAction": "SWITCH_TO_ON_DEMAND",
        "AllocationStrategy": "capacity-optimized"
      }
    },
    "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}]
  }
]
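The following is a hedged sketch of how a create-cluster call might consume this fleet file. It combines the file with the variables exported by get_tf_outputs.sh and the idle auto-termination policy described under Cost Awareness. The cluster name, application list, and emr-logs/ log prefix are assumptions rather than values taken from the labs' run_lab.sh scripts. The relative file:// path assumes the command runs from the repository root.

```shell
#!/usr/bin/env bash
# Hypothetical create-cluster wrapper (not from the repository).
# Usage: create_lab_cluster <name> <Application>...   e.g. Spark Hive
create_lab_cluster() {
  local name="$1" apps="" app
  shift
  for app in "$@"; do
    apps="$apps Name=$app"          # one Name=<app> entry per application
  done

  # $apps is intentionally unquoted so each Name=... becomes its own argument.
  # Prints the new cluster ID.
  aws emr create-cluster \
    --name "$name" \
    --release-label emr-7.2.0 \
    --applications $apps \
    --instance-fleets file://shared/configs/emr-instance-fleet.json \
    --service-role "$EMR_SERVICE_ROLE" \
    --ec2-attributes InstanceProfile="$EMR_INSTANCE_PROFILE" \
    --log-uri "s3://${LOGS_BUCKET}/emr-logs/" \
    --auto-termination-policy IdleTimeout=3600 \
    --region "${AWS_DEFAULT_REGION:-us-east-1}" \
    --query ClusterId --output text
}
```

For example, create_lab_cluster covid-spark-lab Spark Hive prints the new cluster ID. The other flavours would pass Hive Tez or Flink instead.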

Cost Awareness

All clusters are created with --auto-termination-policy 'IdleTimeout=3600', which terminates a cluster after one hour of inactivity. The exception is lab-06 (Oozie), whose cluster is persistent, so always terminate it after the lab:

aws emr terminate-clusters --cluster-ids <CLUSTER_ID> --region us-east-1
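For the persistent Oozie cluster, the following is a minimal sketch that terminates the cluster and then blocks on the AWS CLI's built-in cluster-terminated waiter. The script therefore returns only once the cluster has actually shut down. The function name is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical cleanup helper (not from the repository).
# Usage: terminate_and_wait <CLUSTER_ID> [<REGION>]
terminate_and_wait() {
  local cluster_id="$1"
  local region="${2:-${AWS_DEFAULT_REGION:-us-east-1}}"
  aws emr terminate-clusters --cluster-ids "$cluster_id" --region "$region" || return 1
  # Built-in CLI waiter: polls until the cluster is TERMINATED, or gives up
  # with a non-zero exit status.
  aws emr wait cluster-terminated --cluster-id "$cluster_id" --region "$region"
}
```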

Typical cost per lab run: < $0.50 (1× m5.xlarge On-Demand master plus 2× m5.xlarge Spot core nodes, ~15 minutes).