AWS EMR Labs — Complete Guide

End-to-end data engineering on AWS: ingest public data → model it with Apache Hive → register in AWS Glue → transform to Parquet → query with Athena → schedule with Apache Oozie. Seven labs, one coherent pipeline.

Dataset: AWS COVID-19 Open Data Lake
NY Times county-level case data joined with hospital bed capacity — free, public, no sign-up.


Table of Contents

  1. Why These Technologies?
  2. Architecture Overview
  3. Data Ingestion & Mapping Explained
  4. Prerequisites & One-Time Setup
  5. Step-by-Step Lab Execution
  6. Best Practices Applied
  7. What Production Companies Actually Do
  8. Alternative Approaches
  9. Cost Reference
  10. Observability: Results, Logs, UIs & DAGs
  11. Production Monitoring & Troubleshooting: Lessons from Large Companies

1. Why These Technologies?

Amazon EMR

EMR is AWS’s managed platform for running open-source big-data frameworks (Hadoop, Hive, Spark, Oozie, Presto) on a fleet of EC2 instances. You get:

  • Pre-installed, pre-configured application stack (no manual setup)
  • Native integration with S3, Glue, CloudWatch, IAM
  • Spot-instance support to cut compute costs by 60–80%
  • Transient cluster mode: spin up, run jobs, shut down — pay only for runtime

Apache Hive

Hive is a SQL-on-Hadoop engine that translates SQL queries (HQL) into distributed jobs running across the cluster. Its most important concept is the metastore: a catalog of table names, column schemas, and S3 locations. Without a metastore, Hive has no memory of tables between sessions.

Hive execution engine: Tez vs MapReduce

Hive supports two execution engines:

Engine | How it works | Relative speed
--- | --- | ---
MapReduce | Chains Map→Reduce→Map→Reduce stages, writes intermediate results to disk between each stage | Baseline
Tez | Builds a DAG of tasks, passes data in-memory between stages, skips unnecessary disk writes | 2–10× faster

Tez is the default Hive execution engine on EMR (release 5.0 and later, so including 7.x). It is also set explicitly in shared/configs/hive-glue-config.json as "hive.execution.engine": "tez" so there is no ambiguity if a job property ever overrides it.

How to verify Tez is active — inside a Hive session on the master node:

SET hive.execution.engine;
-- Returns: hive.execution.engine=tez

Or by watching the Tez UI while a query is running:

# SSH tunnel to the master
ssh -L 8080:localhost:8080 -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS>
# Open http://localhost:8080/tez-ui  (Tez UI)

Running a query explicitly in Tez (if you want to force it per-query):

SET hive.execution.engine=tez;

SELECT state, SUM(cases) AS total_cases
FROM nytimes_counties
GROUP BY state
ORDER BY total_cases DESC
LIMIT 10;

Switching back to MapReduce (occasionally useful for debugging, since Tez stack traces can be harder to read than MR logs; note that Hive 2 and later flag the MR engine as deprecated):

SET hive.execution.engine=mr;

Cost note: Tez jobs finish faster, so the cluster runs for less time — directly reducing your EMR bill. There is no difference in S3 I/O cost.

AWS Glue Data Catalog

The Glue catalog is a fully managed, serverless metastore. It stores database and table definitions (schema + S3 location) that are shared across:

  • Apache Hive (on EMR)
  • Apache Spark (on EMR or Glue ETL)
  • Amazon Athena (serverless SQL)
  • AWS Lake Formation

Using Glue as the Hive metastore (instead of EMR's default metastore, a MySQL database on the primary node's local disk) means your table definitions survive cluster termination, which is critical for transient clusters.

Amazon Athena

Athena is serverless SQL powered by Presto/Trino. It reads directly from S3 using schemas registered in the Glue catalog. No cluster, no infra, $5/TB scanned. Using Parquet (columnar, compressed) cuts Athena query costs by 80–95% versus uncompressed CSV because Athena only reads the columns and row groups it needs.
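To make the per-query arithmetic concrete, here is a small Python estimator. It is a sketch, not an official calculator: it assumes the $5/TB list price quoted above, decimal units (1 MB = 10^6 bytes, 1 TB = 10^12 bytes), and Athena's published billing rules of rounding each query up to the nearest megabyte with a 10 MB minimum. The function name athena_query_cost is illustrative.

```python
import math

# Assumptions: $5 per TB scanned and decimal units (1 MB = 10**6 B, 1 TB = 10**12 B)
PRICE_PER_TB_USD = 5.0
MB = 10**6
TB = 10**12
MIN_BILLED_MB = 10  # Athena bills at least 10 MB per query


def athena_query_cost(bytes_scanned: int) -> float:
    """Estimate the Athena charge in USD for one query."""
    # Round up to the next whole megabyte, then apply the 10 MB minimum
    billed_mb = max(math.ceil(bytes_scanned / MB), MIN_BILLED_MB)
    return billed_mb * MB / TB * PRICE_PER_TB_USD


if __name__ == "__main__":
    for label, scanned in [("100 MB scan (CSV)", 100 * MB),
                           ("4 MB scan (columnar)", 4 * MB)]:
        print(f"{label:<22} ${athena_query_cost(scanned):.5f}")
```

The minimum matters for small lab datasets: once a columnar scan drops below 10 MB, further pruning no longer lowers the bill.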

Apache Oozie

Oozie is a workflow scheduler embedded in the Hadoop ecosystem. It lets you define a directed acyclic graph (DAG) of Hadoop, Hive, Pig, or shell actions in XML, then run them on a schedule (coordinator) or triggered by data arrival. It runs inside the EMR cluster — no extra infrastructure.


2. Architecture Overview

  ┌─────────────────────────────────────────────────────────┐
  │                   AWS COVID-19 Open Data                │
  │        s3://covid19-lake/  (public, us-east-1)          │
  └──────────────────────┬──────────────────────────────────┘
                         │  aws s3 sync  (Lab 00)
                         ▼
  ┌──────────────────────────────────────────────────────────┐
  │   RAW ZONE   s3://{prefix}-raw/covid/                   │
  │   ├── nytimes/us-counties/csv/  ← county cases (CSV)   │
  │   └── hospital_beds/            ← bed capacity (CSV)    │
  └──────────────────────┬──────────────────────────────────┘
                         │  Hive CREATE EXTERNAL TABLE (Lab 01)
                         ▼
  ┌──────────────────────────────────────────────────────────┐
  │   AWS GLUE DATA CATALOG  (emr_labs_db)                  │
  │   ├── nytimes_counties  → raw zone prefix               │
  │   └── hospital_beds     → raw zone prefix               │
  └──────────────────────┬──────────────────────────────────┘
                         │  Hive CTAS + LEFT JOIN on FIPS (Lab 03)
                         ▼
  ┌──────────────────────────────────────────────────────────┐
  │   INTERMEDIATE ZONE  s3://{prefix}-intermediate/        │
  │   └── covid_enriched/  ← Parquet, Snappy compressed     │
  │       + Glue table: covid_enriched                       │
  └──────────────────────┬──────────────────────────────────┘
                         │
              ┌──────────┴──────────┐
              ▼                     ▼
  ┌───────────────────┐  ┌──────────────────────────────────┐
  │  Amazon Athena    │  │  Apache Oozie Coordinator        │
  │  (Lab 05)         │  │  (Lab 06) — daily at 06:00 UTC   │
  │  Serverless SQL   │  │  re-runs Labs 01→03 as a DAG     │
  │  on Glue catalog  │  └──────────────────────────────────┘
  └───────────────────┘

Zone pattern (also called Medallion Architecture):

Zone | Alias | What lives here | Format
--- | --- | --- | ---
Raw | Bronze | Exact copy of source data, never modified | CSV / JSON as-is
Intermediate | Silver | Cleaned, joined, typed data | Parquet (compressed)
Processed | Gold | Aggregated, business-ready datasets | Parquet / ORC

This is the same pattern used by Databricks, Netflix, Uber, and most modern data platforms. It decouples ingestion, transformation, and consumption from one another.


3. Data Ingestion & Mapping Explained

How ingestion works in this lab

The COVID-19 data lives in a public S3 bucket (s3://covid19-lake/). “Ingestion” here is simply aws s3 sync: copying those files into your own S3 bucket. In production you would use Kafka → S3 (streaming), AWS DMS (database CDC), Amazon Data Firehose, or custom producers.

Schema-on-read vs schema-on-write

Approach | When schema is applied | Used by
--- | --- | ---
Schema-on-write | At write time (enforced) | RDBMS, Redshift, Snowflake
Schema-on-read | At query time (flexible) | Hive external tables, Athena

Hive external tables use schema-on-read: the data in S3 is just bytes. The column names, types, and delimiter are only applied when you run a query. This means you can add columns to the table definition without touching the data.

How the Hive table maps to S3

CREATE EXTERNAL TABLE nytimes_counties (
  date_str STRING, county STRING, state STRING,
  fips STRING, cases BIGINT, deaths BIGINT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3://emr-labs-raw-xxxx/covid/nytimes/us-counties/csv/'
TBLPROPERTIES ('skip.header.line.count' = '1');

What this does:

  1. LOCATION tells Hive where the data files are in S3
  2. ROW FORMAT DELIMITED + FIELDS TERMINATED BY selects Hive's default SerDe (Serializer/Deserializer), LazySimpleSerDe, and tells it how to split each line into columns
  3. TBLPROPERTIES sets table-level properties; skip.header.line.count = 1 makes readers skip the CSV header line
  4. The Glue catalog stores this entire definition so any cluster can query it

When you run SELECT * FROM nytimes_counties, Hive:

  1. Looks up the table in Glue → gets the S3 LOCATION and SerDe
  2. Lists all files under that S3 prefix
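  3. Submits a Tez DAG that reads each file, splits by comma, assigns column names in order, and returns results (see the Tez section above). For a bare SELECT * with no aggregation, Hive may skip the DAG and read the files directly in a fetch task (controlled by hive.fetch.task.conversion).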
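The read steps above can be mimicked in a few lines of Python to show what schema-on-read means in practice. This is a simplified, hypothetical model of LazySimpleSerDe behaviour, not Hive code: it splits on every comma (no CSV quoting, just like FIELDS TERMINATED BY ','), ignores extra fields, returns None (NULL) for missing or unparseable values, and leaves an empty field as '' in STRING columns. The sample rows are made up.

```python
# Table definition: applied only when rows are read; the bytes never change
SCHEMA = [("date_str", str), ("county", str), ("state", str),
          ("fips", str), ("cases", int), ("deaths", int)]


def read_with_schema(raw_text, schema, skip_header_lines=1):
    """Apply `schema` to comma-delimited text at read time."""
    rows = []
    for line_no, line in enumerate(raw_text.splitlines()):
        if line_no < skip_header_lines:          # like skip.header.line.count
            continue
        fields = line.split(",")                  # plain split, no CSV quoting
        row = {}
        for i, (name, cast) in enumerate(schema):
            value = fields[i] if i < len(fields) else None  # missing -> NULL
            if value is not None and cast is not str:
                try:
                    value = cast(value)
                except ValueError:
                    value = None                  # unparseable -> NULL, no error
            row[name] = value
        rows.append(row)
    return rows


if __name__ == "__main__":
    sample = ("date,county,state,fips,cases,deaths\n"
              "2021-01-01,Sample,Ohio,39001,100,2\n"
              "2021-01-02,Unknown,Ohio,,7,\n")
    for row in read_with_schema(sample, SCHEMA):
        print(row)
```

Changing SCHEMA (renaming or adding a column) changes what the same bytes return, which is exactly why an external table definition can evolve without rewriting data.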

How the join (mapping) works

The enrichment join in Lab 03 links case data to hospital capacity using the FIPS code — a 5-digit US government identifier that uniquely identifies every county (e.g., 36061 = New York County, NY).

nytimes_counties.fips  =  hospital_beds.county_fips_code
      (36061)                      (36061)
         │                            │
    cases + deaths              num_staffed_beds

The result cases_per_staffed_bed is a stress metric: how many cumulative cases exist relative to the available hospital capacity in that county.

Storage Format Selection: ORC, Parquet, Avro, CSV — and When to Use Each

Choosing the right storage format is one of the highest-leverage decisions in a data pipeline. It directly controls query speed, Athena cost, compression ratio, and compatibility with downstream tools. These labs deliberately use different formats for different zones to demonstrate each format’s purpose.

What this project uses and why

Zone | Format | Why
--- | --- | ---
Raw (Lab 01) | TEXTFILE (CSV) | Source data arrives as CSV; schema-on-read, no conversion needed
Intermediate (Lab 03) | Parquet + Snappy | Columnar, high compression, Athena-optimal, wide tool support
Alternative demo (Lab 03) | ORC + ZLIB | Best Hive-native performance; shown side by side with Parquet
Streaming / schema evolution | Avro | Row-oriented, schema embedded, ideal for Kafka/Kinesis ingest

ORC — Optimized Row Columnar

ORC was created at Hortonworks in 2013 specifically for Hive on Hadoop. It is the native format of the Hive ecosystem and provides the best performance for Hive-specific workloads (sort-merge joins, bucket joins, predicate pushdown through Hive’s own execution engine).

How it works internally:

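  • Data is divided into stripes (64 MB by default in current ORC releases). Each stripe contains index data, row data, and a stripe footer.
  • Within each stripe, data is stored column-by-column (like Parquet).
  • Each stripe records min/max statistics per column, and a row index repeats those statistics every 10,000 rows (orc.row.index.stride). Bloom filters are optional and are built only for columns listed in orc.bloom.filter.columns. Hive uses these structures to skip stripes and row groups without reading them (predicate pushdown).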
  • Built-in support for Hive’s complex types: STRUCT, MAP, ARRAY, UNIONTYPE with fully native serialization.
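The bloom-filter idea behind orc.bloom.filter.columns and orc.bloom.filter.fpp can be sketched in a few lines of Python. This is a simplified, hypothetical model: real ORC files use their own hash function and on-disk layout, while this sketch derives bit positions from SHA-256 with double hashing. BloomFilter and stripes_to_read are illustrative names only.

```python
import hashlib
import math


class BloomFilter:
    """Minimal Bloom filter sized for `expected_items` at false-positive rate `fpp`."""

    def __init__(self, expected_items, fpp=0.05):
        n = max(1, expected_items)
        # Standard sizing: m = -n ln(p) / (ln 2)^2 bits, k = (m / n) ln 2 hashes
        self.m = max(1, math.ceil(-n * math.log(fpp) / math.log(2) ** 2))
        self.k = max(1, round(self.m / n * math.log(2)))
        self.bits = bytearray(self.m)  # one byte per bit, for readability

    def _positions(self, value):
        # Double hashing: k positions from two 64-bit halves of one SHA-256 digest
        digest = hashlib.sha256(value.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "little")
        h2 = int.from_bytes(digest[8:16], "little") | 1
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, value):
        for pos in self._positions(value):
            self.bits[pos] = 1

    def might_contain(self, value):
        # False means "definitely absent"; True means "possibly present"
        return all(self.bits[pos] for pos in self._positions(value))


def stripes_to_read(stripe_filters, value):
    """Indexes of stripes whose filter does not rule `value` out."""
    return [i for i, bf in enumerate(stripe_filters) if bf.might_contain(value)]


if __name__ == "__main__":
    # Two hypothetical stripes holding different states
    stripe_a, stripe_b = BloomFilter(100), BloomFilter(100)
    for state in ("New York", "New Jersey"):
        stripe_a.add(state)
    for state in ("Texas", "Ohio"):
        stripe_b.add(state)
    print(stripes_to_read([stripe_a, stripe_b], "New York"))
```

A bloom filter can return false positives (a stripe is read unnecessarily) but never false negatives, which is why skipping on a negative answer is safe.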

Compression options: NONE, ZLIB (gzip-compatible, high ratio), Snappy (fast, moderate ratio), LZO, ZSTD (best ratio+speed, EMR 7.x).

-- Create an ORC table (Hive DDL)
CREATE TABLE covid_enriched_orc
COMMENT 'ORC variant — best for Hive ACID, bucket joins, complex types'
STORED AS ORC
LOCATION 's3://${INTERMEDIATE_BUCKET}/covid_enriched_orc/'
TBLPROPERTIES (
  'orc.compress'             = 'ZLIB',       -- highest ratio; use SNAPPY for speed
  'orc.stripe.size'          = '268435456',  -- 256 MB stripes (ORC default: 64 MB)
  'orc.row.index.stride'     = '10000',      -- row group index every 10k rows
  'orc.bloom.filter.columns' = 'state,fips', -- bloom filters on filter columns
  'orc.bloom.filter.fpp'     = '0.05'        -- 5% false-positive rate
)
AS
SELECT * FROM covid_enriched;  -- copy from Parquet table created in Lab 03

-- Force Hive to read ORC stripe indexes (verify predicate pushdown is active)
SET hive.optimize.index.filter=true;
SET hive.exec.orc.skip.corrupt.data=false;

-- Bloom filter pushdown: stripes and row groups whose bloom filter rules out 'New York' are skipped
SELECT COUNT(*), SUM(cases)
FROM covid_enriched_orc
WHERE state = 'New York' AND fips IS NOT NULL;

ORC ACID — transactional tables (unique capability vs Parquet):

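-- ORC is the ONLY format that supports Hive ACID transactions
-- (INSERT, UPDATE, DELETE on individual rows; not supported in Parquet).
-- ACID also needs transaction support in the metastore: verify it is available
-- with your EMR release and the Glue Data Catalog before relying on it.
SET hive.support.concurrency=true;
-- hive.enforce.bucketing was removed in Hive 2.x (bucketing is always enforced);
-- setting it on Hive 3 (EMR 6.x/7.x) fails configuration validation.
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;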

CREATE TABLE covid_enriched_acid (
  date_str              STRING,
  state                 STRING,
  county                STRING,
  fips                  STRING,
  cases                 BIGINT,
  deaths                BIGINT,
  cases_per_staffed_bed DOUBLE
)
CLUSTERED BY (fips) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- Now you can do row-level updates (impossible with Parquet)
UPDATE covid_enriched_acid
SET cases = 999999
WHERE fips = '36061' AND date_str = '2021-01-01';

Pros:

  • Best Hive-native performance (stripe skipping + bloom filters + LLAP cache)
  • Only format supporting Hive ACID (UPDATE/DELETE/MERGE)
  • Excellent for complex nested types (STRUCT, MAP, ARRAY)
  • ORC + ZLIB typically produces among the smallest files of the common formats
  • Per-stripe statistics plus a row index every 10,000 rows let readers skip data at a finer grain than Parquet's row-group statistics alone

Cons:

  • Athena tooling leans toward Parquet: Athena reads ORC natively and CTAS can write it (format = 'ORC'), but CTAS defaults to Parquet and the wider writer ecosystem is thinner
  • Worse compatibility with Python/Pandas ecosystem (PyArrow prefers Parquet)
  • Slower to write than Parquet for pure-write batch jobs (stripe finalisation overhead)
  • Spark reads ORC well, but its default data-source format is Parquet, and Delta Lake (a common ACID layer for Spark) stores data as Parquet

Parquet — Apache Parquet

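Parquet was created by Twitter and Cloudera in 2013 as a language-neutral columnar format for the Hadoop ecosystem (Impala, Hive, and later Spark). Apache Arrow, created later, is a complementary in-memory format; libraries such as PyArrow use it to read and write Parquet.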

How it works internally:

  • Data is divided into row groups (default 128 MB).
  • Within each row group, data is stored column-by-column in column chunks.
  • Each column chunk is divided into pages (~1 MB). Each page has its own encoding (dictionary, RLE, delta, bit-packing) chosen per-column.
  • Column statistics (min, max, null count) at the row-group level allow readers to skip entire row groups without reading them.
  • Dictionary encoding: if a column has low cardinality (e.g., state has only a few dozen distinct values), Parquet stores each distinct value once in a dictionary and replaces every row's value with a small integer index, which is then bit-packed. Very effective for string columns.
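The dictionary idea is easy to see in a few lines of Python. This is a conceptual sketch, not Parquet's on-disk encoding: real writers also bit-pack and run-length encode the index stream, and fall back to plain encoding when the dictionary page outgrows parquet.dictionary.page.size. The function names are illustrative.

```python
def dictionary_encode(values):
    """Return (dictionary, indices): each distinct value stored once, rows become ints."""
    dictionary = []   # distinct values in first-seen order
    position = {}     # value -> index into dictionary
    indices = []
    for value in values:
        if value not in position:
            position[value] = len(dictionary)
            dictionary.append(value)
        indices.append(position[value])
    return dictionary, indices


def bits_per_index(dictionary_size):
    """Bit width needed to bit-pack one index (clamped to at least 1 here)."""
    return max(1, (dictionary_size - 1).bit_length())


if __name__ == "__main__":
    states = ["New York", "Texas", "New York", "Ohio", "Texas", "New York"]
    dictionary, indices = dictionary_encode(states)
    print(dictionary)   # distinct values, stored once
    print(indices)      # what the column chunk stores per row
    print(bits_per_index(len(dictionary)), "bits per row")
```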
-- Parquet with explicit properties (Hive DDL)
CREATE TABLE covid_enriched_parquet
STORED AS PARQUET
LOCATION 's3://${INTERMEDIATE_BUCKET}/covid_enriched_parquet/'
TBLPROPERTIES (
  'parquet.compression'           = 'SNAPPY',   -- fast; ZSTD for better ratio
  'parquet.block.size'            = '134217728', -- 128 MB row group
  'parquet.page.size'             = '1048576',   -- 1 MB page
  'parquet.enable.dictionary'     = 'true',      -- dictionary-encode string columns
  'parquet.dictionary.page.size'  = '1048576'
)
AS SELECT * FROM covid_enriched;

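-- List the files behind the table with their row counts (run in Athena).
-- "$path" returns each row's source S3 object; it does not expose encoding or
-- compression, which live in the Parquet footer (inspect with a Parquet metadata tool).
SELECT "$path", COUNT(*) FROM covid_enriched_parquet GROUP BY "$path" LIMIT 5;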

Compression options: Snappy (fast decompression, good for interactive Athena queries), ZSTD (best ratio/speed balance, recommended for EMR 7.x), GZIP (highest ratio, slower), LZ4.

Pros:

  • Best Athena compatibility: Parquet is Athena's default CTAS output format
  • Best cross-language ecosystem: Python (PyArrow, pandas), Spark, Flink, DuckDB, dbt, Redshift Spectrum, BigQuery all read/write Parquet natively
  • Excellent nested type support via the Dremel encoding (repeated fields)
  • Column-level metadata and statistics for predicate pushdown
  • Dictionary encoding extremely effective for low-cardinality string columns

Cons:

  • No native row-level mutation (no UPDATE/DELETE without rewriting the file)
  • Slightly larger files than ORC with ZLIB (dictionary overhead)
  • Parquet row-group statistics are coarser than ORC stripe bloom filters for highly selective point queries

Avro — Apache Avro

Avro is a row-oriented format, unlike ORC and Parquet, which are columnar. It was created for the Hadoop ecosystem but is most at home as a serialisation format for streaming (Kafka, Kinesis).

How it works: data is stored row-by-row with the schema embedded in the file header (JSON schema descriptor). Each row is a binary-encoded record matching the schema. Avro supports schema evolution natively: a reader schema can differ from the writer schema as long as it follows the resolution rules (in practice, add or remove only fields that have default values).
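The resolution rules can be sketched in Python over already-decoded records. This is a simplified illustration, not an Avro library: it handles record fields only (no type promotion, aliases, or nested schemas), and the v2 field new_hospitalizations is hypothetical. Real Avro readers apply these rules while decoding, using the writer schema from the file header.

```python
# Hypothetical v2 reader schema; v1 had only the four fields of the Hive table below
READER_V2 = {
    "type": "record",
    "name": "CountyEvent",
    "fields": [
        {"name": "event_time", "type": "string"},
        {"name": "fips", "type": "string"},
        {"name": "new_cases", "type": "int"},
        {"name": "new_deaths", "type": "int"},
        # Added in v2 with a default, so data written with v1 stays readable
        {"name": "new_hospitalizations", "type": ["null", "int"], "default": None},
    ],
}


def resolve_record(record, reader_schema):
    """Project a decoded record onto reader_schema using simplified Avro rules."""
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            resolved[name] = record[name]          # present in the writer's data
        elif "default" in field:
            resolved[name] = field["default"]      # missing: use reader's default
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    # Writer fields unknown to the reader are simply dropped
    return resolved


if __name__ == "__main__":
    v1_record = {"event_time": "2021-01-01T00:00:00Z", "fips": "36061",
                 "new_cases": 5, "new_deaths": 0}
    print(resolve_record(v1_record, READER_V2))
```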

-- Create an Avro-backed Hive table (useful as a landing zone from Kafka)
CREATE EXTERNAL TABLE covid_events_avro (
  event_time   STRING,
  fips         STRING,
  new_cases    INT,
  new_deaths   INT
)
STORED AS AVRO
LOCATION 's3://${RAW_BUCKET}/covid/events/avro/'
TBLPROPERTIES (
  'avro.schema.url' = 's3://${RAW_BUCKET}/schemas/county_event.avsc'
  -- or inline:
  -- 'avro.schema.literal' = '{"type":"record","name":"CountyEvent",...}'
);

-- In practice: Kafka Connect S3 Sink writes Avro files here, Hive reads them

Pros:

  • Schema is self-describing — reader always knows the columns without a separate metastore (critical for streaming where schema can change)
  • Best for write-heavy streaming ingest (row-by-row append is natural)
  • Schema evolution is first-class: backward/forward/full compatibility modes
  • Compact binary encoding; small per-record overhead

Cons:

  • Row-oriented: Athena/Hive must read every column even for SELECT one_col — expensive for analytical queries on wide tables
  • Not columnar: no predicate pushdown at the file level
  • Athena CTAS can write Avro (format = 'AVRO'), but the output is still row-oriented, so later queries get none of the columnar scan savings
  • Higher Athena cost than Parquet/ORC for the same data volume

TextFile / CSV — the raw zone format

Used exclusively for the raw zone in these labs because the COVID-19 Data Lake publishes data as CSV. Never use TextFile in an intermediate or processed zone. It is the worst format for analytical queries.

-- Raw zone only — reading CSV exactly as published by the data source
CREATE EXTERNAL TABLE nytimes_counties (
  date_str  STRING,
  county    STRING,
  state     STRING,
  fips      STRING,
  cases     BIGINT,
  deaths    BIGINT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 's3://${RAW_BUCKET}/covid/nytimes/us-counties/csv/'
TBLPROPERTIES ('skip.header.line.count'='1');

Pros: human-readable, zero-friction ingestion, no conversion needed, universally supported by every tool ever written.

Cons: no compression (or gzip which is non-splittable), no predicate pushdown, full table scan on every query, 8–80× larger than columnar formats, 20–50× more expensive on Athena for the same analytical query.


Compression codec comparison

Codec | Ratio vs raw | Decompress speed | Splittable as a TextFile codec | Best for
--- | --- | --- | --- | ---
None | 1× | N/A | Yes | Debugging only
Snappy | 2–3× | Very fast | No | Interactive Athena, Spark streaming
LZ4 | 2–3× | Fastest | No | Real-time pipelines, low-latency reads
ZSTD | 3–5× | Fast | No | Recommended default on EMR 7.x
ZLIB (gzip) | 4–6× | Moderate | No | Archive, max compression, cold data
GZIP (TextFile) | 4–6× | Moderate | No | Avoid for Hive: breaks parallelism
bzip2 | 5–8× | Slow | Yes | TextFile only: enables parallel splits

Important: Parquet and ORC files are always splittable at the row-group and stripe boundary level regardless of the per-column compression codec. A 10 GB Parquet+Snappy file can be read by 80 mappers simultaneously. A 10 GB GZIP CSV cannot — it is read by exactly 1 mapper.
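The 80-versus-1 arithmetic above can be written down as a tiny Python helper. Treating one 128 MB split as one task is a simplification made for this sketch: actual split sizes depend on Hive/Tez settings such as split grouping, and splits of Parquet and ORC files align to row groups and stripes. The function name input_splits is illustrative.

```python
import math


def input_splits(file_size_bytes, split_size_bytes, splittable):
    """Approximate number of tasks that can read one file in parallel."""
    if file_size_bytes <= 0:
        return 0
    if not splittable:
        return 1  # e.g. a gzip-compressed CSV: one reader, start to finish
    return math.ceil(file_size_bytes / split_size_bytes)


if __name__ == "__main__":
    GiB, MiB = 1024 ** 3, 1024 ** 2
    print(input_splits(10 * GiB, 128 * MiB, splittable=True))   # Parquet + Snappy
    print(input_splits(10 * GiB, 128 * MiB, splittable=False))  # CSV + GZIP
```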


Decision matrix: which format to use?

What is the access pattern?
├── Analytical queries (SELECT cols, GROUP BY, aggregations)
│   ├── Primary tool is Hive on EMR?
│   │   ├── Need row-level UPDATE/DELETE?  → ORC (ACID)
│   │   ├── Complex nested types (MAP/STRUCT/ARRAY)? → ORC
│   │   └── Standard analytics, batch CTAS? → Either ORC or Parquet
│   │       ├── Hive-only pipeline (no Athena, no Spark)? → ORC + ZLIB
│   │       └── Mixed: Hive + Athena + Spark? → Parquet + ZSTD (this project uses Parquet + Snappy)
│   └── Primary tool is Athena, Redshift Spectrum, or Spark?
│       └── → Parquet + ZSTD (widest compatibility)
│
├── Streaming ingest (Kafka, Kinesis, DMS)
│   └── → Avro (schema evolution) or Parquet (if batching every 5+ minutes)
│
├── Data exchange with external systems (APIs, exports)
│   └── → CSV or Avro (self-describing, human/tool readable)
│
└── Archive / cold storage (queried < once per month)
    └── → Parquet + ZLIB or ORC + ZLIB (max compression, cost over speed)

Quantified comparison on the COVID dataset

The following numbers are based on running the Lab 03 CTAS query on the same ~100 MB CSV source and measuring the results on EMR 7.2 (m5.xlarge × 3):

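Format | Output size | Write time | Athena scan | Athena cost (per query) | Hive read speed
--- | --- | --- | --- | --- | ---
CSV (TextFile) | 100 MB | 8 sec | 100 MB | $0.00050 | 1× baseline
Avro | 28 MB | 10 sec | 28 MB (full scan) | $0.00014 | 0.9× (row scan)
ORC + ZLIB | 9 MB | 14 sec | 2–4 MB (stripe skip) | $0.00001–$0.00002 | 3–4×
Parquet + Snappy | 12 MB | 11 sec | 3–5 MB (row group skip) | $0.00002–$0.00003 | 2.5–3×
Parquet + ZSTD | 10 MB | 12 sec | 3–5 MB | $0.00002–$0.00003 | 2.5–3×

Note that Athena bills a minimum of 10 MB per query, so each sub-10 MB scan above is actually charged as 10 MB (about $0.00005 at $5/TB). For a dataset this small the relative savings are capped; they grow with data volume.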

ORC writes are slightly slower than Parquet because stripe finalisation computes bloom filters and min/max indexes synchronously before flushing. For read-heavy analytical pipelines (write once, read many times) this trade-off strongly favours ORC when Hive is the primary reader.


Why this project chose Parquet for the intermediate zone

These labs use STORED AS PARQUET in Lab 03 (not ORC) for a deliberate, practical reason: Lab 05 queries the table with Athena, and Redshift Spectrum compatibility is a stated goal. Athena CTAS writes Parquet by default, and the PyArrow / pandas / dbt ecosystem reads Parquet directly. Any future migration to Spark (a Lab 07 candidate), EMR Serverless, or Apache Iceberg would most likely keep Parquet as the base format.

If this were a Hive-only pipeline with no Athena requirement, ORC + ZLIB would be the better choice for the intermediate zone.

Hands-on: create an ORC copy of the enriched table

Run this after Lab 03 to see the size and query performance difference directly in your cluster:

-- Connect to the running cluster via Hive CLI (see Lab 03 verification steps)
-- or add this as a new EMR step

USE emr_labs_db;

-- Create ORC variant alongside the Parquet table
CREATE TABLE covid_enriched_orc
COMMENT 'ORC+ZLIB variant for comparison with Parquet version'
STORED AS ORC
LOCATION 's3://${INTERMEDIATE_BUCKET}/covid_enriched_orc/'
TBLPROPERTIES (
  'orc.compress'             = 'ZLIB',
  'orc.bloom.filter.columns' = 'state,fips',
  'orc.bloom.filter.fpp'     = '0.05'
)
AS SELECT * FROM covid_enriched;

-- Compare sizes on S3 (run from your shell; with --summarize the last line is the total size)
-- aws s3 ls s3://${INTERMEDIATE_BUCKET}/covid_enriched/     --recursive --human-readable --summarize | tail -1
-- aws s3 ls s3://${INTERMEDIATE_BUCKET}/covid_enriched_orc/ --recursive --human-readable --summarize | tail -1

-- Compare query performance: same query on both formats
SET hive.optimize.index.filter=true;

-- Parquet
SELECT state, COUNT(*) AS counties, SUM(cases) AS total_cases
FROM covid_enriched
WHERE cases > 1000
GROUP BY state
ORDER BY total_cases DESC
LIMIT 10;

-- ORC (run immediately after; compare the input counters shown in the Tez UI,
-- such as records read and bytes read from S3, against the Parquet run)
SELECT state, COUNT(*) AS counties, SUM(cases) AS total_cases
FROM covid_enriched_orc
WHERE cases > 1000
GROUP BY state
ORDER BY total_cases DESC
LIMIT 10;

To run this as a standalone EMR step:

# Submit as an additional step after Lab 03 completes.
# The JSON is built in an unquoted heredoc so the shell expands ${SCRIPTS_BUCKET}
# and ${INTERMEDIATE_BUCKET}; inside single quotes they would reach EMR literally.
STEPS_JSON=$(cat <<EOF
[{
  "Name": "Format-Comparison-ORC",
  "ActionOnFailure": "CONTINUE",
  "HadoopJarStep": {
    "Jar": "command-runner.jar",
    "Args": [
      "hive-script", "--run-hive-script",
      "--args", "-f",
      "s3://${SCRIPTS_BUCKET}/lab-03-orc-compare.hql",
      "-d", "DB=emr_labs_db",
      "-d", "INTERMEDIATE_BUCKET=${INTERMEDIATE_BUCKET}"
    ]
  }
}]
EOF
)

aws emr add-steps --cluster-id "$CLUSTER_ID" --steps "$STEPS_JSON"

4. Prerequisites & One-Time Setup

Required tools

# macOS (Homebrew)
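brew install awscli jq
# Terraform is installed from HashiCorp's own tap
brew tap hashicorp/tap
brew install hashicorp/tap/terraform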

# Verify
aws --version       # aws-cli/2.x
terraform --version # Terraform v1.5+
jq --version        # jq-1.6+

AWS credentials

# Option A: named profile (recommended)
aws configure --profile emr-labs
export AWS_PROFILE=emr-labs

# Option B: environment variables
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export AWS_DEFAULT_REGION=us-east-1

Your IAM user/role needs these service permissions: s3:*, iam:*, emr:*, glue:*, athena:*, ec2:Describe*, plus ec2:CreateKeyPair for the key-pair step below. These are broad, lab-only permissions.

EC2 key pair (needed only for Lab 06 Oozie SSH)

# Create a key pair and save the private key
aws ec2 create-key-pair \
  --key-name emr-labs-key \
  --query 'KeyMaterial' \
  --output text > ~/.ssh/emr-labs-key.pem

chmod 400 ~/.ssh/emr-labs-key.pem
export KEY_PAIR=emr-labs-key

Labs 01–05 do NOT require SSH — they use EMR Steps (scripts run remotely). Only Lab 06 needs SSH to deploy Oozie workflows to the master node.


5. Step-by-Step Lab Execution

Lab 00 — Foundation

What you’re doing: provision all long-lived AWS infrastructure with Terraform, then seed your raw S3 bucket with public data.

cd terraform/envs/dev

# Download the AWS provider plugin (~100 MB, one-time)
terraform init

# Preview what will be created (read this carefully)
terraform plan

# Create resources (takes ~30 seconds)
terraform apply

Terraform creates:

  • emr-labs-raw-{account}-us-east-1 S3 bucket
  • emr-labs-intermediate-{account}-us-east-1 S3 bucket
  • emr-labs-processed-{account}-us-east-1 S3 bucket
  • emr-labs-logs-{account}-us-east-1 S3 bucket
  • emr-labs-athena-{account}-us-east-1 S3 bucket (Athena result location)
  • emr-labs-emr-service-role IAM role (EMR control plane)
  • emr-labs-emr-ec2-role IAM role + instance profile (EC2 nodes)
  • emr_labs_db Glue database (the shared Hive metastore)

From the repository root (cd back out of terraform/envs/dev first), seed the raw bucket:

# Seed raw bucket from the public COVID-19 data lake (~40 MB)
bash labs/lab-00-foundation/scripts/copy_public_data.sh

Verify:

# Check what was copied
aws s3 ls s3://$(terraform -chdir=terraform/envs/dev output -raw raw_bucket)/covid/ --recursive

You should see two prefixes:

  • covid/nytimes/us-counties/csv/ — daily county case/death files
  • covid/hospital_beds/ — one CSV with ~7,500 hospital facilities

Best practice applied: force_destroy = true on all buckets means terraform destroy cleans up completely. In production you would set this to false to prevent accidental data loss.


Lab 01 — Hive Basics

What you’re doing: launch a transient EMR cluster, run two HQL scripts as EMR Steps, auto-terminate the cluster.

bash labs/lab-01-hive-basics/scripts/run_lab.sh

The script does the following automatically:

  1. Reads Terraform outputs to get bucket names and IAM role names
  2. Uploads the HQL files to S3 (the cluster must fetch scripts from S3, not local disk)
  3. Calls aws emr create-cluster with:
    • --release-label emr-7.2.0
    • --applications Name=Hive Name=Hadoop
    • --instance-fleets pointing to shared/configs/emr-instance-fleet.json
    • --configurations pointing to shared/configs/hive-glue-config.json
    • --auto-termination-policy IdleTimeout=3600
    • Two --steps: CreateExternalTables, QueryTables
  4. Polls the cluster every 30 seconds via wait_for_step.sh
  5. Exits when the cluster terminates
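The polling loop in wait_for_step.sh is not shown here, so the Python sketch below only illustrates the pattern, under assumptions: get_state is any callable returning an EMR step state (for example, a wrapper around boto3's describe_step(...)["Step"]["Status"]["State"]), and the terminal states are those of the EMR StepState enum. The sleep and clock hooks are there so the loop can be exercised without waiting. The AWS CLI also ships a built-in waiter, aws emr wait step-complete, for the same job.

```python
import time

# Terminal values of the EMR StepState enum (PENDING, CANCEL_PENDING, RUNNING are not)
TERMINAL_STATES = {"COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_state(get_state, interval_s=30.0, timeout_s=3600.0,
                   sleep=time.sleep, clock=time.monotonic):
    """Poll get_state() until it returns a terminal state, or raise TimeoutError."""
    deadline = clock() + timeout_s
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError(f"step still {state} after {timeout_s} s")
        sleep(interval_s)


if __name__ == "__main__":
    # Simulated state sequence; in real use get_state would call the EMR API
    states = iter(["PENDING", "RUNNING", "COMPLETED"])
    print(wait_for_state(lambda: next(states), sleep=lambda s: None))
```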

What happens inside the cluster:

Step 1 — CreateExternalTables:

command-runner.jar → hive-script → hive -f 01_create_tables.hql
                                         -d DB=emr_labs_db
                                         -d RAW_BUCKET=emr-labs-raw-...

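Hive connects to Glue (the standard EMR setting is hive.metastore.client.factory.class = com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory in hive-site.xml), creates the two external tables, and exits. The Glue catalog now has the definitions.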
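For reference, a Glue-backed hive-site configuration of the kind shared/configs/hive-glue-config.json is described as holding can be generated as below. This is a hypothetical reconstruction, not the repository's file: it includes only the two properties this guide mentions (the Glue client factory class and hive.execution.engine), and the helper name hive_glue_configurations is illustrative. The output has the Classification/Properties shape that EMR's --configurations option accepts.

```python
import json

GLUE_FACTORY = ("com.amazonaws.glue.catalog.metastore."
                "AWSGlueDataCatalogHiveClientFactory")


def hive_glue_configurations(execution_engine="tez"):
    """Build an EMR configurations list that points Hive at the Glue Data Catalog."""
    return [
        {
            "Classification": "hive-site",
            "Properties": {
                # Use Glue instead of the on-cluster metastore database
                "hive.metastore.client.factory.class": GLUE_FACTORY,
                # Pin the engine so a job property cannot silently change it
                "hive.execution.engine": execution_engine,
            },
        }
    ]


if __name__ == "__main__":
    print(json.dumps(hive_glue_configurations(), indent=2))
```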

Step 2 — QueryTables: Runs aggregate queries (top states by cases, bed capacity by state) — output goes to the EMR step logs at s3://logs-bucket/emr-logs/{cluster-id}/steps/.

Typical runtime: 8–12 minutes (cluster bootstrap takes ~6 min, HQL ~2 min).

Verify:

# Check Glue tables were created
aws glue get-tables \
  --database-name emr_labs_db \
  --query 'TableList[].{Name:Name,Location:StorageDescriptor.Location}'

Best practice applied: HQL files are uploaded to S3 before cluster launch. Never rely on local paths inside EMR steps — the master node doesn’t have access to your laptop.


Lab 02 — Glue Data Catalog

What you’re doing: launch a cluster and run DESCRIBE FORMATTED to see exactly what the Glue catalog stores about your tables.

bash labs/lab-02-glue-catalog/scripts/run_lab.sh

The script first checks (without launching a cluster) that the Glue tables from Lab 01 exist. If they don’t, it fails fast with instructions.

What to look for in the step output log:

Table Type:             EXTERNAL_TABLE
Location:               s3://emr-labs-raw-.../covid/nytimes/us-counties/csv/
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

These are the raw metadata entries the Glue catalog stores. The Location is the mapping between a logical table name and its physical S3 path.

Verify in the console:

  1. Open https://console.aws.amazon.com/glue/home?region=us-east-1#catalog:tab=tables
  2. Select database emr_labs_db
  3. Click nytimes_counties → Schema tab shows all columns
  4. Click Data preview — Glue directly reads from S3 using the registered schema

Key insight: the Glue catalog is the single source of truth for table metadata. Hive, Athena, Spark, and Glue ETL all read from the same catalog. You create a table once and it’s available everywhere.


Lab 03 — Hive CTAS Transformations

What you’re doing: run a CTAS (Create Table As Select) that joins the two raw tables, computes stress metrics, and writes Parquet to the intermediate zone.

bash labs/lab-03-transformations/scripts/run_lab.sh

The transformation logic (01_ctas_enriched.hql):

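CREATE TABLE covid_enriched
STORED AS PARQUET
LOCATION 's3://emr-labs-intermediate-.../covid_enriched/'
AS
SELECT
  n.date_str, n.state, n.county, n.fips,
  n.cases, n.deaths,
  h.num_staffed_beds, h.num_icu_beds,
  -- Hospital stress metric
  CASE WHEN h.num_staffed_beds > 0
       THEN ROUND(n.cases / h.num_staffed_beds, 4) END AS cases_per_staffed_bed
FROM nytimes_counties n
-- hospital_beds has one row per facility: sum to one row per county first,
-- otherwise every county-date row is repeated once per hospital
LEFT JOIN (
  SELECT county_fips_code,
         SUM(num_staffed_beds) AS num_staffed_beds,
         SUM(num_icu_beds)     AS num_icu_beds
  FROM hospital_beds
  GROUP BY county_fips_code
) h ON n.fips = h.county_fips_code
-- empty CSV fields load as '' (not NULL) into STRING columns by default
WHERE n.fips IS NOT NULL AND n.fips <> '';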

Why LEFT JOIN instead of INNER JOIN: not every county in the NY Times data has a matching hospital facility in the Definitive Healthcare dataset (rural counties, data gaps). A LEFT JOIN preserves all case records even without a bed-capacity match — the stress metrics will be NULL for unmatched counties.

What Hive does physically:

  1. Reads nytimes_counties from S3 (CSV) → 6.2M rows
  2. Reads hospital_beds from S3 (CSV) → 7,500 rows
  3. Performs a distributed hash join (small table broadcast)
  4. Writes output as Parquet files (Snappy compressed) to the intermediate S3 prefix
  5. Registers the new table covid_enriched in the Glue catalog
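The physical plan above can be modelled in plain Python. The sketch below is illustrative only: the function names and sample numbers are made up, and it assumes hospital_beds holds one row per facility (as the ~7,500-facility count in Lab 00 suggests), so it sums beds per county FIPS before joining; joining facility rows directly would repeat each county-date row once per hospital. The summed table plays the role of the small, broadcast side, and the case rows are streamed past it.

```python
from collections import defaultdict


def aggregate_beds(hospital_rows):
    """Sum staffed and ICU beds per county FIPS (input: one row per facility)."""
    totals = defaultdict(lambda: {"num_staffed_beds": 0, "num_icu_beds": 0})
    for h in hospital_rows:
        county = totals[h["county_fips_code"]]
        county["num_staffed_beds"] += h["num_staffed_beds"] or 0  # treat NULL as 0
        county["num_icu_beds"] += h["num_icu_beds"] or 0
    return dict(totals)


def broadcast_left_join(case_rows, bed_totals):
    """Stream the large side and probe the small in-memory hash table (LEFT JOIN)."""
    joined = []
    for n in case_rows:
        beds = bed_totals.get(n["fips"])  # None when the county has no match
        staffed = beds["num_staffed_beds"] if beds else None
        icu = beds["num_icu_beds"] if beds else None
        # CASE WHEN staffed > 0 THEN ROUND(cases / staffed, 4) END
        # (Python's round() rounds ties to even; Hive's ROUND rounds half up)
        ratio = round(n["cases"] / staffed, 4) if staffed else None
        joined.append({**n, "num_staffed_beds": staffed, "num_icu_beds": icu,
                       "cases_per_staffed_bed": ratio})
    return joined


if __name__ == "__main__":
    beds = aggregate_beds([
        {"county_fips_code": "36061", "num_staffed_beds": 300, "num_icu_beds": 30},
        {"county_fips_code": "36061", "num_staffed_beds": 100, "num_icu_beds": 10},
    ])
    cases = [{"fips": "36061", "cases": 1000}, {"fips": "99999", "cases": 5}]
    for row in broadcast_left_join(cases, beds):
        print(row)
```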

Verify:

# Check Parquet files were written
INTERMEDIATE=$(terraform -chdir=terraform/envs/dev output -raw intermediate_bucket)
aws s3 ls "s3://$INTERMEDIATE/covid_enriched/" --recursive
# You should see several .parquet files

Best practice applied: ActionOnFailure: TERMINATE_CLUSTER on the CTAS step. If the join fails the cluster shuts down immediately, preventing billing for a zombie cluster waiting on a broken step.


Lab 04 — DROP TABLE: External vs Managed

What you’re doing: understand the most operationally important Hive concept — what actually happens to your data when you drop a table.

bash labs/lab-04-drop-tables/scripts/run_lab.sh

External table (what we created in Lab 01):

DROP TABLE nytimes_counties;
-- Glue catalog entry: DELETED
-- S3 data at s3://.../covid/nytimes/: PRESERVED

Managed table (created inside the lab):

CREATE TABLE nytimes_managed_sample AS SELECT ... LIMIT 1000;
-- Data written to: s3://emr-labs-raw-.../user/hive/warehouse/emr_labs_db.db/nytimes_managed_sample/

DROP TABLE nytimes_managed_sample;
-- Glue catalog entry: DELETED
-- S3 data: ALSO DELETED (Hive owns the data)

Why this matters in production: a junior engineer accidentally runs DROP TABLE orders on a managed production table and deletes years of order data from S3. This is a real incident that has happened at multiple companies. Best practice: always use EXTERNAL tables for data lake tables. Reserve managed tables only for truly ephemeral scratch data.
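A toy Python model makes the asymmetry explicit. ToyCatalog is a hypothetical stand-in, not a Hive or Glue API: one dict plays the catalog, another plays the bucket, and DROP removes files only for managed tables. It ignores refinements such as Hive 3's external.table.purge table property, which can make DROP delete an external table's data as well.

```python
class ToyCatalog:
    """Hypothetical in-memory model of Hive DROP TABLE semantics (not a real API)."""

    def __init__(self):
        self.tables = {}  # table name -> {"location": prefix, "external": bool}
        self.s3 = {}      # object key -> bytes, standing in for the bucket

    def create(self, name, location, external):
        self.tables[name] = {"location": location, "external": external}

    def drop(self, name):
        table = self.tables.pop(name)  # the catalog entry always goes
        if not table["external"]:      # managed table: Hive owns the files too
            doomed = [k for k in self.s3 if k.startswith(table["location"])]
            for key in doomed:
                del self.s3[key]


if __name__ == "__main__":
    catalog = ToyCatalog()
    catalog.s3["covid/nytimes/part-0.csv"] = b"raw bytes"
    catalog.s3["warehouse/nytimes_managed_sample/000000_0"] = b"sample bytes"
    catalog.create("nytimes_counties", "covid/nytimes/", external=True)
    catalog.create("nytimes_managed_sample", "warehouse/nytimes_managed_sample/",
                   external=False)
    catalog.drop("nytimes_counties")
    catalog.drop("nytimes_managed_sample")
    print(sorted(catalog.s3))  # only the external table's file survives
```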

Verify after the lab:

# S3 data still exists even though the table was dropped and re-created
aws s3 ls "s3://$(terraform -chdir=terraform/envs/dev output -raw raw_bucket)/covid/nytimes/" --recursive | wc -l

Lab 05 — Athena Serverless Queries

What you’re doing: run SQL queries against the covid_enriched Glue table without launching any EMR cluster.

bash labs/lab-05-athena/scripts/run_lab.sh

The script uses aws athena start-query-execution to submit each SQL file. It then polls get-query-execution until the status is SUCCEEDED or FAILED. CANCELLED is the third terminal state Athena can report.
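The submit-and-poll pattern is easy to reproduce. The sketch below polls any status callable until it returns one of Athena's terminal states (SUCCEEDED, FAILED, CANCELLED). The helper name wait_for_query, the poll interval and the timeout are assumptions; they are not taken from run_lab.sh.

```python
import time

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}

def wait_for_query(get_state, poll_seconds=2.0, timeout_seconds=600.0, sleep=time.sleep):
    """Poll get_state() until it returns a terminal Athena query state.

    get_state: zero-argument callable returning the current state string.
    sleep: injectable for testing. Raises TimeoutError if the query is
    still non-terminal after timeout_seconds of polling.
    """
    waited = 0.0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_seconds:
            raise TimeoutError(f"query still {state} after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds
```

With boto3, the callable could be `lambda: athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]`. Here `athena = boto3.client("athena")`, and `qid` is the `QueryExecutionId` returned by `start_query_execution`.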

Three queries run:

  1. 01_create_view.sql — creates view latest_county_snapshot (latest date per county)
  2. 02_top_states.sql — ranks states by cases_per_staffed_bed (hospital stress)
  3. 03_county_trend.sql — 7-day rolling growth for NY counties using LAG() window function
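LAG(cases, 7) OVER (PARTITION BY county ORDER BY date) returns the value from seven rows earlier in the same county. For a gap-free daily series, that is the value from one week ago. The plain-Python sketch below reproduces those window semantics so they are easy to check. The growth formula (percentage change versus seven rows back) is an assumption and may differ from what 03_county_trend.sql computes.

```python
from itertools import groupby

def seven_day_growth(rows, lag=7):
    """Emulate LAG(cases, lag) OVER (PARTITION BY county ORDER BY date_str).

    rows: iterable of (county, date_str, cases) tuples with ISO-8601 dates,
    one row per county per day. Yields (county, date_str, cases, growth_pct),
    where growth_pct is None when LAG would return NULL (fewer than `lag`
    earlier rows) or the lagged value is 0.
    """
    # PARTITION BY county ORDER BY date_str: ISO dates sort correctly as text.
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    for county, group in groupby(ordered, key=lambda r: r[0]):
        history = list(group)
        for i, (_, date_str, cases) in enumerate(history):
            prev = history[i - lag][2] if i >= lag else None  # LAG(cases, lag)
            growth = None if not prev else round(100.0 * (cases - prev) / prev, 2)
            yield county, date_str, cases, growth

sample = [("Kings", f"2020-04-{day:02d}", 100 + 10 * day) for day in range(1, 10)]
for row in seven_day_growth(sample):
    print(row)
```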

Athena vs Hive — when to use which:

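| | Athena | Hive on EMR |
|---|---|---|
| Best for | Ad-hoc SQL, dashboards, exploration | Large batch ETL, complex UDFs, Hive-specific features |
| Cost model | $5/TB scanned (pay per query) | $0.05–0.30/hr per cluster (pay for uptime) |
| Cold start | Instant (serverless) | 6–10 min cluster bootstrap |
| Max query time | 30 minutes (default DML timeout; can be raised via Service Quotas) | Unlimited |
| UDFs | Limited (Lambda-based UDFs) | Full Java UDFs; Python via TRANSFORM scripts |
| DDL | CREATE/ALTER/DROP for external tables, views, and Iceberg tables | Full (external and managed tables) |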

Verify:

# Results are CSV files in the Athena bucket
ATHENA_BUCKET=$(terraform -chdir=terraform/envs/dev output -raw athena_bucket)
aws s3 ls "s3://$ATHENA_BUCKET/lab05-results/" | tail -10

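# View the most recent query's results inline. IDs are typically listed
# newest first, so [0] is the last query the script ran (query 3); use [1] for query 2
QID=$(aws athena list-query-executions \
  --work-group primary \
  --query 'QueryExecutionIds[0]' --output text)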
aws athena get-query-results --query-execution-id "$QID" \
  --query 'ResultSet.Rows[*].Data[*].VarCharValue' --output text | head -30

Best practice applied: Athena workgroup configured with OutputLocation pointing to a dedicated S3 bucket. Never send Athena results to a shared location — query results can contain sensitive data.


Lab 06 — Apache Oozie Orchestration

What you’re doing: schedule the entire pipeline (Labs 01 + 03) as a recurring Oozie coordinator that runs daily at 06:00 UTC.

This lab has two parts. You need KEY_PAIR set (see Prerequisites).

Part 1 — Launch the cluster (stays running, no auto-terminate):

export KEY_PAIR=emr-labs-key
bash labs/lab-06-oozie/scripts/run_lab.sh

This launches the cluster with Name=Oozie added to the applications list and waits for it to reach the WAITING state. The script prints the Master DNS.

Part 2 — Deploy and start the coordinator:

export CLUSTER_ID=$(cat labs/lab-06-oozie/scripts/.last_cluster_id)
bash labs/lab-06-oozie/scripts/submit_oozie.sh

submit_oozie.sh does:

  1. Copies the HQL scripts to the master with scp, then uploads them to HDFS with hadoop fs -put
  2. Does the same for workflow.xml and coordinator.xml (scp to the master, then upload to HDFS)
  3. Writes job.properties on the master with your bucket names and schedule
  4. Runs oozie job -run with that job.properties, which returns a coordinator ID like 0000001-...-oozie-...C

Monitor the coordinator:

# SSH tunnel to access Oozie Web UI
ssh -L 11000:localhost:11000 -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS>
# Open: http://localhost:11000/oozie → Coordinators tab

# Or via CLI
ssh -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS> \
  "oozie job -oozie http://localhost:11000/oozie -info <COORDINATOR_ID>"

Oozie workflow structure (workflow.xml):

START
  │
  ▼
create-tables (Hive action)
  │ OK
  ▼
ctas-enriched (Hive action)
  │ OK
  ▼
validate-output (Hive action)
  │ OK
  ▼
END

Any action error → KILL node (its message logs wf:lastErrorNode() and wf:errorMessage(wf:lastErrorNode()))
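A workflow.xml with this shape chains Hive actions through ok/error transitions. The Python sketch below generates such a skeleton with xml.etree.ElementTree; it is not the lab's actual file. The schema versions (uri:oozie:workflow:0.5, uri:oozie:hive-action:0.5), the ${jobTracker}/${nameNode} parameters and the script file names are assumptions to adapt. The validation script name is hypothetical.

```python
import xml.etree.ElementTree as ET

WF_NS = "uri:oozie:workflow:0.5"
HIVE_NS = "uri:oozie:hive-action:0.5"

def build_workflow(name, actions):
    """Return workflow.xml text for an ordered list of (action_name, hql_script)."""
    app = ET.Element("workflow-app", {"name": name, "xmlns": WF_NS})
    ET.SubElement(app, "start", {"to": actions[0][0]})
    for i, (action_name, script) in enumerate(actions):
        action = ET.SubElement(app, "action", {"name": action_name})
        hive = ET.SubElement(action, "hive", {"xmlns": HIVE_NS})
        ET.SubElement(hive, "job-tracker").text = "${jobTracker}"
        ET.SubElement(hive, "name-node").text = "${nameNode}"
        ET.SubElement(hive, "script").text = script
        next_node = actions[i + 1][0] if i + 1 < len(actions) else "end"
        ET.SubElement(action, "ok", {"to": next_node})  # success: next action or END
        ET.SubElement(action, "error", {"to": "kill"})  # any failure: KILL node
    kill = ET.SubElement(app, "kill", {"name": "kill"})
    ET.SubElement(kill, "message").text = (
        "Failed at [${wf:lastErrorNode()}]: ${wf:errorMessage(wf:lastErrorNode())}")
    ET.SubElement(app, "end", {"name": "end"})
    return ET.tostring(app, encoding="unicode")

xml_text = build_workflow("covid-pipeline", [
    ("create-tables", "01_create_tables.hql"),
    ("ctas-enriched", "01_ctas_enriched.hql"),
    ("validate-output", "validate_output.hql"),  # hypothetical file name
])
print(xml_text)
```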

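Terminate the cluster when you are done. It has no auto-termination, because Oozie needs a live cluster to run the coordinator, so it keeps billing until you stop it: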

aws emr terminate-clusters \
  --cluster-ids $(cat labs/lab-06-oozie/scripts/.last_cluster_id) \
  --region us-east-1

Teardown

Remove all AWS resources (S3 buckets will be emptied then deleted because force_destroy = true):

cd terraform/envs/dev
terraform destroy

Type yes when prompted. This deletes everything Terraform created: buckets, IAM roles, Glue database. Any running EMR clusters are NOT managed by Terraform here — terminate them separately first.

# Terminate any still-running clusters before destroy
# CAUTION: this terminates EVERY active cluster in the current region, not only the lab clusters
aws emr list-clusters --active --query 'Clusters[].Id' --output text \
  | xargs -r aws emr terminate-clusters --cluster-ids

6. Best Practices Applied

Cost

| Practice | Where | Saving |
|---|---|---|
| Spot instances for core nodes | emr-instance-fleet.json | 60–80% vs on-demand |
| Fallback to on-demand after 10 min | SpotSpecification.TimeoutAction | Prevents stuck provisioning |
| Transient clusters (Labs 01–04) | --auto-termination-policy | No idle billing |
| Parquet + Snappy for intermediate zone | STORED AS PARQUET in CTAS | 80–95% less Athena scan |
| S3-IA lifecycle after 30 days | terraform/modules/s3/main.tf | ~45% vs S3 Standard |
| gp3 EBS volumes (not gp2) | emr-instance-fleet.json | ~20% cheaper per GB |

Security (OWASP / AWS Well-Architected)

| Practice | Where |
|---|---|
| Block all public access on S3 | aws_s3_bucket_public_access_block for all buckets |
| IAM least privilege (scoped S3 access) | Only lab buckets + public covid lake |
| IAM least privilege (scoped Glue access) | Only Glue catalog operations, not admin |
| Source account condition on EMR service role | Prevents confused-deputy attacks |
| No hardcoded credentials | All config via Terraform outputs + env vars |

Reliability

| Practice | Where |
|---|---|
| TERMINATE_CLUSTER on critical steps | Lab 03 CTAS: if the transform fails, stop billing |
| CONTINUE on exploratory steps | Labs 01, 02: failure shouldn't abort exploration |
| Idempotent HQL (CREATE IF NOT EXISTS, DROP IF EXISTS) | All HQL files |
| Glue catalog as metastore | Tables survive cluster termination |
| S3 as storage | Decoupled from compute: data persists even if the cluster dies |

Observability

# EMR step logs (stdout, stderr, controller, syslog; uploaded gzip-compressed)
aws s3 ls "s3://$LOGS_BUCKET/emr-logs/$CLUSTER_ID/steps/" --recursive

# View a step's stdout (replace s-XXXX with the step ID)
aws s3 cp "s3://$LOGS_BUCKET/emr-logs/$CLUSTER_ID/steps/s-XXXX/stdout.gz" - | gunzip | head -50

# CloudWatch: EMR cluster metrics (CPU, HDFS, YARN)
# Console → CloudWatch → Metrics → EMR

7. What Production Companies Actually Do

Netflix

Netflix processes petabytes/day on a data platform built around Apache Iceberg (an open table format with ACID transactions on S3) + Apache Spark for transformation. They moved away from Hive because:

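  • The Hive table format tracks data by directory listing, so writes to S3 are not atomic. Readers can see partially written data, and concurrent writers can overwrite each other (Hive ACID exists, but only for managed ORC tables)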
  • Hive metastore becomes a bottleneck at scale (millions of partitions)
  • Iceberg provides time travel (query data as of any past timestamp) and schema evolution without rewriting files

Netflix open-sourced Metacat, a federated metadata service that provides one API over the Hive Metastore and other data stores (including RDS, Redshift, and Teradata).

Uber

Uber built Hudi (now Apache Hudi) to solve the “update problem” in data lakes: how do you update a single row in a 10 TB Parquet dataset without rewriting all of it? Hudi (Hadoop Upserts Deletes and Incrementals) supports record-level upserts and deletes, which GDPR right-to-erasure requires. In Merge-on-Read mode, changes go to log files that are merged into base Parquet files during compaction. Copy-on-Write mode instead rewrites the affected Parquet files at write time.
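The merge-on-read idea fits in a few lines. Base files hold the last compacted state, a log records later upserts and deletes, and compaction folds the log into a new base. The Python sketch below models this with dictionaries keyed by record key. It illustrates the concept only and is not Hudi's file format or API; the operation names are hypothetical.

```python
def compact(base, log):
    """Fold an ordered change log into a new base snapshot.

    base: dict of record_key -> record (current base file contents)
    log:  ordered list of ("upsert", key, record) or ("delete", key, None)
    Returns a new dict and leaves `base` untouched, mirroring how compaction
    writes a new base file instead of editing the old one in place.
    """
    merged = dict(base)
    for op, key, record in log:
        if op == "upsert":
            merged[key] = record       # insert or replace the whole record
        elif op == "delete":
            merged.pop(key, None)      # e.g. a right-to-erasure request
        else:
            raise ValueError(f"unknown operation: {op}")
    return merged

def read_merged(base, log, key):
    """Snapshot read before compaction: the newest log entry for `key` wins."""
    for op, k, record in reversed(log):
        if k == key:
            return record if op == "upsert" else None
    return base.get(key)
```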

Uber uses Presto/Trino (the open-source engine behind Athena) for ad-hoc SQL at scale, with a custom federated query layer.

Airbnb

Airbnb’s data team created Apache Airflow (open-sourced in 2015), now the industry standard for data pipeline orchestration. Airflow DAGs are Python code, not XML (unlike Oozie), enabling version control, unit testing, and dynamic graph generation. AWS offers a managed version: Amazon MWAA (Managed Workflows for Apache Airflow).

Airbnb also built Apache Superset (open-source BI tool, now Apache top-level project) for querying Hive/Presto results.

AWS / Databricks

AWS Lake Formation (built on Glue) adds fine-grained access control to the Glue catalog: column-level security, row-level filtering, and data masking, without changing the underlying S3 data. Different teams can query the same covid_enriched table but see different columns, based on the Lake Formation permissions granted to their IAM principals.

Databricks built Delta Lake (open-source), which is similar to Iceberg/Hudi but tightly integrated with Spark. It adds ACID, time travel, Z-order clustering, and OPTIMIZE (file compaction). Delta Lake is converging with Iceberg via UniForm, which generates Iceberg metadata alongside the Delta log so Iceberg readers can query Delta tables.


8. Alternative Approaches

Instead of Hive on EMR → Apache Spark on EMR

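Spark is usually much faster than Hive on MapReduce for multi-stage transformations. It keeps intermediate data in memory across stages instead of writing it to disk between them. The gap is smaller against Hive on Tez, which these labs use: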

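# PySpark equivalent of the Lab 03 CTAS (submit with spark-submit on the cluster)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# enableHiveSupport() lets spark.table() resolve catalog tables (Glue, when
# the cluster is configured to use it as the Hive metastore)
spark = SparkSession.builder.appName("covid-enriched").enableHiveSupport().getOrCreate()
intermediate = "emr-labs-intermediate-..."  # your intermediate bucket name

counties = spark.table("emr_labs_db.nytimes_counties")
beds      = spark.table("emr_labs_db.hospital_beds")

enriched = counties.join(
    beds, counties.fips == beds.county_fips_code, "left"
).withColumn(
    "cases_per_staffed_bed",
    F.when(F.col("num_staffed_beds") > 0,
           F.round(F.col("cases") / F.col("num_staffed_beds"), 4))
)

# saveAsTable() returns None, so keep the DataFrame and write it separately
(enriched.write.format("parquet").mode("overwrite")
    .option("path", f"s3://{intermediate}/covid_enriched/")
    .saveAsTable("emr_labs_db.covid_enriched"))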

Use Spark when: complex window functions, ML feature engineering, streaming. Use Hive when: pure SQL teams, existing HQL codebase, MapReduce compatibility.

Instead of EMR on EC2 → EMR Serverless

EMR Serverless (launched 2022) eliminates cluster management entirely:

  • No instance selection, no spot fleet config, no bootstrap
  • Pay per vCPU-second and GB-second of memory used
  • Cold start: tens of seconds to a couple of minutes (near-instant with pre-initialized capacity) vs 6–10 min for EMR on EC2
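# Submit a Hive job to EMR Serverless
# (Hive jobs need a scratch dir and a warehouse dir set via --hiveconf)
aws emr-serverless start-job-run \
  --application-id "$APP_ID" \
  --execution-role-arn "$ROLE_ARN" \
  --job-driver '{
    "hive": {
      "query": "s3://.../01_ctas_enriched.hql",
      "parameters": "--hiveconf hive.exec.scratchdir=s3://.../hive/scratch --hiveconf hive.metastore.warehouse.dir=s3://.../hive/warehouse --hivevar DB=emr_labs_db"
    }
  }'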

Trade-off: EMR Serverless has no persistent Oozie server, so Lab 06 as written requires EMR on EC2. For modern orchestration with EMR Serverless, use AWS Step Functions or MWAA (Airflow).

Instead of Oozie → Apache Airflow (MWAA)

Airflow DAGs are Python — versioned, tested, dynamic:

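from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator

with DAG(
    dag_id="covid_pipeline",
    schedule="0 6 * * *",             # daily at 06:00 UTC; Airflow < 2.4 uses schedule_interval
    start_date=datetime(2024, 1, 1),  # required; any past date works
    catchup=False,                    # don't backfill every day since start_date
) as dag:
    create_tables = EmrAddStepsOperator(
        task_id="create_tables",
        job_flow_id="{{ var.value.emr_cluster_id }}",  # cluster ID stored as an Airflow Variable
        steps=[{
            "Name": "CreateTables",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": ["hive-script", "--run-hive-script", "--args",
                         "-f", "s3://.../01_create_tables.hql"],
            },
        }],
    )
    # EmrAddStepsOperator only submits the step; pair it with an EmrStepSensor
    # to wait for completion before downstream tasks run.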

Airflow is the industry default for 2024+. Oozie is stable and battle-tested but no longer actively developed. If you’re building a new pipeline, use Airflow or Step Functions.

Instead of plain Parquet → Apache Iceberg

Iceberg adds ACID, time travel, hidden partitioning, and schema evolution:

-- Iceberg DDL in Athena (engine v3); Hive on EMR uses different Iceberg syntax
CREATE TABLE emr_labs_db.covid_enriched_ice (
  date_str STRING, state STRING, county STRING,
  cases BIGINT, cases_per_staffed_bed DOUBLE
)
LOCATION 's3://emr-labs-intermediate-.../covid_enriched_ice/'
TBLPROPERTIES ('table_type'='ICEBERG');

-- Time travel: query data as of 7 days ago (fails if the table is newer than that)
SELECT * FROM emr_labs_db.covid_enriched_ice
FOR TIMESTAMP AS OF (current_timestamp - interval '7' day);

-- ACID update (not possible with plain Parquet/Hive)
UPDATE emr_labs_db.covid_enriched_ice SET cases = 99999
WHERE county = 'New York' AND date_str = '2021-01-01';
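Time travel works because every Iceberg commit produces an immutable snapshot with a commit timestamp. A time-travel query reads the newest snapshot committed at or before the requested time. The Python sketch below shows only that selection rule. The Snapshot type is hypothetical and much simpler than Iceberg's real metadata.

```python
from bisect import bisect_right
from dataclasses import dataclass
from datetime import datetime

@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    committed_at: datetime

def snapshot_as_of(snapshots, as_of):
    """Return the newest snapshot committed at or before `as_of`.

    `snapshots` must be sorted by committed_at. Raises LookupError when
    `as_of` predates the first snapshot (the table had no data yet).
    """
    times = [s.committed_at for s in snapshots]
    idx = bisect_right(times, as_of)  # count of snapshots with committed_at <= as_of
    if idx == 0:
        raise LookupError(f"no snapshot at or before {as_of.isoformat()}")
    return snapshots[idx - 1]
```

The LookupError branch reflects why a query "as of seven days ago" cannot succeed on a table created more recently.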

Instead of Athena → Amazon Redshift Spectrum

Redshift Spectrum reads S3 data through the Glue catalog (like Athena), but its queries run from a Redshift cluster. That makes it the better fit when you need to join data-lake tables with tables already loaded into Redshift, using Redshift's query planner. Use Athena for ad-hoc queries; use Spectrum when you are already on Redshift.

Instead of Glue catalog → AWS Lake Formation

Lake Formation wraps the Glue catalog with:

  • Column-level access control (e.g., hide deaths column from read-only users)
  • Row-level filtering (e.g., WHERE state = 'California' enforced at catalog level)
  • Governed tables (ACID transactions, automatic compaction)
  • Cross-account data sharing without copying data

Adoption path: start with Glue catalog (as in these labs) → layer Lake Formation on top when you need multi-team governance.


9. Cost Reference

Per-lab cost (us-east-1, 2026 pricing)

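| Lab | Cluster? | Duration | Compute | Storage | Athena | Total |
|---|---|---|---|---|---|---|
| 00 | No | 2 min | $0.00 | < $0.01 | | ~$0.01 |
| 01 | Transient | 12 min | ~$0.06 | < $0.01 | | ~$0.07 |
| 02 | Transient | 10 min | ~$0.05 | | | ~$0.05 |
| 03 | Transient | 15 min | ~$0.07 | < $0.01 | | ~$0.08 |
| 04 | Transient | 12 min | ~$0.06 | | | ~$0.06 |
| 05 | No | 3 min | $0.00 | < $0.01 | < $0.005 | ~$0.01 |
| 06 | Persistent | 30 min* | ~$0.15 | | | ~$0.15 |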

*Terminate immediately after the coordinator run completes.

Total for all 7 labs: ~$0.43 – $0.60

Cost drivers to watch

# List any running EMR clusters (check after each lab)
aws emr list-clusters --active --region us-east-1 \
  --query 'Clusters[].{Id:Id,Name:Name,State:Status.State}'

# S3 storage cost: check total size
aws s3 ls s3://$RAW_BUCKET --recursive --summarize | tail -2

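Never leave an EMR cluster running overnight. An m5.xlarge on-demand master alone costs about $4.61/day in EC2 charges, and the EMR fee comes on top. Set a CloudWatch billing alarm. Billing metrics are published only in us-east-1, and CloudWatch billing alerts must first be enabled in the Billing console:

aws cloudwatch put-metric-alarm \
  --region us-east-1 \
  --alarm-name emr-labs-spend-alert \
  --metric-name EstimatedCharges \
  --namespace AWS/Billing \
  --dimensions Name=Currency,Value=USD \
  --statistic Maximum \
  --period 86400 \
  --evaluation-periods 1 \
  --threshold 5.00 \
  --comparison-operator GreaterThanThreshold \
  --alarm-actions arn:aws:sns:us-east-1:ACCOUNT:your-topic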
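The $4.61/day figure is the m5.xlarge on-demand EC2 rate (about $0.192/hr in us-east-1) times 24 hours. The EMR service fee for the same instance (about $0.048/hr) is charged on top. The Python sketch below turns those rates into an idle-cost estimate. The rates are hard-coded assumptions, so check current EC2 and EMR pricing before relying on them. The three-node example is hypothetical and treats every node as on-demand.

```python
# Assumed us-east-1 on-demand rates in USD per instance-hour; verify against
# the current EC2 and EMR pricing pages before relying on them.
EC2_RATE = {"m5.xlarge": 0.192}
EMR_FEE = {"m5.xlarge": 0.048}

def idle_cost(instance_type, count, hours):
    """EC2 + EMR charges for `count` instances left running for `hours`."""
    hourly = EC2_RATE[instance_type] + EMR_FEE[instance_type]
    return round(count * hourly * hours, 2)

print(idle_cost("m5.xlarge", 1, 24))  # one master for a full day
print(idle_cost("m5.xlarge", 3, 12))  # master + 2 core nodes overnight, all on-demand
```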

10. Observability: Results, Logs, UIs & DAGs

Every lab produces artifacts in several places simultaneously. This section explains every tool available to you — what it shows, how to open it, and how to interpret what you see. Use this as a reference while running the labs.


Tool Map: What to Use When

| Situation | Best tool |
|---|---|
| Did my EMR step succeed or fail? | AWS CLI describe-cluster / list-steps |
| What exactly went wrong in a step? | S3 step logs (stderr) |
| Why is my Hive query slow? | Tez UI → DAG detail |
| Which tasks are bottlenecking the job? | Tez UI → Task Attempts tab |
| What is the cluster doing right now? | YARN ResourceManager UI |
| How much HDFS space is used? | HDFS NameNode UI |
| Interactive Hive / HDFS browser in a UI | Hue |
| Is my Oozie workflow running? | Oozie Web UI / Oozie CLI |
| Did the Oozie coordinator trigger today? | Oozie Web UI → Coordinators |
| What queries ran in Athena in the last hour? | Athena query history (console / CLI) |
| Do my Glue tables have the right schema? | Glue console → Tables |
| Is the cluster out of memory / CPU? | CloudWatch EMR metrics |
| Row-count / data-quality assertions | Hive CLI or Athena SQL |

EMR Step Logs (S3)

This is the first place to look after any failure. Every EMR step writes four log streams to S3 under the logs bucket, each uploaded gzip-compressed:

s3://{logs-bucket}/emr-logs/{cluster-id}/steps/{step-id}/
    stdout.gz       ← Hive query output (SELECT results)
    stderr.gz       ← errors, stack traces, Tez task progress
    controller.gz   ← EMR step runner lifecycle events
    syslog.gz       ← JVM / OS-level messages

Find the step ID and stream logs (fastest path after a failure):

# Load bucket name
source shared/scripts/get_tf_outputs.sh

# Option 1: the first active cluster (a terminated transient cluster will not appear here)
CLUSTER_ID=$(aws emr list-clusters --active --region us-east-1 \
  --query 'Clusters[0].Id' --output text)
# Option 2 (overrides option 1 if both run): the ID saved by the lab script
CLUSTER_ID=$(cat labs/lab-01-hive-basics/scripts/.last_cluster_id 2>/dev/null)

# List steps and their states
aws emr list-steps --cluster-id "$CLUSTER_ID" \
  --query 'Steps[].{Id:Id,Name:Name,State:Status.State,Reason:Status.FailureDetails.Reason}'

# Get the step ID of the failed step
STEP_ID=$(aws emr list-steps --cluster-id "$CLUSTER_ID" \
  --query 'Steps[?Status.State==`FAILED`].Id | [0]' --output text)

# Stream stderr (where errors and stack traces go)
aws s3 cp \
  "s3://$LOGS_BUCKET/emr-logs/$CLUSTER_ID/steps/$STEP_ID/stderr.gz" - \
  | gunzip | tail -100

# Stream stdout (Hive query output, SELECT results)
aws s3 cp \
  "s3://$LOGS_BUCKET/emr-logs/$CLUSTER_ID/steps/$STEP_ID/stdout.gz" - \
  | gunzip | head -200

Reading stderr for a Hive error: look for lines starting with FAILED: or Error near the bottom of the file:

FAILED: SemanticException [Error 10002]: Line 3:... Invalid column reference ...

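The line number in the error counts from the start of the failing HQL statement, not from the start of the log file. If the HQL file contains several statements, it is not the file's line number either.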
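When you grep many step logs, it helps to extract the exception, error code and position from FAILED: lines automatically. The regex below is a sketch fitted to the message shape shown above (FAILED: <Exception> [Error NNNNN]: Line L:C message). Other Hive errors may be formatted differently, so treat a None result as "inspect manually".

```python
import re

HIVE_FAILED = re.compile(
    r"FAILED: (?P<exception>\w+) \[Error (?P<code>\d+)\]: "
    r"(?:Line (?P<line>\d+):(?P<col>\d+)? ?)?(?P<message>.*)"
)

def parse_hive_failure(log_text):
    """Return the fields of the last 'FAILED:' line in a Hive stderr log, or None."""
    last = None
    for line in log_text.splitlines():
        match = HIVE_FAILED.search(line)
        if match:
            last = match.groupdict()
    return last
```

For a downloaded stderr.gz, pass `gzip.decompress(data).decode()` as `log_text`.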

Logs are not immediately available: EMR uploads step logs to S3 about every 5 minutes. If the step just finished, wait up to 5 minutes before fetching. For real-time logs, SSH to the master and read YARN container logs directly (see the YARN section below).


Tez UI — DAG Visualiser

The Tez UI shows the execution graph of every Hive query that ran via Tez. It is the single best tool for understanding query performance.

Open the Tez UI (requires a running cluster):

# SSH tunnel — maps localhost:8080 → master:8080
ssh -L 8080:localhost:8080 \
    -i ~/.ssh/emr-labs-key.pem \
    hadoop@<MASTER_DNS> \
    -N -f   # -N: no command, -f: background

# Open in browser (EMR serves the Tez UI under /tez-ui)
open http://localhost:8080/tez-ui

The Master DNS is printed by every run_lab.sh script and also available via:

aws emr describe-cluster --cluster-id "$CLUSTER_ID" \
  --query 'Cluster.MasterPublicDnsName' --output text

Navigating the Tez UI:

Tez UI home → All DAGs tab
  │
  └─► Select a DAG (named after the HQL file or query text)
        │
        ├─► DAG Details
        │     ├── Status: SUCCEEDED / FAILED / RUNNING
        │     ├── Start/End time, Duration
        │     ├── Vertices (each = one Map or Reduce stage)
        │     └── Counters (rows read/written, bytes, GC time)
        │
        ├─► Graphical View  ← visual DAG with data flow arrows
        │     Each box = a vertex. Hover for row/byte counts.
        │     Thick arrows = large data shuffles (potential bottleneck)
        │
        ├─► Vertex Details  (click a vertex in the graph)
        │     ├── Tasks: list of all parallel tasks for this vertex
        │     ├── Task Attempts: shows retried/failed attempts
        │     └── Counters per task (find the slow outlier task here)
        │
        └─► Timeline View
              Gantt chart — shows which vertices ran in parallel vs serial
              and where wall-clock time was actually spent

What to look for:

  • Skew: one task takes 10× longer than others → data not evenly partitioned
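  • Excessive spilling: SPILLED_RECORDS far above the vertex's output record count, or ADDITIONAL_SPILLS_BYTES_WRITTEN > 0, means sort buffers are too small → raise hive.tez.container.size and tez.runtime.io.sort.mb, or add nodes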
  • Slow vertex: look at the Timeline view — the longest bar is your bottleneck. Click it to see whether it’s Map I/O, shuffle, or Reduce
  • Failed task attempts: yellow/red tasks in Vertex Details → expand to see the attempt error message
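The skew rule of thumb above is easy to automate once you have per-task durations, for example copied from the Vertex Details task list. The Python sketch below flags tasks slower than a multiple of the vertex median. The 10× default mirrors the rule above, and the input format (task ID to milliseconds) is an assumption.

```python
from statistics import median

def find_skewed_tasks(durations_ms, factor=10.0):
    """Return (task_id, duration_ms) pairs slower than `factor` x the median, slowest first.

    durations_ms: dict of task_id -> duration in milliseconds for one vertex.
    """
    if not durations_ms:
        return []
    typical = median(durations_ms.values())
    return sorted(
        ((task, ms) for task, ms in durations_ms.items() if ms > factor * typical),
        key=lambda pair: pair[1],
        reverse=True,
    )
```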

Force a query to show in Tez UI (run interactively on master):

ssh -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS>
hive
SET hive.execution.engine=tez;
SET tez.queue.name=default;

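-- This query will appear in the Tez UI under "All DAGs"
-- (NYT cases are cumulative per day, so SUM(cases) is only a demo workload, not a real total)
SELECT state, COUNT(DISTINCT county) AS county_count, SUM(cases) AS total_cases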
FROM emr_labs_db.nytimes_counties
GROUP BY state
ORDER BY total_cases DESC
LIMIT 15;

YARN ResourceManager UI

YARN manages cluster resources (CPU cores and memory) across all running jobs. The ResourceManager UI shows live application status.

# SSH tunnel
ssh -L 8088:localhost:8088 \
    -i ~/.ssh/emr-labs-key.pem \
    hadoop@<MASTER_DNS> \
    -N -f
open http://localhost:8088

Key sections:

| Tab | What it shows |
|---|---|
| Cluster → Applications | All running, finished, and failed YARN apps (each Hive session's Tez application can run several queries as separate DAGs) |
| Cluster → Nodes | Each core node's available vCores and memory |
| Application detail | Attempt history, AM logs, tracking URL (Tez UI link) |
| Scheduler | Queue utilisation (capacity, used, pending) |

CLI equivalent (useful when no tunnel is open):

# List running applications
yarn application -list 2>/dev/null  # run on master via SSH

# Or from your laptop via the EMR CLI
aws emr ssh --cluster-id "$CLUSTER_ID" --key-pair-file ~/.ssh/emr-labs-key.pem \
  --command "yarn application -list"

# Application logs for a specific app (replace application_XXX)
aws emr ssh --cluster-id "$CLUSTER_ID" --key-pair-file ~/.ssh/emr-labs-key.pem \
  --command "yarn logs -applicationId application_1234567890_0001 | tail -200"

YARN container logs — most granular output, available in real-time during a running job (unlike S3 logs which are flushed every 5 min):

# SSH to master
ssh -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS>

# Find the application ID from YARN
yarn application -list -appStates ALL | grep -i hive

# Stream all container logs (large — pipe through grep or tail)
yarn logs -applicationId application_1234567890_0001 2>&1 | grep -i error | head -50

HDFS NameNode UI

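The HDFS NameNode manages the cluster's distributed filesystem. Hive uses it for temporary data, Oozie for workflow files, and Hive also stores there any managed tables whose warehouse location is on HDFS.

# NameNode UI port: 9870 on EMR 6.x/7.x (Hadoop 3); 50070 on EMR 5.x
ssh -L 9870:localhost:9870 \
    -i ~/.ssh/emr-labs-key.pem \
    hadoop@<MASTER_DNS> \
    -N -f
open http://localhost:9870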

What to check here:

  • Overview: total capacity vs used. Above about 85% full, HDFS writes (Hive scratch data, Oozie uploads) can start failing with No space left on device
  • Utilities → Browse the file system: check /user/oozie/ to verify that the Oozie workflow files were uploaded, and /user/hive/warehouse/ if any managed tables are stored on HDFS

CLI equivalent:

# Check HDFS usage
hadoop fs -df -h /

# List Oozie workflow files
hadoop fs -ls /user/oozie/emr-labs/

# Verify HQL files are on HDFS (needed by Oozie)
hadoop fs -ls /user/oozie/emr-labs/hql/

Hue — Browser IDE

Hue (Hadoop User Experience) is a web application that provides a browser-based interface for Hive, HDFS, Oozie, and more. It is available on EMR but must be added explicitly at cluster launch.

Enable Hue — add it to the --applications list in run_lab.sh:

--applications Name=Hive Name=Hadoop Name=Hue Name=Tez Name=Oozie

Access Hue (port 8888):

ssh -L 8888:localhost:8888 \
    -i ~/.ssh/emr-labs-key.pem \
    hadoop@<MASTER_DNS> \
    -N -f
open http://localhost:8888
# First login: the username and password you enter create the Hue superuser account

What Hue gives you:

| Feature | Path in Hue | What you can do |
|---|---|---|
| Hive Editor | Editor → Hive | Write and run HQL interactively, see results in a table |
| Query history | Editor → My queries | Re-open past queries, see execution time |
| HDFS Browser | Files | Browse, upload, download, preview files on HDFS and S3 |
| Oozie Job Browser | Jobs → Workflows | See workflow run status, action-by-action |
| Oozie Coordinator | Jobs → Coordinators | See scheduled runs, next trigger, pause/kill |
| Table Browser | Tables | Browse Glue/Hive databases, preview sample rows |
| Metastore Manager | Tables → Database | View column schemas, partitions, statistics |

Important: Hue is a persistent-cluster feature. Transient clusters (Labs 01–04) terminate before you could use Hue. It is most useful with the Lab 06 persistent cluster.


Oozie Web UI

Oozie has its own web UI that shows workflow and coordinator status in detail.

# SSH tunnel to Oozie port
ssh -L 11000:localhost:11000 \
    -i ~/.ssh/emr-labs-key.pem \
    hadoop@<MASTER_DNS> \
    -N -f
open http://localhost:11000/oozie

Navigating the Oozie UI:

Oozie Web Console
  ├── Workflow Jobs tab
  │     Shows each individual workflow run:
  │     Status: RUNNING / SUCCEEDED / KILLED / FAILED
  │     Click a job → Actions tab → see each action (create-tables,
  │     ctas-enriched, validate-output) with OK / ERROR state
  │     Click an action → External ID → links to YARN app for that Hive job
  │
  ├── Coordinator Jobs tab
  │     Shows the coordinator (daily schedule)
  │     Status: RUNNING / PAUSED / KILLED
  │     Next Materialization: when the next run will trigger
  │     Click → Jobs tab → list of each daily run instance
  │
  └── System Info tab
        Oozie server version, build time, configuration values

Oozie CLI (more reliable than the UI for scripting):

# SSH to master first
ssh -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS>

OOZIE=http://localhost:11000/oozie

# List running coordinators
oozie jobs -oozie $OOZIE -jobtype coordinator -filter status=RUNNING

# Get coordinator status (replace 0000001-...-oozie-...C)
oozie job -oozie $OOZIE -info 0000001-250101000000000-oozie-oozi-C

# List the individual workflow runs spawned by the coordinator
oozie job -oozie $OOZIE -info 0000001-250101000000000-oozie-oozi-C -len 5

# Get details of a specific workflow run
oozie job -oozie $OOZIE -info 0000002-250101000000000-oozie-oozi-W

# View full log of a failed workflow
oozie job -oozie $OOZIE -log 0000002-250101000000000-oozie-oozi-W

# Rerun a failed workflow, re-executing only the failed nodes (not from the start)
oozie job -oozie $OOZIE -rerun 0000002-250101000000000-oozie-oozi-W \
  -D oozie.wf.rerun.failnodes=true

# Kill a running coordinator
oozie job -oozie $OOZIE -kill 0000001-250101000000000-oozie-oozi-C

# Suspend (pause) a coordinator without killing it
oozie job -oozie $OOZIE -suspend 0000001-250101000000000-oozie-oozi-C

# Resume a suspended coordinator
oozie job -oozie $OOZIE -resume 0000001-250101000000000-oozie-oozi-C

Understanding workflow action states:

| State | Meaning |
|---|---|
| OK | Action completed successfully |
| ERROR | Action failed; workflow moved to the KILL action |
| RUNNING | Action is currently executing (Hive job in progress) |
| PREP | Action is waiting for the previous action to finish |
| KILLED | Workflow was manually killed before this action ran |
| END | Workflow reached the END node (success) |

Hive CLI — Interactive Verification

For direct interactive queries against the cluster while it is running, SSH to the master and launch Hive or Beeline.

ssh -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS>

Option A: Hive CLI (simpler, but deprecated in favour of Beeline):

hive
-- Confirm execution engine
SET hive.execution.engine;
-- hive.execution.engine=tez

-- Switch to the lab database
USE emr_labs_db;

-- List tables registered in Glue
SHOW TABLES;

-- Verify row counts after Lab 01
SELECT COUNT(*) FROM nytimes_counties;   -- expect ~1.8M rows
SELECT COUNT(*) FROM hospital_beds;       -- expect ~7,500 rows

-- Spot-check a few raw rows
SELECT * FROM nytimes_counties LIMIT 5;

-- Verify enriched table after Lab 03
SELECT COUNT(*) FROM covid_enriched;      -- expect ~1.5M (rows with non-null FIPS)
SELECT * FROM covid_enriched LIMIT 3;

-- Check partition/file info
DESCRIBE FORMATTED covid_enriched;
-- Look for: Location (should be s3://intermediate-bucket/...)
-- Look for: InputFormat  (should be ParquetInputFormat)

-- Validate no nulls in key columns
SELECT
  COUNT(*)                                   AS total_rows,
  COUNT(CASE WHEN fips IS NULL THEN 1 END)   AS null_fips,
  COUNT(CASE WHEN cases < 0 THEN 1 END)      AS negative_cases,
  MIN(date_str), MAX(date_str)
FROM covid_enriched;

-- Check join quality: what % of rows matched a hospital record?
SELECT
  COUNT(*) AS total,
  SUM(CASE WHEN num_staffed_beds IS NOT NULL THEN 1 ELSE 0 END) AS matched,
  ROUND(100.0 * SUM(CASE WHEN num_staffed_beds IS NOT NULL THEN 1 ELSE 0 END)
        / COUNT(*), 2) AS match_pct
FROM covid_enriched;
-- Expect: ~65–75% match rate (rural counties have no hospital record)

Option B: Beeline (JDBC client, connects to HiveServer2, recommended):

beeline -u jdbc:hive2://localhost:10000 -n hadoop

Beeline shows query progress (% complete) in the terminal and is the production-grade Hive client used by most BI tools and Hue.

-- Beeline has the same SQL dialect as the Hive CLI
-- Progress bar appears during execution:
-- INFO  : DAG Status: Status=RUNNING, Progress=50.00%
SELECT state, SUM(cases) AS total FROM emr_labs_db.nytimes_counties
GROUP BY state ORDER BY total DESC LIMIT 5;

Exit: type !quit in Beeline (quit; in the Hive CLI), or press Ctrl+D.


Athena Query History & Results

Athena stores every query execution (success and failure) for 45 days.

AWS Console:

  1. Open https://console.aws.amazon.com/athena/home?region=us-east-1
  2. Click Query editor → Recent queries tab
  3. Each row: query text (truncated), state, run time, data scanned, timestamp
  4. Click a row → View details → full query text, execution ID, output S3 location
  5. Click the S3 output location to download the CSV result file

AWS CLI:

# List recent query executions
aws athena list-query-executions \
  --work-group primary \
  --query 'QueryExecutionIds[:10]' --output text

# Get status and stats for a specific execution
QID=<paste-execution-id-here>
aws athena get-query-execution --query-execution-id "$QID" \
  --query 'QueryExecution.{State:Status.State,
           Reason:Status.StateChangeReason,
           Scanned:Statistics.DataScannedInBytes,
           Runtime:Statistics.TotalExecutionTimeInMillis,
           Output:ResultConfiguration.OutputLocation}'

# Stream the result CSV directly
OUTPUT=$(aws athena get-query-execution --query-execution-id "$QID" \
  --query 'QueryExecution.ResultConfiguration.OutputLocation' --output text)
aws s3 cp "$OUTPUT" -

# Paginate results via the API (no S3 download)
aws athena get-query-results \
  --query-execution-id "$QID" \
  --query 'ResultSet.Rows[*].Data[*].VarCharValue'

Key Athena stats to check:

  • DataScannedInBytes: if this is close to the table's full size, the query is getting no benefit from Parquet column pruning or partition pruning. Select only the columns you need, and filter on partition columns where the table has them. Parquet min/max statistics can also let Athena skip row groups for selective filters
  • TotalExecutionTimeInMillis: end-to-end time including queue wait. Subtract QueryQueueTimeInMillis, or read EngineExecutionTimeInMillis, to isolate execution time
  • StateChangeReason: when State=FAILED, this contains the SQL error message
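These statistics can also be checked in a script. The sketch below takes the dictionary returned by boto3's `athena.get_query_execution(QueryExecutionId=qid)`, whose keys match the CLI --query paths above, and summarises scan size and timing. The 1 GB "large scan" threshold is an arbitrary assumption.

```python
def summarize_execution(response, large_scan_bytes=1_000_000_000):
    """Summarise an Athena get_query_execution response dict."""
    execution = response["QueryExecution"]
    status = execution["Status"]
    stats = execution.get("Statistics", {})  # may be absent for early failures
    scanned = stats.get("DataScannedInBytes", 0)
    total_ms = stats.get("TotalExecutionTimeInMillis", 0)
    queue_ms = stats.get("QueryQueueTimeInMillis", 0)
    return {
        "state": status["State"],
        "error": status.get("StateChangeReason") if status["State"] == "FAILED" else None,
        "scanned_mb": round(scanned / 1_000_000, 1),
        "excluding_queue_ms": total_ms - queue_ms,
        "large_scan": scanned >= large_scan_bytes,
    }
```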

Athena workgroup settings — check the result output location:

aws athena get-work-group --work-group primary \
  --query 'WorkGroup.Configuration.ResultConfiguration.OutputLocation'

Output should be s3://emr-labs-athena-{account}-us-east-1/.


Glue Console — Schema & Data Preview

The Glue console is the canonical view of what table definitions exist in your metastore. Use it to validate that Lab 01 and Lab 03 wrote schemas correctly.

In the AWS Console:

  1. Go to https://console.aws.amazon.com/glue/home?region=us-east-1
  2. Data Catalog → Databases → click emr_labs_db
  3. Click any table name → Schema tab → verify column names, types, SerDe
  4. Properties tab → Location (S3 path), InputFormat, Classification
  5. Click Action → View data → opens Athena pre-loaded with a SELECT * LIMIT 10

AWS CLI:

# List all tables in the lab database
aws glue get-tables --database-name emr_labs_db \
  --query 'TableList[].{Name:Name,
           Location:StorageDescriptor.Location,
           Format:StorageDescriptor.InputFormat,
           Cols:StorageDescriptor.Columns[*].Name}'

# Inspect a single table's full definition
aws glue get-table \
  --database-name emr_labs_db \
  --name covid_enriched \
  | jq '.Table.StorageDescriptor | {Location, InputFormat, SerdeInfo}'

# Check column statistics (empty until they have been computed, e.g. by a Glue column-statistics task; crawlers only infer schema)
aws glue get-column-statistics-for-table \
  --database-name emr_labs_db \
  --table-name covid_enriched \
  --column-names cases deaths fips 2>/dev/null

What correct output looks like for covid_enriched after Lab 03:

{
  "Location": "s3://emr-labs-intermediate-.../covid_enriched/",
  "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
  "SerdeInfo": {
    "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
  }
}

If InputFormat still shows TextInputFormat, the CTAS did not write Parquet — check that the CTAS step completed without errors.
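The same check can run in a script. The sketch below inspects the dictionary returned by boto3's `glue.get_table(DatabaseName=..., Name=...)`, which has the same structure the jq filter above reads, and reports whether the table points at Parquet. Treating "parquet" anywhere in the InputFormat or SerDe class name as a match is a simplifying assumption.

```python
def describe_storage(get_table_response):
    """Summarise storage info from a glue.get_table response dict."""
    descriptor = get_table_response["Table"]["StorageDescriptor"]
    input_format = descriptor.get("InputFormat", "")
    serde = descriptor.get("SerdeInfo", {}).get("SerializationLibrary", "")
    is_parquet = "parquet" in input_format.lower() or "parquet" in serde.lower()
    return {
        "location": descriptor.get("Location"),
        "input_format": input_format.rsplit(".", 1)[-1],  # short class name
        "is_parquet": is_parquet,
    }
```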


CloudWatch — Cluster Metrics & Alarms

EMR automatically publishes cluster metrics to CloudWatch under the AWS/EMR namespace every five minutes. No agent or configuration is required.

Useful EMR metrics:

| Metric | What it measures | Alert threshold |
|---|---|---|
| YARNMemoryAvailablePercentage | % of cluster memory not yet allocated | < 15% → add nodes |
| ContainerAllocated | Running YARN containers | Spike = new job started |
| HDFSUtilization | % of HDFS disk used | > 85% → risk of HDFS write failures |
| MRActiveNodes | Active core/task nodes | Drop = node failure |
| S3BytesRead | Bytes read from S3 (aggregates MapReduce jobs only) | Limited use for Hive on Tez |
| S3BytesWritten | Bytes written to S3 (aggregates MapReduce jobs only) | Limited use for Hive on Tez |

View metrics in the console:

  1. Go to https://console.aws.amazon.com/cloudwatch/home?region=us-east-1
  2. Metrics → All metrics → AWS/EMR
  3. Select your cluster by JobFlowId
  4. Add to a graph: YARNMemoryAvailablePercentage + ContainerAllocated

CLI — fetch a metric for the last 30 minutes:

CLUSTER_ID=$(cat labs/lab-03-transformations/scripts/.last_cluster_id)

aws cloudwatch get-metric-statistics \
  --namespace AWS/EMR \
  --metric-name YARNMemoryAvailablePercentage \
  --dimensions Name=JobFlowId,Value="$CLUSTER_ID" \
  --start-time "$(date -u -v-30M +%FT%TZ 2>/dev/null || date -u -d '30 minutes ago' +%FT%TZ)" \
  --end-time   "$(date -u +%FT%TZ)" \
  --period 300 \
  --statistics Average \
  --query 'sort_by(Datapoints, &Timestamp)[*].{T:Timestamp,Avg:Average}' \
  --output table

Create an alarm — alert when memory is critically low:

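# AWS/EMR metrics update every 5 minutes. With a shorter period, alternate periods
# have no data, notBreaching counts them as healthy, and 2-of-2 may never be met.
# Add --alarm-actions <SNS_TOPIC_ARN> so the alarm actually notifies someone.
aws cloudwatch put-metric-alarm \
  --alarm-name "emr-low-memory-$CLUSTER_ID" \
  --namespace AWS/EMR \
  --metric-name YARNMemoryAvailablePercentage \
  --dimensions Name=JobFlowId,Value="$CLUSTER_ID" \
  --statistic Average \
  --period 300 \
  --threshold 10 \
  --comparison-operator LessThanThreshold \
  --evaluation-periods 2 \
  --alarm-description "EMR cluster low on YARN memory" \
  --treat-missing-data notBreaching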

Reading & Validating Results

After each lab, run these checks to confirm the pipeline produced correct output.

Lab 01 — External tables created

# Tables must exist in Glue
aws glue get-tables --database-name emr_labs_db \
  --query 'TableList[].Name' --output text
# Expected: hospital_beds   nytimes_counties

# S3 data must be untouched (external table DROP test)
aws s3 ls "s3://$RAW_BUCKET/covid/nytimes/us-counties/csv/" | wc -l
# Expected: > 0 files

Lab 03 — Parquet enriched table

# Parquet files written to intermediate bucket
aws s3 ls "s3://$INTERMEDIATE_BUCKET/covid_enriched/" --recursive | wc -l
# Expected: several data files (Hive names them like 000000_0, usually without a .parquet extension; exact count varies by parallelism)

# Row count via Athena (fast, no cluster needed)
aws athena start-query-execution \
  --query-string "SELECT COUNT(*) FROM emr_labs_db.covid_enriched" \
  --work-group primary \
  --result-configuration "OutputLocation=s3://$ATHENA_BUCKET/validation/" \
  --query 'QueryExecutionId' --output text
# Then fetch result as shown in the Athena section above
# Expected: ~1,500,000 – 1,800,000 rows depending on dataset version

# Confirm format is Parquet via Glue
aws glue get-table --database-name emr_labs_db --name covid_enriched \
  --query 'Table.StorageDescriptor.InputFormat' --output text
# Expected: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

Lab 05 — Athena view and window function results

# Verify the view exists (Athena views are stored in Glue with TableType VIRTUAL_VIEW)
aws glue get-table --database-name emr_labs_db \
  --name latest_county_snapshot --query 'Table.TableType' --output text
# Expected: VIRTUAL_VIEW

# Check the trend query produced output (non-empty result file)
aws s3 ls "s3://$ATHENA_BUCKET/lab05-results/" \
  --recursive --human-readable | grep -v metadata
# At least 3 output files (one per SQL statement)

Lab 06 — Oozie coordinator triggered

ssh -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS> \
  "oozie jobs -oozie http://localhost:11000/oozie \
   -jobtype coordinator -filter status=RUNNING"
# Expected: one coordinator row with your application path

# Confirm at least one workflow run reached END state
ssh -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS> \
  "oozie job -oozie http://localhost:11000/oozie \
   -info 0000001-...-oozie-oozi-C -len 3"
# Look for Status=SUCCEEDED in the Actions column

General data quality pattern

-- Run in Hive CLI or Athena after any transformation
-- VARCHAR(n) casts work in both engines (Athena DML rejects CAST ... AS STRING)
SELECT
  'total_rows'           AS check_name, CAST(COUNT(*) AS VARCHAR(64)) AS result
  FROM emr_labs_db.covid_enriched
UNION ALL
SELECT 'null_fips',    CAST(SUM(CASE WHEN fips IS NULL THEN 1 ELSE 0 END) AS VARCHAR(64))
  FROM emr_labs_db.covid_enriched
UNION ALL
SELECT 'negative_cases', CAST(SUM(CASE WHEN cases < 0  THEN 1 ELSE 0 END) AS VARCHAR(64))
  FROM emr_labs_db.covid_enriched
UNION ALL
SELECT 'date_range',
  CONCAT(MIN(date_str), ' → ', MAX(date_str))
  FROM emr_labs_db.covid_enriched;

Quick CLI Reference Card

Keep these commands handy while working through the labs.

# ── Cluster status ─────────────────────────────────────────────────────────
aws emr describe-cluster --cluster-id $CID \
  --query 'Cluster.{State:Status.State,Master:MasterPublicDnsName}'

aws emr list-steps --cluster-id $CID \
  --query 'Steps[].{Name:Name,State:Status.State,Id:Id}'

# ── Step logs ──────────────────────────────────────────────────────────────
aws s3 cp s3://$LOGS_BUCKET/emr-logs/$CID/steps/$STEP/stderr.gz - | gunzip | tail -80
aws s3 cp s3://$LOGS_BUCKET/emr-logs/$CID/steps/$STEP/stdout.gz - | gunzip | head -100

# ── SSH tunnels (run each in a separate terminal, Ctrl+C to close) ─────────
ssh -L 8080:localhost:8080  -i ~/.ssh/emr-labs-key.pem hadoop@$MASTER -N  # Tez UI
ssh -L 8088:localhost:8088  -i ~/.ssh/emr-labs-key.pem hadoop@$MASTER -N  # YARN
ssh -L 9870:localhost:9870  -i ~/.ssh/emr-labs-key.pem hadoop@$MASTER -N  # HDFS NameNode (EMR 6.x+; 50070 on EMR 5.x)
ssh -L 8888:localhost:8888  -i ~/.ssh/emr-labs-key.pem hadoop@$MASTER -N  # Hue
ssh -L 11000:localhost:11000 -i ~/.ssh/emr-labs-key.pem hadoop@$MASTER -N  # Oozie

# ── Glue ───────────────────────────────────────────────────────────────────
aws glue get-tables --database-name emr_labs_db --query 'TableList[].Name'
aws glue get-table  --database-name emr_labs_db --name <TABLE> | jq '.Table.StorageDescriptor'

# ── Athena ─────────────────────────────────────────────────────────────────
aws athena list-query-executions --work-group primary --query 'QueryExecutionIds[:5]'
aws athena get-query-execution   --query-execution-id $QID \
  --query 'QueryExecution.{State:Status.State,Scanned:Statistics.DataScannedInBytes}'

# ── S3 data checks ─────────────────────────────────────────────────────────
aws s3 ls s3://$RAW_BUCKET/covid/          --recursive --human-readable | tail -5
aws s3 ls s3://$INTERMEDIATE_BUCKET/       --recursive --human-readable | tail -5
aws s3 ls s3://$LOGS_BUCKET/emr-logs/$CID/ --recursive | wc -l

# ── YARN (run on master via SSH) ───────────────────────────────────────────
yarn application -list -appStates ALL
yarn logs -applicationId application_XXXXXXXX_XXXX 2>&1 | grep -i 'error\|fatal' | head -30

# ── Oozie (run on master via SSH) ──────────────────────────────────────────
OOZIE=http://localhost:11000/oozie
oozie jobs   -oozie $OOZIE -jobtype coordinator -filter status=RUNNING
oozie job    -oozie $OOZIE -info  <COORD_ID>
oozie job    -oozie $OOZIE -log   <WF_ID>
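oozie job    -oozie $OOZIE -rerun <COORD_ID> -action <ACTION_NUMBERS>  # coordinator actions, e.g. 3 or 1-3
oozie job    -oozie $OOZIE -config <JOB_PROPERTIES> -rerun <WF_ID>    # workflow; set oozie.wf.rerun.failnodes=true in the properties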

11. Production Monitoring & Troubleshooting: Lessons from Large Companies

The following section draws on published engineering blog posts, open-source tools, and conference talks (Spark Summit, QCon, re:Invent, VLDB) from companies that operate data platforms at the scale of billions of events per day. Each subsection identifies the core problem they faced, the technical solution they built or adopted, and the concrete practice you can apply in your own pipelines (including these labs).


Netflix — Instrument Everything, Trust Nothing

Background

Netflix processes ~700 billion events per day across its data platform (2024 figures from their tech blog). At that scale, a silent data-quality bug — a column that starts emitting NULL instead of a user ID — can corrupt weeks of A/B test results before anyone notices.

Problem 1: Silent failures in distributed jobs

A Spark/Hive job exits with code 0 (success) but writes 0 rows because a filter predicate silently excluded everything. The job succeeded; the data did not.

Netflix’s solution — output record count assertions: Every job emits a metric pipeline.output_records{job=X, date=Y}. A separate anomaly-detection job compares today’s count to a rolling 14-day median. If today’s count deviates by more than 3σ, PagerDuty fires.
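A minimal Python sketch of that comparison follows. It assumes the daily output counts are already collected somewhere; the metric pipeline and the PagerDuty hook are not shown. The description pairs a median baseline with a σ threshold but doesn't say how σ is estimated. This sketch assumes the sample standard deviation of the same 14-day window. `is_anomalous` and its parameters are illustrative names.

```python
import statistics


def is_anomalous(history: list[int], today: int, window: int = 14, k: float = 3.0) -> bool:
    """Flag today's output count if it deviates from the rolling median by > k sigma."""
    recent = history[-window:]
    if len(recent) < 2:
        return False  # too little history to estimate the spread
    baseline = statistics.median(recent)
    sigma = statistics.stdev(recent)  # sample standard deviation of the window
    if sigma == 0:
        return today != baseline  # perfectly flat history: any change is unusual
    return abs(today - baseline) > k * sigma


# 14 days of stable counts, then a run whose filter silently excluded everything
history = [1_540_000 + 1_000 * (i % 5) for i in range(14)]
print(is_anomalous(history, 1_543_000))  # False: within the normal spread
print(is_anomalous(history, 0))          # True: the "succeeded but wrote 0 rows" case
```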

Apply this now:

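# After Lab 03 CTAS, assert minimum row count before the cluster terminates
# Add this as a final step in run_lab.sh:
QID=$(aws athena start-query-execution \
  --query-string "SELECT COUNT(*) FROM emr_labs_db.covid_enriched" \
  --work-group primary \
  --result-configuration "OutputLocation=s3://$ATHENA_BUCKET/qa/" \
  --output text --query QueryExecutionId)

# Wait for the query to reach a terminal state
while true; do
  STATE=$(aws athena get-query-execution --query-execution-id "$QID" \
    --query 'QueryExecution.Status.State' --output text)
  case "$STATE" in
    SUCCEEDED) break ;;
    FAILED|CANCELLED) echo "Athena query $QID ended in $STATE" >&2; exit 1 ;;
    *) sleep 5 ;;   # QUEUED or RUNNING
  esac
done

# Row 0 is the header row; row 1 holds the COUNT(*) value
ROW_COUNT=$(aws athena get-query-results --query-execution-id "$QID" \
  --query 'ResultSet.Rows[1].Data[0].VarCharValue' --output text)
# Reject non-numeric output too: [ "" -lt N ] errors out, so the check would silently pass
if ! [[ "$ROW_COUNT" =~ ^[0-9]+$ ]] || [ "$ROW_COUNT" -lt 1000000 ]; then
  echo "ASSERTION FAILED: expected >= 1M rows, got '$ROW_COUNT'" >&2
  exit 1
fi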

Problem 2: Metastore as a single point of failure

The original Netflix pipeline used a shared Hive Metastore (MySQL-backed). A schema migration on the metastore database caused 6 hours of pipeline downtime across 300+ Spark jobs that all connected to the same instance.

Netflix’s solution — Metacat: Metacat is a federated metadata service that presents a single API over multiple underlying metastores (Hive, Glue, Druid, Elasticsearch). It adds:

  • Partition pruning cache: avoids hammering the metastore with SHOW PARTITIONS for every job start
  • Metadata tagging: annotate tables with owner, sla, sensitivity (PII/non-PII) as searchable tags
  • Change events: every table schema change emits a Kafka event so downstream consumers can react

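Lesson for AWS: use Glue (as these labs do) rather than a self-managed Hive Metastore. Glue is multi-AZ and serverless, so there is no metastore database to patch or scale. Its APIs are still rate-limited, so very chatty jobs can be throttled. Glue resource tagging applies to catalog databases, so tag the database with ownership. For per-table metadata, use table Parameters, as in the Databricks section below.

aws glue tag-resource \
  --resource-arn arn:aws:glue:us-east-1:ACCOUNT:database/emr_labs_db \
  --tags-to-add owner=data-eng,sla=daily-06:00,sensitivity=public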

Problem 3: Slow query regression

A schema change (adding a new column) caused a previously 2-minute query to take 45 minutes because the new column triggered a different join strategy.

Netflix’s solution — query fingerprinting + performance baselining: Every query is hashed to a fingerprint (normalized, stripping literal values). Historical execution time per fingerprint is stored in an internal time-series DB. An SLA checker flags any fingerprint whose p95 runtime exceeds 2× its 30-day median.
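A minimal Python sketch of fingerprinting plus the p95-versus-median check follows. It assumes runtimes per fingerprint are already stored; the time-series store is not shown. The normalization rules are illustrative, not Netflix's actual rules: lower-case the query, replace string and numeric literals with ?, and collapse whitespace.

```python
import hashlib
import re
import statistics


def fingerprint(sql: str) -> str:
    """Normalize a query so runs differing only in literal values share a hash."""
    s = sql.lower()
    s = re.sub(r"'(?:[^']|'')*'", "?", s)     # string literals -> ?
    s = re.sub(r"\b\d+(?:\.\d+)?\b", "?", s)  # standalone numeric literals -> ?
    s = re.sub(r"\s+", " ", s).strip()         # collapse whitespace
    return hashlib.sha256(s.encode()).hexdigest()[:16]


def p95(values: list[float]) -> float:
    """Nearest-rank 95th percentile."""
    ordered = sorted(values)
    rank = max(1, (95 * len(ordered) + 99) // 100)  # integer ceil(0.95 * n)
    return ordered[rank - 1]


def breaches_sla(recent: list[float], last_30_days: list[float], factor: float = 2.0) -> bool:
    """True when the recent p95 runtime exceeds factor x the 30-day median."""
    return p95(recent) > factor * statistics.median(last_30_days)


a = fingerprint("SELECT * FROM t WHERE date_str = '2025-01-01' AND cases > 10")
b = fingerprint("select *\n  from t where date_str = '2025-02-01' and cases > 99")
print(a == b)  # True: same query shape, different literals
```

Using p95 rather than the maximum means a single slow outlier run does not trip the check. The check only fires when slowness is sustained.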

Apply this with Athena:

# Log Athena query stats to a metrics file after every run
STATS=$(aws athena get-query-execution --query-execution-id $QID \
  --query 'QueryExecution.Statistics')
echo "$(date -u +%FT%TZ) qid=$QID \
  runtime=$(echo "$STATS" | jq .TotalExecutionTimeInMillis) \
  scanned=$(echo "$STATS" | jq .DataScannedInBytes)" \
  >> .athena_query_stats.log

# Alert if runtime exceeds 3× historical average for the same query
# (implement with a simple awk/python check on the log file)
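A Python sketch of that check follows. It reads lines in the format written to .athena_query_stats.log. It assumes the log holds runs of a single recurring query, for example one log file per query. The line format records only the execution ID, not which query ran. The function names are illustrative.

```python
def parse_runtimes(lines):
    """Yield runtime_ms from lines like
    '2025-01-01T06:00:00Z qid=abc   runtime=1234   scanned=5678'."""
    for line in lines:
        fields = dict(tok.split("=", 1) for tok in line.split()[1:] if "=" in tok)
        runtime = fields.get("runtime", "")
        if runtime.isdigit():  # skips "null" when Athena returned no statistics
            yield int(runtime)


def latest_is_slow(lines, factor: float = 3.0) -> bool:
    """True if the newest run took more than factor x the average of earlier runs."""
    runs = list(parse_runtimes(lines))
    if len(runs) < 2:
        return False
    *earlier, latest = runs
    return latest > factor * (sum(earlier) / len(earlier))
```

For example, call `latest_is_slow(open(".athena_query_stats.log"))` at the end of a run, and exit non-zero when it returns True.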

Uber — Data Quality as a First-Class Citizen

Background

Uber’s data platform (built around Hudi + Presto + Spark) processes ride, earnings, and fraud signals. In 2019 they published their experience with silent data corruption — jobs that ran cleanly but produced wrong results due to upstream schema drift.

Problem 1: Schema drift at the source

A backend team renamed a JSON field from driver_id to driverId (camelCase migration). All downstream Hive jobs silently started getting NULL for driver_id because the schema was never updated. The bug was discovered 11 days later by a product analyst noticing anomalous metrics.

Uber’s solution — Schemaless + schema registry enforcement:

  • All events must be registered in a schema registry before any consumer can read them
  • Schema evolution rules are enforced at the producer: only backward-compatible changes are allowed (add optional fields, never rename or remove)
  • For Hive/Glue tables: use a table format with built-in schema evolution (Hudi or Iceberg), and enforce compatibility in the schema registry before data lands
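A simplified Python checker for the two producer rules above follows. It works on Avro-style field lists: dicts with a name, a type, and an optional default. It is a sketch, not Avro's full schema-resolution algorithm. Glue's BACKWARD mode, used below, is looser than this policy because it also permits deleting fields. `policy_violations` is an illustrative name.

```python
def policy_violations(old_fields: list[dict], new_fields: list[dict]) -> list[str]:
    """Apply the two producer rules above to Avro-style field lists.

    Rule 1: every existing field must still exist under the same name
            (a rename shows up as a removal plus an addition).
    Rule 2: every newly added field must be optional, i.e. declare a default.
    """
    old = {f["name"] for f in old_fields}
    new = {f["name"]: f for f in new_fields}
    new_names = set(new)
    problems = [f"removed or renamed: {n}" for n in old - new_names]
    problems += [f"added without default: {n}"
                 for n in new_names - old if "default" not in new[n]]
    return sorted(problems)


v1 = [{"name": "driver_id", "type": "string"}, {"name": "fare", "type": "double"}]
v2 = [{"name": "driverId", "type": "string"}, {"name": "fare", "type": "double"}]
print(policy_violations(v1, v2))
# ['added without default: driverId', 'removed or renamed: driver_id']
```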

Apply this with Glue schema registry:

# Create a schema registry for event schemas
aws glue create-registry --registry-name emr-labs-registry

# Create the schema and its first version; later versions submitted with
# aws glue register-schema-version are checked for BACKWARD compatibility
aws glue create-schema \
  --registry-id RegistryName=emr-labs-registry \
  --schema-name county-cases \
  --data-format AVRO \
  --compatibility BACKWARD \
  --schema-definition file://schemas/county_cases.avsc

Problem 2: Duplicates from non-idempotent retries

When an ingestion job failed mid-run and was retried, some partitions were written twice. Hive INSERT OVERWRITE is idempotent; INSERT INTO is not. Downstream COUNT queries returned inflated figures.

Uber’s solution: Hudi upsert semantics. Hudi (Hadoop Upserts Deletes and Incrementals) writes records by record key + partition path, so replaying the same records on a retry updates the existing rows instead of appending duplicates. It also keeps a .hoodie/ commit timeline. An incomplete (failed) commit is rolled back before the next write rather than left half-visible.
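The toy in-memory model below shows why keyed upserts survive retries while appends do not. It is not Hudi's implementation: there is no timeline, file layout, or rollback. The record key (fips, date_str) is an assumption for this dataset.

```python
def insert_into(table: list[dict], batch: list[dict]) -> list[dict]:
    """Append semantics (like INSERT INTO): a retried batch is written twice."""
    return table + batch


def upsert(table: dict, batch: list[dict], key=("fips", "date_str")) -> dict:
    """Keyed upsert: each row is stored under its record key, so replaying
    the same batch replaces rows instead of adding new ones."""
    merged = dict(table)
    for row in batch:
        merged[tuple(row[k] for k in key)] = row
    return merged


batch = [{"fips": "36061", "date_str": "2025-01-01", "cases": 10},
         {"fips": "06037", "date_str": "2025-01-01", "cases": 7}]
after_retry_append = insert_into(insert_into([], batch), batch)  # first try + retry
after_retry_upsert = upsert(upsert({}, batch), batch)
print(len(after_retry_append), len(after_retry_upsert))  # 4 2
```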

Apply this for idempotent Hive jobs (without Hudi):

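-- Use INSERT OVERWRITE (not INSERT INTO) for idempotency.
-- Assumes a table partitioned by date_str: the static partition spec means a
-- retry replaces exactly that day's partition.
INSERT OVERWRITE TABLE emr_labs_db.covid_enriched
PARTITION (date_str = '${DATE}')
SELECT ...   -- all non-partition columns, in table order
FROM source_table
WHERE date_str = '${DATE}';
-- Retrying this is safe: the partition is overwritten, not appended.
-- Avoid a dynamic PARTITION (state) here: it would overwrite each state's whole
-- partition with one day's rows, silently deleting every other day.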

Problem 3: Data freshness SLA violations

A daily pipeline was supposed to complete by 08:00 UTC. Intermittent Spot instance reclamation caused it to finish at 11:30 UTC with no alert fired because the job did eventually succeed.

Uber’s solution — freshness SLA metrics:

pipeline.completion_delay_minutes{job=X, date=Y}

If completion_delay_minutes > 120, PagerDuty fires even if the job ultimately succeeded.
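The worked example below uses the numbers from the incident above: an 08:00 UTC deadline and an 11:30 UTC finish. It assumes completion_delay_minutes is measured against the SLA deadline, since the exact definition isn't given. The date is arbitrary.

```python
from datetime import datetime, timezone


def completion_delay_minutes(completed_at: datetime, deadline: datetime) -> int:
    """Whole minutes past the SLA deadline; 0 when the job finished on time."""
    return max(0, int((completed_at - deadline).total_seconds() // 60))


deadline = datetime(2025, 1, 1, 8, 0, tzinfo=timezone.utc)
finished = datetime(2025, 1, 1, 11, 30, tzinfo=timezone.utc)
delay = completion_delay_minutes(finished, deadline)
print(delay, delay > 120)  # 210 True
```

The job "succeeded", but a 210-minute delay is well past the 120-minute threshold, so the page fires anyway.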

Apply this with CloudWatch:

# Emit a custom metric for pipeline completion time
START_EPOCH=$(date +%s)    # record this when the job starts
# ... run the job ...
END_EPOCH=$(date +%s)      # after the step completes
DELAY_MIN=$(( (END_EPOCH - START_EPOCH) / 60 ))   # run duration in minutes

# Publish with ONLY the JobName dimension: CloudWatch treats each dimension
# combination as a separate metric, so it must match the alarm's dimensions.
aws cloudwatch put-metric-data \
  --namespace EMRLabs/Pipelines \
  --metric-name CompletionDelayMinutes \
  --dimensions JobName=lab03-ctas \
  --value "$DELAY_MIN" \
  --unit Count

# Alarm: alert if completion takes > 45 minutes
# (add --alarm-actions <SNS_TOPIC_ARN> to be notified)
aws cloudwatch put-metric-alarm \
  --alarm-name lab03-sla-breach \
  --namespace EMRLabs/Pipelines \
  --metric-name CompletionDelayMinutes \
  --dimensions Name=JobName,Value=lab03-ctas \
  --statistic Maximum \
  --period 300 \
  --threshold 45 \
  --comparison-operator GreaterThanThreshold \
  --evaluation-periods 1 \
  --treat-missing-data notBreaching

LinkedIn — The Origin of Metadata-Driven Monitoring

Background

LinkedIn is where many foundational big data tools were born: Kafka, Samza, Azkaban (workflow scheduler), and DataHub (metadata and lineage, successor to WhereHows). Their data team wrote the 2012 paper “Building LinkedIn’s Real-time Activity Data Pipeline”, which influenced how the industry thinks about data observability.

Problem 1: You can’t monitor what you can’t trace

When a dashboard showed wrong numbers, the analyst knew which table was wrong but had no way to know which upstream job wrote to it, which source that job read from, or when the last successful run was.

LinkedIn’s solution: data lineage in its metadata platform (WhereHows, later succeeded by the open-source DataHub). It captures read/write relationships between datasets and the jobs that produce them. When you trace a metric backwards, you get a DAG showing every transformation it passed through.

AWS equivalent — Glue lineage via DataBrew or Lake Formation:

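# Register a Lake Formation data lake admin. Lake Formation governs access;
# it does not record lineage itself. put-data-lake-settings replaces ALL
# existing settings, so start from `aws lakeformation get-data-lake-settings`.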
aws lakeformation put-data-lake-settings --data-lake-settings '{
  "DataLakeAdmins": [{"DataLakePrincipalIdentifier": "arn:aws:iam::ACCOUNT:role/emr-service"}],
  "CreateTableDefaultPermissions": []
}'

# List each table's Parameters, where pipelines can record lineage hints
# (these are Glue table Parameters, not resource tags)
aws glue get-tables --database-name emr_labs_db \
  --query 'TableList[].{Name:Name,Parameters:Parameters}'

For full column-level lineage, use OpenLineage (an open standard supported by Airflow, Spark, and dbt):

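# In a Spark job, lineage is emitted automatically by the OpenLineage Spark
# listener (the openlineage-spark jar must be on the job's classpath):
spark-submit \
  --conf spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener \
  --conf spark.openlineage.transport.type=http \
  --conf spark.openlineage.transport.url=http://<MARQUEZ_HOST>:5000 \
  <YOUR_JOB>.py
# Marquez (open-source) or Atlan collects these events and renders a lineage graph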

Problem 2: Job dependency explosion

With 3,000+ daily Hive/Spark jobs, a single upstream delay cascades through dependencies causing 50+ downstream jobs to be late. Without a dependency graph, the on-call engineer cannot identify the root cause quickly.

LinkedIn’s solution — Azkaban DAG + SLA waterfall: Azkaban (LinkedIn’s open-source workflow scheduler, predating Airflow) stores the full dependency graph. When a job is late, it shows exactly which upstream job is blocking it and for how long.

Concrete diagnostic pattern (applicable to Oozie and any scheduler):

When an alert fires:
1. Identify the failed/delayed job (the leaf symptom)
2. Walk the dependency graph UPSTREAM to find the root cause job
3. Check that root job's last run time and error
4. Fix the root cause first — fixing the leaf job will fail again
   if the upstream data is still wrong or missing
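Steps 1 to 3 can be sketched in Python as a walk over a dependency map. The graph and job names are hypothetical, loosely modeled on these labs. In practice the scheduler (Oozie, Airflow, Azkaban) would supply the dependency map and the set of late or failed jobs.

```python
def root_causes(job: str, upstream: dict[str, list[str]], unhealthy: set[str]) -> set[str]:
    """Walk upstream from a late/failed job through unhealthy dependencies and
    return the unhealthy jobs whose own dependencies are all healthy."""
    roots, seen, stack = set(), set(), [job]
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        bad_parents = [p for p in upstream.get(current, []) if p in unhealthy]
        if current in unhealthy and not bad_parents:
            roots.add(current)  # nothing upstream explains this failure
        stack.extend(bad_parents)
    return roots


# Illustrative graph: job -> jobs it depends on
upstream = {
    "lab05-athena": ["lab03-ctas"],
    "lab03-ctas": ["lab01-tables"],
    "lab01-tables": ["lab00-ingest"],
}
unhealthy = {"lab05-athena", "lab03-ctas", "lab01-tables"}
print(root_causes("lab05-athena", upstream, unhealthy))  # {'lab01-tables'}
```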

Problem 3: Runaway jobs consuming all cluster capacity

A single mis-written GROUP BY query with no LIMIT read 2 TB of data, consumed 95% of cluster memory for 6 hours, and starved all other jobs.

LinkedIn’s solution — YARN capacity scheduler + resource quotas:

# Split YARN into batch and ad-hoc queues and cap the ad-hoc queue.
# These are capacity-scheduler.xml properties, so use the EMR "capacity-scheduler"
# classification at cluster launch. Sibling capacities must sum to 100,
# so the default queue gets an explicit share.
[
  {
    "Classification": "capacity-scheduler",
    "Properties": {
      "yarn.scheduler.capacity.root.queues": "default,batch,adhoc",
      "yarn.scheduler.capacity.root.default.capacity": "10",
      "yarn.scheduler.capacity.root.batch.capacity": "60",
      "yarn.scheduler.capacity.root.adhoc.capacity": "30",
      "yarn.scheduler.capacity.root.adhoc.maximum-capacity": "30",
      "yarn.scheduler.capacity.root.adhoc.user-limit-factor": "1"
    }
  }
]
-- Set per-query resource limits in Hive before running ad-hoc queries
SET tez.queue.name=adhoc;                 -- run in the capped queue
SET hive.exec.reducers.max=20;            -- cap parallelism
SET hive.tez.container.size=2048;         -- cap per-task container memory (MB)
SET tez.am.resource.memory.mb=4096;       -- cap Tez AM
SET hive.limit.pushdown.memory.usage=0.1; -- buffer for LIMIT pushdown (top-N before the reducer)
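CapacityScheduler rejects a queue level whose sibling capacities (in percent) do not add up to 100. It also rejects a queue whose maximum-capacity is below its capacity. The Python pre-flight check below tests those two rules against the Properties map before launching a cluster. It is a sketch covering one queue level and percentage capacities only, not YARN's own validator.

```python
def capacity_errors(props: dict[str, str], parent: str = "root") -> list[str]:
    """Check one level of a CapacityScheduler queue config."""
    prefix = f"yarn.scheduler.capacity.{parent}"
    errors, total = [], 0.0
    for queue in props[f"{prefix}.queues"].split(","):
        cap = props.get(f"{prefix}.{queue}.capacity")
        if cap is None:
            errors.append(f"{queue}: capacity not set")
            continue
        total += float(cap)
        max_cap = props.get(f"{prefix}.{queue}.maximum-capacity")
        if max_cap is not None and float(max_cap) < float(cap):
            errors.append(f"{queue}: maximum-capacity below capacity")
    if abs(total - 100.0) > 1e-6:
        errors.append(f"sibling capacities sum to {total:g}, not 100")
    return errors
```

Run it on the same dictionary you pass as the classification's Properties. An empty list means both rules hold.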

Airbnb — Lineage, SLAs and Incident Playbooks

Background

Airbnb’s data engineering team (the inventors of Apache Airflow) has published extensively on their “Minerva” metrics platform and “Dataportal” data discovery tool. Their monitoring philosophy centres on three questions: Is the data fresh? Is it complete? Is it correct?

Problem 1: Different teams defining the same metric differently

“Active listings” was defined differently by the Growth team (any listing created in the last 30 days), the Payments team (any listing with a booking in the last 30 days), and the Supply team (any listing that is currently published). Dashboards showed three different numbers for the same metric.

Airbnb’s solution — Minerva (single metric store): Minerva is a company-wide metric definition layer. Metrics are defined once in YAML and computed by a central pipeline. Every dashboard, report, and experiment uses the same computed value.
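The define-once mechanism can be sketched in Python. Definitions live in one mapping, and every consumer's SQL is generated from it, so nobody hand-writes a private version of the metric. The METRICS dict stands in for Minerva's YAML. `render_view` is an illustrative helper, not Minerva's API.

```python
# Hypothetical single registry of metric definitions (stand-in for Minerva's YAML)
METRICS = {
    "cumulative_cases": "SUM(cases)",
    "cases_per_bed": "CAST(SUM(cases) AS DOUBLE) / NULLIF(SUM(num_staffed_beds), 0)",
}


def render_view(view: str, source: str, dims: list[str], metrics: dict[str, str]) -> str:
    """Render one CREATE VIEW statement from the shared definitions."""
    columns = dims + [f"{expr} AS {name}" for name, expr in metrics.items()]
    return (
        f"CREATE OR REPLACE VIEW {view} AS\nSELECT\n  "
        + ",\n  ".join(columns)
        + f"\nFROM {source}\nGROUP BY {', '.join(dims)}"
    )


print(render_view("emr_labs_db.metrics_county_stress", "emr_labs_db.covid_enriched",
                  ["date_str", "state"], METRICS))
```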

Apply this pattern:

-- Define metrics in a single Hive/Athena view that all consumers query
-- Instead of letting each team write their own COUNT:
CREATE OR REPLACE VIEW emr_labs_db.metrics_county_stress AS
SELECT
  date_str,
  state,
  -- Metric definition is here, not in each consumer's query
  SUM(cases)                                         AS cumulative_cases,
  SUM(deaths)                                        AS cumulative_deaths,
  SUM(num_staffed_beds)                              AS total_staffed_beds,
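  -- CAST to DOUBLE avoids integer division in Athena (Hive's / already returns DOUBLE)
  ROUND(CAST(SUM(cases)  AS DOUBLE) / NULLIF(SUM(num_staffed_beds), 0), 4) AS cases_per_bed,
  ROUND(CAST(SUM(deaths) AS DOUBLE) / NULLIF(SUM(num_icu_beds),     0), 4) AS deaths_per_icu_bed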
FROM emr_labs_db.covid_enriched
GROUP BY date_str, state;
-- All downstream consumers join this view, not the raw table

Problem 2: No way to know when the data was last updated

An analyst queried a table, got results, and built a report — without knowing that the pipeline had been broken for 5 days and the data was stale.

Airbnb’s solution — freshness metadata in the table itself:

-- Add a pipeline metadata table
CREATE TABLE IF NOT EXISTS emr_labs_db.pipeline_runs (
  pipeline_name   STRING,
  run_date        STRING,
  status          STRING,   -- STARTED / SUCCEEDED / FAILED
  rows_written    BIGINT,
  duration_sec    INT,
  cluster_id      STRING,
  ts              TIMESTAMP
)
STORED AS PARQUET
LOCATION 's3://${PROCESSED_BUCKET}/pipeline_runs/';

-- Insert a record at the START and END of every pipeline run
-- (from your run_lab.sh shell scripts):
# START (INSERT ... SELECT without FROM, since older Hive releases reject
# function calls such as current_timestamp inside VALUES):
hive -e "INSERT INTO TABLE emr_labs_db.pipeline_runs SELECT \
  'lab03-ctas','$(date +%F)','STARTED',0,0,'$CLUSTER_ID',current_timestamp"

# END (after step completes):
hive -e "INSERT INTO TABLE emr_labs_db.pipeline_runs SELECT \
  'lab03-ctas','$(date +%F)','SUCCEEDED',$ROW_COUNT,$DURATION,'$CLUSTER_ID',current_timestamp"
-- Any analyst can now check freshness:
SELECT pipeline_name, MAX(run_date) AS last_successful_run,
       MAX(ts) AS last_run_ts
FROM emr_labs_db.pipeline_runs
WHERE status = 'SUCCEEDED'
GROUP BY pipeline_name;
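To turn that query into an alert, compare the last successful run_date with today. The Python sketch below assumes a daily pipeline, so anything older than one day counts as stale; that threshold is an illustrative choice.

```python
from datetime import date


def days_stale(last_successful_run: str, today: date) -> int:
    """Age in days of the newest SUCCEEDED run_date (YYYY-MM-DD, as written
    by $(date +%F) in the inserts above)."""
    return (today - date.fromisoformat(last_successful_run)).days


def is_stale(last_successful_run: str, today: date, max_age_days: int = 1) -> bool:
    return days_stale(last_successful_run, today) > max_age_days


print(is_stale("2025-01-01", date(2025, 1, 6)))  # True: 5 days without a good run
```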

Problem 3: Cascading incident with no runbook

When the daily pipeline failed at 03:00 UTC, the on-call engineer spent 90 minutes finding the error, understanding the impact, and deciding whether to re-run, roll back, or skip. The same incident had happened 4 months earlier but the knowledge was in a Slack thread.

Airbnb’s solution — runbook-as-code in the DAG: Every Airflow DAG definition includes a doc_md field with:

  1. What the pipeline does
  2. Upstream dependencies
  3. Downstream consumers
  4. How to re-run it safely
  5. Known failure modes and their fixes

See the Runbook Template subsection below.


Meta (Facebook) — Scuba and Real-Time Drill-Down

Background

Meta’s data infrastructure team published the Scuba paper (VLDB 2013): “Scuba: Diving into Data at Facebook”. Scuba is an in-memory time-series store that ingests ~1 million events/sec and allows engineers to drill down from a high-level metric anomaly to the specific server, user cohort, or feature flag causing it — in under 10 seconds.

Key techniques applicable to EMR/Hive pipelines

Technique 1 — Stratified sampling for fast debugging

Meta engineers don’t wait for a full 6-hour Hive job to reproduce a bug. They run a 0.1% sample first:

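-- Fast debugging run: ~0.1% block sample using Hive's TABLESAMPLE
-- (Hive syntax; in Athena use TABLESAMPLE BERNOULLI (0.1) instead)
SELECT state, COUNT(*), SUM(cases)
FROM emr_labs_db.nytimes_counties
TABLESAMPLE (0.1 PERCENT)
GROUP BY state;
-- Block sampling picks ~0.1% of the input by data size, not rows. If Hive
-- cannot sample the input format, it reads the whole table, so check COUNT(*).

-- Or sample by bucket (deterministic, reproducible):
SELECT *
FROM emr_labs_db.nytimes_counties
TABLESAMPLE (BUCKET 1 OUT OF 100 ON fips);
-- Rows are chosen by a hash of fips, so the same ~1% comes back every time,
-- which makes it useful for regression testing. Adding a LIMIT without
-- ORDER BY would pick an arbitrary subset and break that reproducibility.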
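The bucket form is reproducible because membership depends only on a hash of the key. The Python sketch below shows the same idea. Hive uses its own hash function, so this selects different rows than Hive would. The FIPS codes are synthetic.

```python
import zlib


def in_bucket(key: str, bucket: int = 1, out_of: int = 100) -> bool:
    """Keep a row when its key hashes into the chosen bucket (1-based)."""
    # crc32 gives the same value in every run; Python's built-in hash() of a str
    # is salted per process (PYTHONHASHSEED), so it would change the sample each run.
    return zlib.crc32(key.encode("utf-8")) % out_of == bucket - 1


fips_codes = [f"{n:05d}" for n in range(1001, 57000, 2)]
sample = [f for f in fips_codes if in_bucket(f)]
print(len(sample), len(fips_codes))  # roughly 1% of the keys
```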

Technique 2 — Structured logs with consistent fields

Meta mandates that every log line is valid JSON with a fixed set of fields:

{"ts": "2025-01-01T06:00:00Z", "job": "lab03-ctas", "level": "INFO",
 "event": "step_complete", "step": "ctas-enriched",
 "rows_written": 1543210, "duration_ms": 187432, "cluster_id": "j-XXX"}

This makes logs grep-able, dashboardable (ingest into CloudWatch Logs Insights), and alertable (filter on level=ERROR).
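For job code written in Python, such as PySpark drivers, the same convention can be sketched as follows. `log_event` and `errors_only` are illustrative helpers, and the field names follow the example line above. Building the record with `json.dumps` means quotes or newlines in values cannot produce invalid JSON.

```python
import json
from datetime import datetime, timezone


def log_event(job: str, level: str, event: str, **fields) -> str:
    """One structured log line: the fixed fields first, extras as real JSON keys."""
    record = {
        "ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "job": job,
        "level": level,
        "event": event,
        **fields,
    }
    return json.dumps(record)  # escapes quotes/newlines inside values


def errors_only(lines: list[str]) -> list[dict]:
    """The 'filter on level=ERROR' step, done by parsing rather than grepping."""
    return [rec for rec in map(json.loads, lines) if rec.get("level") == "ERROR"]


lines = [
    log_event("lab03-ctas", "INFO", "step_complete", rows_written=1543210),
    log_event("lab03-ctas", "ERROR", "step_failed", step="ctas-enriched"),
]
print(errors_only(lines))
```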

Apply this in shell scripts:

log() {
  local level=$1; shift
  # jq -n builds the JSON object, escaping any quotes or newlines in the message
  jq -nc --arg ts "$(date -u +%FT%TZ)" --arg job "${JOB_NAME:-unknown}" \
         --arg level "$level" --arg msg "$*" \
         '{ts: $ts, job: $job, level: $level, msg: $msg}'
}

log INFO "Starting CTAS step"
log INFO "Cluster launched cluster_id=$CLUSTER_ID"
log ERROR "Step failed step=$STEP_ID reason=$REASON"  # alert via a log metric filter on level=ERROR

Technique 3 — Canary partitions

Before writing the full output, write one partition (e.g., one state) and validate it. If the canary partition looks wrong, abort before writing 100%.

-- Canary: run the CTAS for a single state first
CREATE TABLE emr_labs_db.covid_enriched_canary
STORED AS PARQUET
LOCATION 's3://${INTERMEDIATE_BUCKET}/covid_enriched_canary/'
AS
SELECT ... FROM ... WHERE n.state = 'New York';

-- Validate the canary
SELECT COUNT(*), AVG(cases_per_staffed_bed), MAX(cases)
FROM emr_labs_db.covid_enriched_canary;
-- If numbers look reasonable, run the full CTAS
-- If not, DROP TABLE covid_enriched_canary and fix the query
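"Look reasonable" can be made mechanical, so a script can gate the full run. The Python sketch below takes the three values returned by the canary validation query. The specific rules and the min_rows default are illustrative assumptions, not thresholds from the source.

```python
from typing import Optional


def canary_problems(row_count: int, avg_cases_per_bed: Optional[float],
                    max_cases: Optional[int], min_rows: int = 1) -> list[str]:
    """Turn the canary validation query's three numbers into a go/no-go list.
    An empty list means: proceed with the full CTAS."""
    problems = []
    if row_count < min_rows:
        problems.append(f"only {row_count} rows (expected >= {min_rows})")
    if avg_cases_per_bed is None:
        problems.append("cases_per_staffed_bed is all NULL (did the bed join match?)")
    elif avg_cases_per_bed < 0:
        problems.append("negative cases_per_staffed_bed")
    if max_cases is not None and max_cases < 0:
        problems.append("negative case counts")
    return problems
```

In practice, set min_rows from a known-good canary run rather than leaving it at 1.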

Databricks — Structured Logging and Delta Log Auditing

Background

Databricks open-sourced Delta Lake in 2019 and has since published extensively on reliability patterns for lakehouse architectures. Their key contribution to the monitoring space is the Delta transaction log — a write-ahead log (_delta_log/) that records every operation on a table.

Problem 1: “What changed and when?”

With plain Parquet/Hive, there is no record of when a partition was overwritten or by which job. If corrupt data is discovered 3 days later, you cannot determine exactly when it was introduced.

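Databricks’s solution: the Delta transaction log (ACID audit trail). Every INSERT, UPDATE, DELETE, MERGE, and OPTIMIZE on a Delta table writes a new JSON commit file to _delta_log/, named by its zero-padded 20-digit version (e.g., 00000000000000000042.json). Each commit file contains a commitInfo entry like: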

{"commitInfo": {
  "timestamp": 1704067200000,
  "operation": "WRITE",
  "operationParameters": {"mode": "Overwrite"},
  "userMetadata": "job=lab03-ctas,cluster=j-XXX"
}}
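Each commit file is plain newline-delimited JSON, one action per line, so a small Python sketch can list the audit trail without Spark. It assumes the JSON commit files have already been downloaded (for example with aws s3 cp) and keyed by the version number in their names. Checkpoint Parquet files are ignored.

```python
import json


def commit_history(commit_files: dict[int, str]) -> list[tuple]:
    """List (version, timestamp_ms, operation, userMetadata) per commit.

    commit_files maps a table version to the text of its _delta_log JSON file;
    each file holds one JSON action per line (commitInfo, add, remove, ...).
    """
    history = []
    for version in sorted(commit_files):
        for line in commit_files[version].splitlines():
            if not line.strip():
                continue
            info = json.loads(line).get("commitInfo")
            if info is not None:
                history.append((version, info.get("timestamp"),
                                info.get("operation"), info.get("userMetadata")))
    return history
```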

You can audit this from Spark or Databricks SQL:

-- Delta table history (in Spark / Databricks SQL)
DESCRIBE HISTORY covid_enriched;
-- Shows: version, timestamp, operation, operationParameters, userMetadata

-- Time travel to before the bad write
SELECT * FROM covid_enriched VERSION AS OF 42;
SELECT * FROM covid_enriched TIMESTAMP AS OF '2025-01-10 06:00:00';

Apply this with Glue table properties (without Delta):

# Stamp every pipeline run's metadata onto the Glue table.
# update-table takes a TableInput, so strip the read-only fields that
# get-table returns (including CreatedBy, which TableInput rejects).
aws glue update-table \
  --database-name emr_labs_db \
  --table-input "$(aws glue get-table \
      --database-name emr_labs_db --name covid_enriched \
      --query 'Table' --output json | \
    jq --arg ts "$(date -u +%FT%TZ)" --arg cid "$CLUSTER_ID" '
        .Parameters.last_updated = $ts |
        .Parameters.last_job     = "lab03-ctas" |
        .Parameters.last_cluster = $cid |
        del(.DatabaseName, .CreateTime, .UpdateTime, .CreatedBy,
            .IsRegisteredWithLakeFormation, .CatalogId, .VersionId,
            .IsMultiDialectView, .FederatedTable)'
  )"

# Read the metadata back
aws glue get-table --database-name emr_labs_db --name covid_enriched \
  --query 'Table.Parameters.{last_updated:last_updated,last_job:last_job}'

Problem 2: Small files degrading query performance over time

Every incremental Hive job appends new files to a partition. After 6 months, a partition has 10,000 files of 1 MB each instead of 10 files of 1 GB each. Every Athena query spends most of its time on S3 LIST and file-open overhead.

Databricks’s solution: OPTIMIZE (compaction). Delta’s OPTIMIZE command rewrites small files into larger ones when you run it. Delta also offers optimized writes and auto compaction, which compact files automatically as part of writes.

Apply this with plain Parquet + Hive (manual compaction):

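-- Compact a partition by rewriting it via INSERT OVERWRITE
-- Run this as a weekly maintenance job
SET hive.merge.tezfiles=true;   -- merge small output files at the end of the Tez job
INSERT OVERWRITE TABLE emr_labs_db.covid_enriched
PARTITION (state = 'New York')
SELECT date_str, county, fips, cases, deaths,
       num_staffed_beds, num_icu_beds, cases_per_staffed_bed, deaths_per_icu_bed
FROM   emr_labs_db.covid_enriched
WHERE  state = 'New York';
-- Hive reads all small files in the partition and rewrites them as fewer, larger
-- Parquet files (exact count depends on reducers and merge settings).
-- The SELECT list must match the table's non-partition columns, in order.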
-- Detect partitions with many files (run from Athena).
-- "$path" is Athena's hidden column holding each row's source S3 object.
-- For file sizes, use an S3 listing (aws s3 ls ... --recursive) instead.
SELECT state AS partition_key,
       COUNT(DISTINCT "$path") AS file_count
FROM   emr_labs_db.covid_enriched
GROUP  BY state
HAVING COUNT(DISTINCT "$path") > 100;
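File sizes are easier to get from S3 than from SQL. The Python sketch below flags partitions with more than 100 files or an average size under 10 MB. Its input is the output of `aws s3 ls s3://$INTERMEDIATE_BUCKET/covid_enriched/ --recursive`. The function name and default thresholds are illustrative.

```python
from collections import defaultdict


def small_file_partitions(listing: str, max_files: int = 100, min_avg_mb: float = 10.0) -> dict:
    """Flag partition prefixes with too many files or a too-small average size.

    `listing` is the text printed by `aws s3 ls <prefix> --recursive` (without
    --human-readable): date, time, size in bytes, key. Keys may contain spaces.
    """
    stats = defaultdict(lambda: [0, 0])  # prefix -> [file_count, total_bytes]
    for line in listing.splitlines():
        parts = line.split(maxsplit=3)
        if len(parts) < 4 or not parts[2].isdigit() or "/" not in parts[3]:
            continue
        prefix, _, name = parts[3].rpartition("/")
        if not name:
            continue  # zero-byte "folder" placeholder object
        stats[prefix][0] += 1
        stats[prefix][1] += int(parts[2])
    flagged = {}
    for prefix, (count, total) in stats.items():
        avg_mb = total / count / 1_048_576
        if count > max_files or avg_mb < min_avg_mb:
            flagged[prefix] = (count, round(avg_mb, 2))
    return flagged
```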

Problem 3: Concurrent writers corrupting data

Two Airflow tasks writing to the same Hive table partition simultaneously (due to a retry race condition) resulted in partial writes: each job wrote half the partition’s files, and both believed they succeeded.

Databricks’s solution — optimistic concurrency control in Delta: Delta’s transaction log provides serializable isolation. If two writers attempt to overwrite the same partition concurrently, one wins and the other receives a ConcurrentAppendException and must retry.

Apply this pattern with Hive (no Delta):

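# Use S3 conditional writes or a DynamoDB lock table for mutual exclusion
# Simple advisory lock pattern (pipeline-locks table keyed on lock_key):
LOCK_KEY="emr_labs_db/covid_enriched/state=NY"
NOW=$(date +%s)
# Acquire only if no lock exists or the holder's lock has expired; DynamoDB TTL
# deletes expired items lazily, so check the expiry explicitly (#ttl aliases "ttl")
aws dynamodb put-item \
  --table-name pipeline-locks \
  --item "{\"lock_key\": {\"S\": \"$LOCK_KEY\"},
           \"owner\": {\"S\": \"$CLUSTER_ID\"},
           \"ttl\": {\"N\": \"$(( NOW + 3600 ))\"}}" \
  --condition-expression "attribute_not_exists(lock_key) OR #ttl < :now" \
  --expression-attribute-names '{"#ttl": "ttl"}' \
  --expression-attribute-values "{\":now\": {\"N\": \"$NOW\"}}" \
  || { echo "Lock not acquired (ConditionalCheckFailedException = held by another job)" >&2; exit 1; }
# ... write the partition, then release the lock only if we still own it:
aws dynamodb delete-item --table-name pipeline-locks \
  --key "{\"lock_key\": {\"S\": \"$LOCK_KEY\"}}" \
  --condition-expression "#o = :me" \
  --expression-attribute-names '{"#o": "owner"}' \
  --expression-attribute-values "{\":me\": {\"S\": \"$CLUSTER_ID\"}}"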

Synthesised Best Practices You Can Apply Now

The following table distils the above lessons into practices you can apply directly to these labs and to any Hive/EMR pipeline you build.

Monitoring

| Practice | Implementation | Source |
| --- | --- | --- |
| Assert output row count after every job | CloudWatch custom metric + alarm | Netflix |
| Emit pipeline completion delay as a metric | put-metric-data CompletionDelayMinutes | Uber |
| Write pipeline run metadata to a Glue table | pipeline_runs table with status/rows/duration | Airbnb |
| Tag Glue resources with owner, SLA, sensitivity | aws glue tag-resource | Netflix/Metacat |
| Stamp last_updated/last_job on Glue table Parameters | aws glue update-table | Databricks |
| Structured JSON logs in all shell scripts | log() function with ts/job/level/msg | Meta |
| CloudWatch dashboard with YARN memory + S3 bytes | CloudWatch metrics AWS/EMR namespace | all |

Troubleshooting

| Practice | Implementation | Source |
| --- | --- | --- |
| Reproduce bugs on 0.1% sample before full rerun | TABLESAMPLE (0.1 PERCENT) | Meta |
| Walk dependency graph upstream to find root cause | Oozie/Airflow DAG view | LinkedIn |
| Use canary partition before writing full output | Single-state CTAS, validate, then full CTAS | Meta |
| Check stderr log first, not stdout | s3 cp .../stderr.gz - \| gunzip \| tail | all |
| Preserve failed partition data before retry | Never DROP TABLE before investigating | Uber |
| Distinguish job success from data correctness | Row count assertion step after every write | Netflix |

Reliability

| Practice | Implementation | Source |
| --- | --- | --- |
| Use INSERT OVERWRITE not INSERT INTO | Idempotent partition writes | Uber |
| Detect and compact small files weekly | INSERT OVERWRITE partition compaction job | Databricks |
| Define metrics once in a shared view | metrics_* Glue view, not per-consumer query | Airbnb/Minerva |
| Enforce schema compatibility at registration | Glue Schema Registry BACKWARD compat | Uber |
| Use YARN queue capacity limits for ad-hoc queries | adhoc queue capped at 30% capacity | LinkedIn |
| Maintain a runbook per pipeline (see below) | Markdown in DAG or wiki, link from alert | Airbnb |

Failure Mode Taxonomy

When something goes wrong, categorise the failure before acting. The fix depends on the failure mode, not just the symptom.

Failure Mode              Symptom                     Typical Root Cause
─────────────────────────────────────────────────────────────────────────
Silent zero output        Job succeeds, 0 rows         Wrong filter, missing partition,
                          written                       schema mismatch on join key

Partial write             Row count lower than         Spot instance reclamation
                          expected, no error           mid-write, OOM kill

Schema drift              NULL values in column        Upstream renamed a field,
                          that was non-null before     or changed delimiter

Data duplication          Row count 2× expected        INSERT INTO instead of
                                                        INSERT OVERWRITE on retry

SLA miss (late but OK)    Job completes after          Spot capacity shortage,
                          deadline, no failure          cluster bootstrap slow

Cascading delay           10+ downstream jobs late     Single upstream job blocked
                                                        on data that never arrived

Skew / hot partition      1 reducer task runs 50×      Non-uniform join key
                          longer than others            (e.g., NULL fips = millions)

Small file explosion      Athena queries get slower    Daily incremental appends
                          over months                   never compacted

OOM / GC pressure         Tez task killed with         Insufficient container
                          java.lang.OutOfMemoryError   memory, large shuffle

Metastore corruption      Hive: Table not found        Concurrent DDL, Derby DB
                          despite S3 data existing     corruption (not Glue)

Diagnostic decision tree:

Job failed?
├── Yes → Check stderr log first
│         ├── OOM / GC?     → Increase hive.tez.container.size (tasks) or tez.am.resource.memory.mb (AM), add nodes
│         ├── FileNotFound? → Check S3 path, Lab 00 data copy, IAM policy
│         ├── NullPointer / ClassCast? → Schema mismatch, check DESCRIBE FORMATTED
│         └── Timeout?      → Query too large for cluster, add TABLESAMPLE to debug
│
└── Job succeeded but data is wrong?
          ├── 0 rows?        → Check WHERE clause, join key nulls, partition filter
          ├── 2× rows?       → INSERT INTO on retry → switch to INSERT OVERWRITE
          ├── NULLs in key column? → Upstream schema drift, check source schema
          └── Stale data?    → Check pipeline_runs table, check Oozie coordinator status
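The "Job failed?" branch is mechanical enough to script as a first-pass triage before anyone reads the full log. Below is a minimal Python sketch that assumes the step's stderr has already been fetched as a string. `triage_stderr` is a hypothetical name, and the regex patterns and hint text are illustrative assumptions, not an exhaustive list of Hive or Tez error messages.

```python
import re

# Ordered (pattern, hint) pairs mirroring the "Job failed?" branch above.
# The patterns are assumptions about typical log text, not a complete catalogue.
TRIAGE_RULES = [
    (re.compile(r"OutOfMemoryError"),
     "OOM / GC: raise Tez container memory or add nodes"),
    (re.compile(r"FileNotFoundException"),
     "FileNotFound: check S3 path, Lab 00 data copy, IAM policy"),
    (re.compile(r"NullPointerException|ClassCastException"),
     "Schema mismatch: compare DESCRIBE FORMATTED with the source"),
    (re.compile(r"timed out|TimeoutException", re.IGNORECASE),
     "Timeout: query too large for cluster, debug with TABLESAMPLE"),
]


def triage_stderr(log_text: str) -> str:
    """Return the hint for the first rule whose pattern appears in the log."""
    for pattern, hint in TRIAGE_RULES:
        if pattern.search(log_text):
            return hint
    return "Unclassified: read the full stderr and the YARN container logs"


print(triage_stderr("Caused by: java.lang.OutOfMemoryError: Java heap space"))
```

Rules are checked in order, so a log containing both an OutOfMemoryError and a later timeout is reported as OOM, which is the more likely root cause.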

Runbook Template

Copy this template for each lab/pipeline. Store it in a runbooks/ directory or as a comment in the Oozie workflow.xml / Airflow DAG file.

## Runbook: lab03-ctas (covid_enriched CTAS)

### What it does
Reads nytimes_counties + hospital_beds from the raw S3 zone, joins on FIPS,
computes cases_per_staffed_bed and deaths_per_icu_bed, writes Parquet to the
intermediate zone, and registers covid_enriched in the Glue catalog.

### Schedule
Manual (triggered by Lab 03 run_lab.sh) or daily via Oozie coordinator at 06:00 UTC.

### Upstream dependencies
- lab00: S3 raw data must exist
- lab01: nytimes_counties and hospital_beds must be registered in Glue

### Downstream consumers
- lab05-athena: reads covid_enriched for all SQL queries
- lab06-oozie: re-runs this as part of the daily workflow
- Dashboards: any BI tool connected to Athena / Glue

### Expected output
- Table: emr_labs_db.covid_enriched
- Location: s3://{intermediate-bucket}/covid_enriched/
- Format: Parquet (Snappy)
- Row count: 1,500,000 – 1,800,000
- Columns: 10 (date_str, state, county, fips, cases, deaths,
           num_staffed_beds, num_icu_beds, cases_per_staffed_bed, deaths_per_icu_bed)

### How to re-run safely
```bash
bash labs/lab-03-transformations/scripts/run_lab.sh
```

The CTAS uses DROP TABLE IF EXISTS followed by CREATE TABLE AS SELECT, so it is fully idempotent: safe to re-run at any time, and previous data is overwritten.

### Known failure modes

| Error | Cause | Fix |
|-------|-------|-----|
| FAILED: SemanticException Table not found nytimes_counties | Lab 01 not run | Run lab01 first |
| 0 rows written | FIPS column is NULL in source | Check raw CSV headers |
| Spot capacity unavailable | No Spot capacity in the chosen availability zone | Re-run (script retries in 10 min) |
| Access Denied on s3:// | IAM policy not applied | Re-run terraform apply |

### Rollback procedure

If corrupt data was written:

# Drop the bad table (Glue entry removed, S3 data preserved because EXTERNAL)
hive -e "DROP TABLE IF EXISTS emr_labs_db.covid_enriched;"

# Remove the bad Parquet files from S3
aws s3 rm s3://{intermediate-bucket}/covid_enriched/ --recursive

# Re-run from scratch
bash labs/lab-03-transformations/scripts/run_lab.sh

### On-call contact

Data Engineering team — #data-oncall Slack channel
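
The Expected output section of the runbook above can double as a machine-checkable contract that runs after every execution. Below is a minimal Python sketch that assumes the row count and column names have already been fetched, for example with an Athena `SELECT COUNT(*)` and `DESCRIBE` against emr_labs_db.covid_enriched. `EXPECTED_COLUMNS`, `ROW_RANGE`, and `check_expected_output` are hypothetical names.

```python
# Contract copied from the runbook's "Expected output" section.
EXPECTED_COLUMNS = [
    "date_str", "state", "county", "fips", "cases", "deaths",
    "num_staffed_beds", "num_icu_beds",
    "cases_per_staffed_bed", "deaths_per_icu_bed",
]
ROW_RANGE = (1_500_000, 1_800_000)


def check_expected_output(row_count: int, columns: list[str]) -> list[str]:
    """Return human-readable contract violations; an empty list means pass."""
    problems = []
    low, high = ROW_RANGE
    if not low <= row_count <= high:
        problems.append(f"row count {row_count:,} outside {low:,}-{high:,}")
    # Column order is ignored; only presence and absence are checked.
    missing = [c for c in EXPECTED_COLUMNS if c not in columns]
    extra = [c for c in columns if c not in EXPECTED_COLUMNS]
    if missing:
        problems.append(f"missing columns: {missing}")
    if extra:
        problems.append(f"unexpected columns: {extra}")
    return problems


print(check_expected_output(0, EXPECTED_COLUMNS[:-1]))
```

Returning a list of problems instead of raising on the first one lets a single alert report every broken expectation at once.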


Project Structure

emr-labs/
├── terraform/
│   ├── modules/s3/                        S3 buckets (5), lifecycle, public-access block
│   ├── modules/iam/                       EMR service role + EC2 instance profile + Glue policy
│   └── envs/dev/                          Root module: wires S3 + IAM + Glue DB
├── labs/
│   ├── lab-00-foundation/                 terraform apply + aws s3 sync
│   ├── lab-01-hive-basics/                CREATE EXTERNAL TABLE → Glue
│   ├── lab-02-glue-catalog/               DESCRIBE FORMATTED, catalog inspection
│   ├── lab-03-transformations/            CTAS JOIN → Parquet intermediate
│   ├── lab-04-drop-tables/                External (data safe) vs managed (data gone)
│   ├── lab-05-athena/                     Serverless SQL, views, LAG() window functions
│   └── lab-06-oozie/                      Oozie workflow.xml + coordinator.xml + submit
└── shared/
    ├── configs/hive-glue-config.json      Hive→Glue metastore wiring
    ├── configs/emr-instance-fleet.json    Spot fleet (cost-optimised)
    └── scripts/
        ├── get_tf_outputs.sh              Terraform → env vars
        └── wait_for_step.sh               EMR step poller