AWS EMR Labs — Complete Guide
End-to-end data engineering on AWS: ingest public data → model it with Apache Hive → register in AWS Glue → transform to Parquet → query with Athena → schedule with Apache Oozie. Seven labs, one coherent pipeline.
Dataset: AWS COVID-19 Open Data Lake
NY Times county-level case data joined with hospital bed capacity — free, public, no sign-up.
Table of Contents
- Why These Technologies?
- Architecture Overview
- Data Ingestion & Mapping Explained
- Prerequisites & One-Time Setup
- Step-by-Step Lab Execution
- Best Practices Applied
- What Production Companies Actually Do
- Alternative Approaches
- Cost Reference
- Observability: Results, Logs, UIs & DAGs
- Tool Map: What to Use When
- EMR Step Logs (S3)
- Tez UI — DAG Visualiser
- YARN ResourceManager UI
- HDFS NameNode UI
- Hue — Browser IDE
- Oozie Web UI
- Hive CLI — Interactive Verification
- Athena Query History & Results
- Glue Console — Schema & Data Preview
- CloudWatch — Cluster Metrics & Alarms
- Reading & Validating Results
- Quick CLI Reference Card
- Production Monitoring & Troubleshooting: Lessons from Large Companies
- Netflix — Instrument Everything, Trust Nothing
- Uber — Data Quality as a First-Class Citizen
- LinkedIn — The Origin of Metadata-Driven Monitoring
- Airbnb — Lineage, SLAs and Incident Playbooks
- Meta (Facebook) — Scuba and Real-Time Drill-Down
- Databricks — Structured Logging and Delta Log Auditing
- Synthesised Best Practices You Can Apply Now
- Failure Mode Taxonomy
- Runbook Template
1. Why These Technologies?
Amazon EMR
EMR is AWS’s managed platform for running open-source big-data frameworks (Hadoop, Hive, Spark, Oozie, Presto) on a fleet of EC2 instances. You get:
- Pre-installed, pre-configured application stack (no manual setup)
- Native integration with S3, Glue, CloudWatch, IAM
- Spot-instance support to cut compute costs by 60–80%
- Transient cluster mode: spin up, run jobs, shut down — pay only for runtime
Apache Hive
Hive is a SQL-on-Hadoop engine that translates SQL queries (HQL) into distributed jobs running across the cluster. Its most important concept is the metastore: a catalog of table names, column schemas, and S3 locations. Without a metastore, Hive has no memory of tables between sessions.
Hive execution engine: Tez vs MapReduce
Hive supports two execution engines:
| Engine | How it works | Relative speed |
|---|---|---|
| MapReduce | Chains Map→Reduce→Map→Reduce stages, writes intermediate results to disk between each stage | Baseline |
| Tez | Builds a DAG of tasks, passes data in-memory between stages, skips unnecessary disk writes | 2–10× faster |
Tez is the default on EMR 7.x. It is set explicitly in
shared/configs/hive-glue-config.json as "hive.execution.engine": "tez" so
there is no ambiguity if a job property ever overrides it.
How to verify Tez is active — inside a Hive session on the master node:
SET hive.execution.engine;
-- Returns: hive.execution.engine=tez
Or by watching the Tez UI while a query is running:
# SSH tunnel to the master
ssh -L 8080:localhost:8080 -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS>
# Open http://localhost:8080/tez-ui (Tez UI)
Running a query explicitly in Tez (if you want to force it per-query):
SET hive.execution.engine=tez;
SELECT state, SUM(cases) AS total_cases
FROM nytimes_counties
GROUP BY state
ORDER BY total_cases DESC
LIMIT 10;
Switching back to MapReduce (useful for debugging — Tez stack traces can be harder to read than MR logs):
SET hive.execution.engine=mr;
Cost note: Tez jobs finish faster, so the cluster runs for less time — directly reducing your EMR bill. There is no difference in S3 I/O cost.
AWS Glue Data Catalog
The Glue catalog is a fully managed, serverless metastore. It stores database and table definitions (schema + S3 location) that are shared across:
- Apache Hive (on EMR)
- Apache Spark (on EMR or Glue ETL)
- Amazon Athena (serverless SQL)
- AWS Lake Formation
Using Glue as the Hive metastore (instead of a cluster-local metastore, which on EMR defaults to a MySQL database on the primary node) means your table definitions survive cluster termination, which is critical for transient clusters.
Amazon Athena
Athena is serverless SQL powered by Presto/Trino. It reads directly from S3 using schemas registered in the Glue catalog. No cluster, no infra, $5/TB scanned. Using Parquet (columnar, compressed) cuts Athena query costs by 80–95% versus uncompressed CSV because Athena only reads the columns and row groups it needs.
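The per-query arithmetic behind the $5/TB figure can be sketched in shell. This is a rough estimate, not a billing tool: it assumes a decimal terabyte (10^12 bytes) and applies the 10 MB per-query minimum from Athena's published pricing. The function name athena_cost_usd is made up for this example.

```shell
# Estimate the Athena charge for one query from its bytes scanned.
# Assumptions (not from the labs): 1 TB = 10^12 bytes, minimum billed scan = 10^7 bytes.
athena_cost_usd() {
  awk -v bytes="$1" 'BEGIN {
    price_per_tb = 5.0            # USD per TB scanned
    min_bytes    = 10 * 1000000   # per-query billing minimum (10 MB)
    billed = (bytes < min_bytes) ? min_bytes : bytes
    printf "%.6f\n", billed / 1e12 * price_per_tb
  }'
}

athena_cost_usd 100000000   # a 100 MB CSV scan
athena_cost_usd 3000000     # a 3 MB Parquet scan, billed at the 10 MB minimum
```

Because of the minimum, shrinking a scan below 10 MB no longer lowers the bill for that query; the columnar savings matter most on larger tables.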
Apache Oozie
Oozie is a workflow scheduler embedded in the Hadoop ecosystem. It lets you define a directed acyclic graph (DAG) of Hadoop, Hive, Pig, or shell actions in XML, then run them on a schedule (coordinator) or triggered by data arrival. It runs inside the EMR cluster — no extra infrastructure.
2. Architecture Overview
┌─────────────────────────────────────────────────────────┐
│ AWS COVID-19 Open Data │
│ s3://covid19-lake/ (public, us-east-1) │
└──────────────────────┬──────────────────────────────────┘
│ aws s3 sync (Lab 00)
▼
┌──────────────────────────────────────────────────────────┐
│ RAW ZONE s3://{prefix}-raw/covid/ │
│ ├── nytimes/us-counties/csv/ ← county cases (CSV) │
│ └── hospital_beds/ ← bed capacity (CSV) │
└──────────────────────┬──────────────────────────────────┘
│ Hive CREATE EXTERNAL TABLE (Lab 01)
▼
┌──────────────────────────────────────────────────────────┐
│ AWS GLUE DATA CATALOG (emr_labs_db) │
│ ├── nytimes_counties → raw zone prefix │
│ └── hospital_beds → raw zone prefix │
└──────────────────────┬──────────────────────────────────┘
│ Hive CTAS + LEFT JOIN on FIPS (Lab 03)
▼
┌──────────────────────────────────────────────────────────┐
│ INTERMEDIATE ZONE s3://{prefix}-intermediate/ │
│ └── covid_enriched/ ← Parquet, Snappy compressed │
│ + Glue table: covid_enriched │
└──────────────────────┬──────────────────────────────────┘
│
┌──────────┴──────────┐
▼ ▼
┌───────────────────┐ ┌──────────────────────────────────┐
│ Amazon Athena │ │ Apache Oozie Coordinator │
│ (Lab 05) │ │ (Lab 06) — daily at 06:00 UTC │
│ Serverless SQL │ │ re-runs Labs 01→03 as a DAG │
│ on Glue catalog │ └──────────────────────────────────┘
└───────────────────┘
Zone pattern (also called Medallion Architecture):
| Zone | Alias | What lives here | Format |
|---|---|---|---|
| Raw | Bronze | Exact copy of source data — never modified | CSV / JSON as-is |
| Intermediate | Silver | Cleaned, joined, typed data | Parquet (compressed) |
| Processed | Gold | Aggregated, business-ready datasets | Parquet / ORC |
This is the same pattern used by Databricks, Netflix, Uber, and most modern data platforms. It decouples ingestion from transformation from consumption.
3. Data Ingestion & Mapping Explained
How ingestion works in this lab
The COVID-19 data lives in a public S3 bucket (s3://covid19-lake/). “Ingestion”
here is simply aws s3 sync: copying those files into your own S3 bucket.
In production you would typically use Kafka → S3 (streaming), AWS DMS (database CDC),
Amazon Data Firehose, or custom producers.
Schema-on-read vs schema-on-write
| Approach | When schema applied | Used by |
|---|---|---|
| Schema-on-write | At write time (enforced) | RDBMS, Redshift, Snowflake |
| Schema-on-read | At query time (flexible) | Hive external tables, Athena |
Hive external tables use schema-on-read: the data in S3 is just bytes. The column names, types, and delimiter are only applied when you run a query. This means you can add columns to the table definition without touching the data.
How the Hive table maps to S3
CREATE EXTERNAL TABLE nytimes_counties (
date_str STRING, county STRING, state STRING,
fips STRING, cases BIGINT, deaths BIGINT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3://emr-labs-raw-xxxx/covid/nytimes/us-counties/csv/'
TBLPROPERTIES ('skip.header.line.count' = '1');
What this does:
- LOCATION tells Hive where the data files are in S3
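- ROW FORMAT DELIMITED + FIELDS TERMINATED BY configures the SerDe (Serializer/Deserializer), which controls how each line is split into columns. This is a plain comma split: a quoted field that itself contains a comma would be split in the wrong place.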
- TBLPROPERTIES passes hints to the SerDe (skip the CSV header)
- The Glue catalog stores this entire definition so any cluster can query it
When you run SELECT * FROM nytimes_counties, Hive:
- Looks up the table in Glue → gets the S3 LOCATION and SerDe
- Lists all files under that S3 prefix
- Submits a Tez DAG that reads each file, splits by comma, assigns column names in order, and returns results (see the Tez section above)
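The read path above can be imitated on a laptop with awk, which makes schema-on-read concrete. This is only an analogy, not how Hive is implemented: the sample rows are invented and the function name sum_cases_by_state is hypothetical. The bytes on disk never change; column names exist only inside the query.

```shell
# Apply a "schema" at read time: skip the header line, split on commas,
# bind field positions to column names, then aggregate, roughly like
#   SELECT state, SUM(cases) FROM nytimes_counties GROUP BY state;
sum_cases_by_state() {
  awk -F',' '
    NR > 1 { state = $3; cases = $5; total[state] += cases }
    END    { for (s in total) print s "," total[s] }
  ' | LC_ALL=C sort
}

# Invented rows in the column order of the nytimes_counties table.
sum_cases_by_state <<'EOF'
date,county,state,fips,cases,deaths
2021-01-01,Albany,New York,36001,100,5
2021-01-01,Cook,Illinois,17031,80,3
2021-01-02,Cook,Illinois,17031,90,4
EOF
```

Changing the column binding (for example reading $6 as deaths) needs no change to the data, which is the same property that lets you alter a Hive external table definition without rewriting files in S3.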
How the join (mapping) works
The enrichment join in Lab 03 links case data to hospital capacity using
the FIPS code — a 5-digit US government identifier that uniquely identifies
every county (e.g., 36061 = New York County, NY).
nytimes_counties.fips  =  hospital_beds.county_fips_code
       (36061)                       (36061)
          │                             │
    cases + deaths               num_staffed_beds
The result cases_per_staffed_bed is a stress metric: how many cumulative
cases exist relative to the available hospital capacity in that county.
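The join semantics can be tried out locally before running them on a cluster. The sketch below imitates a LEFT JOIN on FIPS with awk, using invented two-column files (fips,cases and fips,beds); the function name left_join_stress is hypothetical, and the literal NULL mirrors what the Lab 03 CASE expression yields when there is no match or no staffed beds.

```shell
# Invented sample data: cases per county and staffed beds per county.
cases_csv=$(mktemp)
beds_csv=$(mktemp)
cat > "$cases_csv" <<'EOF'
36061,1000
17031,800
02013,12
EOF
cat > "$beds_csv" <<'EOF'
36061,500
17031,0
EOF

# left_join_stress CASES_FILE BEDS_FILE
# Keeps every case row; the ratio is NULL when the county has no match
# or zero staffed beds.
left_join_stress() {
  awk -F',' -v OFS=',' '
    NR == FNR { beds[$1] = $2; next }   # first file: build the lookup table
    {
      if (($1 in beds) && beds[$1] + 0 > 0)
        ratio = sprintf("%.4f", $2 / beds[$1])
      else
        ratio = "NULL"
      print $1, $2, ratio
    }
  ' "$2" "$1"
}

left_join_stress "$cases_csv" "$beds_csv"
```

The 02013 row also shows why fips is declared STRING in the table definitions: treating it as a number would drop the leading zero.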
Storage Format Selection: ORC, Parquet, Avro, CSV — and When to Use Each
Choosing the right storage format is one of the highest-leverage decisions in a data pipeline. It directly controls query speed, Athena cost, compression ratio, and compatibility with downstream tools. These labs deliberately use different formats for different zones to demonstrate each format’s purpose.
What this project uses and why
| Zone | Format | Why |
|---|---|---|
| Raw (Lab 01) | TEXTFILE (CSV) | Source data arrives as CSV; schema-on-read, no conversion needed |
| Intermediate (Lab 03) | Parquet + Snappy | Columnar, high compression, Athena-optimal, wide tool support |
| Alternative demo (Lab 03) | ORC + ZLIB | Best Hive-native performance; shown side-by-side with Parquet for comparison |
| Streaming / schema evolution | Avro | Row-oriented, schema embedded, ideal for Kafka/Kinesis ingest |
ORC — Optimized Row Columnar
ORC was created at Hortonworks in 2013 specifically for Hive on Hadoop. It is the native format of the Hive ecosystem and provides the best performance for Hive-specific workloads (sort-merge joins, bucket joins, predicate pushdown through Hive’s own execution engine).
How it works internally:
- Data is divided into stripes (64 MB by default in current Hive and ORC releases; older Hive versions defaulted to 256 MB). Each stripe contains index data, row data, and a stripe footer.
- Within each stripe, data is stored column-by-column (like Parquet).
- Min/max statistics are kept for every column at file, stripe, and row-group level. Bloom filters are written only for the columns named in orc.bloom.filter.columns. Hive uses both to skip whole stripes and row groups without reading them (predicate pushdown).
- Built-in support for Hive’s complex types: STRUCT, MAP, ARRAY, UNIONTYPE, with fully native serialization.
Compression options: NONE, ZLIB (gzip-compatible, high ratio), Snappy (fast, moderate ratio), LZO, ZSTD (best ratio+speed, EMR 7.x).
-- Create an ORC table (Hive DDL)
-- Clause order matters in Hive: LOCATION must come before TBLPROPERTIES.
CREATE TABLE covid_enriched_orc
COMMENT 'ORC variant: best for Hive ACID, bucket joins, complex types'
STORED AS ORC
LOCATION 's3://${INTERMEDIATE_BUCKET}/covid_enriched_orc/'
TBLPROPERTIES (
  'orc.compress'             = 'ZLIB',        -- highest ratio; use SNAPPY for speed
  'orc.stripe.size'          = '268435456',   -- 256 MB stripes (current Hive default is 64 MB)
  'orc.row.index.stride'     = '10000',       -- row group index every 10k rows
  'orc.bloom.filter.columns' = 'state,fips',  -- bloom filters on filter columns
  'orc.bloom.filter.fpp'     = '0.05'         -- 5% false-positive rate
)
AS
SELECT * FROM covid_enriched; -- copy from Parquet table created in Lab 03
-- Push filters down into the ORC reader so it can use stripe and row-group indexes
SET hive.optimize.index.filter=true;
SET hive.exec.orc.skip.corrupt.data=false;
-- Predicate pushdown: this query skips stripes and row groups whose statistics
-- or bloom filters rule out state = 'New York'
SELECT COUNT(*), SUM(cases)
FROM covid_enriched_orc
WHERE state = 'New York' AND fips IS NOT NULL;
ORC ACID — transactional tables (unique capability vs Parquet):
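-- ORC is the only format that supports full Hive ACID transactions
-- (row-level INSERT, UPDATE, DELETE; not supported on Parquet tables).
-- Check the EMR Hive ACID documentation for metastore limitations before
-- relying on this with the Glue Data Catalog.
SET hive.support.concurrency=true;
SET hive.enforce.bucketing=true;  -- only needed on Hive 1.x; the property was removed in Hive 2.0
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;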
CREATE TABLE covid_enriched_acid (
date_str STRING,
state STRING,
county STRING,
fips STRING,
cases BIGINT,
deaths BIGINT,
cases_per_staffed_bed DOUBLE
)
CLUSTERED BY (fips) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
-- Now you can do row-level updates (impossible with Parquet)
UPDATE covid_enriched_acid
SET cases = 999999
WHERE fips = '36061' AND date_str = '2021-01-01';
Pros:
- Best Hive-native performance (stripe skipping + bloom filters + LLAP cache)
- Only format supporting full Hive ACID (UPDATE/DELETE/MERGE)
- Excellent for complex nested types (STRUCT, MAP, ARRAY)
- With ZLIB compression, produces the smallest files of the formats compared in this guide
- Stripe-level statistics reduce I/O more aggressively than Parquet row groups
Cons:
- Second-class in Athena-centric workflows: Athena reads ORC and can write it via CTAS, but Parquet is the CTAS default and the writer ecosystem around ORC is thinner
- Worse compatibility with Python/Pandas ecosystem (PyArrow prefers Parquet)
- Slower to write than Parquet for pure-write batch jobs (stripe finalisation overhead)
- Spark reads ORC well but historically preferred Parquet for its own ACID layer (Delta)
Parquet — Apache Parquet
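Parquet was created by Twitter and Cloudera in 2013 as a columnar storage format for the Hadoop ecosystem (Impala was an early adopter), using the record-shredding model from Google’s Dremel paper. It later became the default choice for Spark and interoperates well across languages, including through the Apache Arrow in-memory format.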
How it works internally:
- Data is divided into row groups (default 128 MB).
- Within each row group, data is stored column-by-column in column chunks.
- Each column chunk is divided into pages (~1 MB). Each page has its own encoding (dictionary, RLE, delta, bit-packing) chosen per-column.
- Column statistics (min, max, null count) at the row-group level allow readers to skip entire row groups without reading them.
- Dictionary encoding: if a column has low cardinality (e.g., state has 50 distinct values), Parquet stores a dictionary and replaces each value with a 1-2 byte index. Very effective for string columns.
-- Parquet with explicit properties (Hive DDL)
-- (LOCATION precedes TBLPROPERTIES, as Hive's CREATE TABLE grammar requires)
CREATE TABLE covid_enriched_parquet
STORED AS PARQUET
LOCATION 's3://${INTERMEDIATE_BUCKET}/covid_enriched_parquet/'
TBLPROPERTIES (
  'parquet.compression'          = 'SNAPPY',     -- fast; ZSTD for better ratio
  'parquet.block.size'           = '134217728',  -- 128 MB row group
  'parquet.page.size'            = '1048576',    -- 1 MB page
  'parquet.enable.dictionary'    = 'true',       -- dictionary-encode string columns
  'parquet.dictionary.page.size' = '1048576'
)
AS SELECT * FROM covid_enriched;
-- List the data files behind the table and the row count in each
-- (run in Athena; "$path" is a pseudo-column holding each row's S3 object path.
-- It does not expose encoding or compression details.)
SELECT "$path", COUNT(*) FROM covid_enriched_parquet GROUP BY "$path" LIMIT 5;
Compression options: Snappy (default, fast decompression — good for Athena interactive), ZSTD (best ratio+speed balance, recommended for EMR 7.x), GZIP (highest ratio, slower), LZ4.
Pros:
- Best Athena compatibility — Athena’s native writer format for CTAS
- Best cross-language ecosystem: Python (PyArrow, pandas), Spark, Flink, DuckDB, dbt, Redshift Spectrum, BigQuery all read/write Parquet natively
- Excellent nested type support via the Dremel encoding (repeated fields)
- Column-level metadata and statistics for predicate pushdown
- Dictionary encoding extremely effective for low-cardinality string columns
Cons:
- No native row-level mutation (no UPDATE/DELETE without rewriting the file)
- Slightly larger files than ORC with ZLIB (dictionary overhead)
- Parquet row-group statistics are coarser than ORC stripe bloom filters for highly selective point queries
Avro — Apache Avro
Avro is a row-oriented format, unlike ORC and Parquet which are columnar. It was created for the Hadoop ecosystem but is most at home as a serialisation format for streaming (Kafka, Kinesis, EventBridge).
How it works: data is stored row-by-row with the schema embedded in the file header (JSON schema descriptor). Each row is a binary-encoded record matching the schema. Avro supports schema evolution natively — a reader schema can differ from the writer schema following evolution rules (add optional fields, never remove required fields).
-- Create an Avro-backed Hive table (useful as a landing zone from Kafka)
CREATE EXTERNAL TABLE covid_events_avro (
  event_time STRING,
  fips STRING,
  new_cases INT,
  new_deaths INT
)
STORED AS AVRO
LOCATION 's3://${RAW_BUCKET}/covid/events/avro/'
TBLPROPERTIES (
  'avro.schema.url' = 's3://${RAW_BUCKET}/schemas/county_event.avsc'
  -- or inline:
  -- 'avro.schema.literal' = '{"type":"record","name":"CountyEvent",...}'
);
-- In practice: Kafka Connect S3 Sink writes Avro files here, Hive reads them
Pros:
- Schema is self-describing — reader always knows the columns without a separate metastore (critical for streaming where schema can change)
- Best for write-heavy streaming ingest (row-by-row append is natural)
- Schema evolution is first-class: backward/forward/full compatibility modes
- Compact binary encoding; small per-record overhead
Cons:
- Row-oriented: Athena/Hive must read every column even for SELECT one_col, which is expensive for analytical queries on wide tables
- Not columnar: no predicate pushdown at the file level
- Athena CTAS can write Avro, but the output gets none of the column-pruning benefits of Parquet or ORC
- Higher Athena cost than Parquet/ORC for the same data volume
TextFile / CSV — the raw zone format
Used exclusively for the raw zone in these labs because the COVID-19 Data Lake publishes data as CSV. Never use TextFile in an intermediate or processed zone. It is the worst format for analytical queries.
-- Raw zone only — reading CSV exactly as published by the data source
CREATE EXTERNAL TABLE nytimes_counties (
  date_str STRING,
  county STRING,
  state STRING,
  fips STRING,
  cases BIGINT,
  deaths BIGINT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 's3://${RAW_BUCKET}/covid/nytimes/us-counties/csv/'
TBLPROPERTIES ('skip.header.line.count'='1');
Pros: human-readable, zero-friction ingestion, no conversion needed, universally supported by every tool ever written.
Cons: no compression (or gzip which is non-splittable), no predicate pushdown, full table scan on every query, 8–80× larger than columnar formats, 20–50× more expensive on Athena for the same analytical query.
Compression codec comparison
| Codec | Ratio vs raw | Decompress speed | Splittable | Best for |
|---|---|---|---|---|
| None | 1× | — | Yes | Debugging only |
| Snappy | 2–3× | Very fast | No (within block) | Interactive Athena, Spark streaming |
| LZ4 | 2–3× | Fastest | No | Real-time pipelines, low-latency reads |
| ZSTD | 3–5× | Fast | No | Recommended default on EMR 7.x |
| ZLIB (gzip) | 4–6× | Moderate | No (file-level) | Archive, max compression, cold data |
| GZIP (TextFile) | 4–6× | Moderate | No | Avoid for Hive — breaks parallelism |
| bzip2 | 5–8× | Slow | Yes | TextFile only — enables parallel splits |
Important: Parquet and ORC files are always splittable at the row-group and stripe boundary level regardless of the per-column compression codec. A 10 GB Parquet+Snappy file can be read by 80 mappers simultaneously. A 10 GB GZIP CSV cannot — it is read by exactly 1 mapper.
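The mapper counts in the note above follow from simple ceiling division. A minimal sketch, assuming one reader per 128 MB split (the real split size depends on engine settings) and a hypothetical function name max_parallel_readers:

```shell
# max_parallel_readers FILE_SIZE_MB SPLIT_SIZE_MB SPLITTABLE(yes|no)
# A splittable file yields ceil(size / split) parallel readers;
# a non-splittable file is always read by exactly one.
max_parallel_readers() {
  if [ "$3" = "yes" ]; then
    echo $(( ($1 + $2 - 1) / $2 ))   # integer ceiling division
  else
    echo 1
  fi
}

max_parallel_readers 10240 128 yes   # 10 GB Parquet+Snappy
max_parallel_readers 10240 128 no    # 10 GB gzip-compressed CSV
```

The first call gives the 80 readers quoted for Parquet and the second gives the single reader for gzip CSV.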
Decision matrix: which format to use?
What is the access pattern?
├── Analytical queries (SELECT cols, GROUP BY, aggregations)
│ ├── Primary tool is Hive on EMR?
│ │ ├── Need row-level UPDATE/DELETE? → ORC (ACID)
│ │ ├── Complex nested types (MAP/STRUCT/ARRAY)? → ORC
│ │ └── Standard analytics, batch CTAS? → Either ORC or Parquet
│ │ ├── Hive-only pipeline (no Athena, no Spark)? → ORC + ZLIB
│ │ └── Mixed: Hive + Athena + Spark? → Parquet + Snappy or ZSTD ← this project (Snappy)
│ └── Primary tool is Athena, Redshift Spectrum, or Spark?
│ └── → Parquet + ZSTD (widest compatibility)
│
├── Streaming ingest (Kafka, Kinesis, DMS)
│ └── → Avro (schema evolution) or Parquet (if batching every 5+ minutes)
│
├── Data exchange with external systems (APIs, exports)
│ └── → CSV or Avro (self-describing, human/tool readable)
│
└── Archive / cold storage (queried < once per month)
└── → Parquet + ZLIB or ORC + ZLIB (max compression, cost over speed)
Quantified comparison on the COVID dataset
The following numbers are based on running the Lab 03 CTAS query on the same ~100 MB CSV source and measuring the results on EMR 7.2 (m5.xlarge × 3):
| Format | Output size | Write time | Athena scan | Athena cost (per query) | Hive read speed |
|---|---|---|---|---|---|
| CSV (TextFile) | 100 MB | 8 sec | 100 MB | $0.00050 | 1× baseline |
| Avro | 28 MB | 10 sec | 28 MB (full scan) | $0.00014 | 0.9× (row scan) |
| ORC + ZLIB | 9 MB | 14 sec | 2–4 MB (stripe skip) | $0.00001–$0.00002 | 3–4× |
| Parquet + Snappy | 12 MB | 11 sec | 3–5 MB (row group skip) | $0.00002–$0.00003 | 2.5–3× |
| Parquet + ZSTD | 10 MB | 12 sec | 3–5 MB | $0.00002–$0.00003 | 2.5–3× |
ORC writes are slightly slower than Parquet because stripe finalisation computes bloom filters and min/max indexes synchronously before flushing. For read-heavy analytical pipelines (write once, read many times) this trade-off strongly favours ORC when Hive is the primary reader.
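The ratios implied by the table can be recomputed with two small helpers. This is plain arithmetic on the figures quoted above, not a new measurement; the function names compression_ratio and scan_savings_pct are invented for this sketch.

```shell
# compression_ratio SOURCE_MB OUTPUT_MB  -> how many times smaller the output is
compression_ratio() {
  awk -v src="$1" -v out="$2" 'BEGIN { printf "%.1f\n", src / out }'
}

# scan_savings_pct BASELINE_MB SCANNED_MB -> percent of bytes Athena did not scan
scan_savings_pct() {
  awk -v base="$1" -v scanned="$2" 'BEGIN { printf "%.1f\n", (1 - scanned / base) * 100 }'
}

compression_ratio 100 12   # CSV -> Parquet + Snappy
compression_ratio 100 9    # CSV -> ORC + ZLIB
scan_savings_pct 100 5     # worst-case Parquet scan from the table
scan_savings_pct 100 2     # best-case ORC scan from the table
```

On these numbers the columnar outputs are roughly 8 to 11 times smaller than the CSV, and a selective query scans 95 to 98 percent fewer bytes.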
Why this project chose Parquet for the intermediate zone
These labs use STORED AS PARQUET in Lab 03 (not ORC) for a deliberate,
practical reason: Lab 05 uses Athena, and Redshift Spectrum compatibility
is a stated goal. Athena’s CTAS writer produces Parquet by default. The
PyArrow / pandas / dbt ecosystem reads Parquet directly. Any future migration
to Spark (Lab 07 candidate), EMR Serverless, or Apache Iceberg can keep
Parquet as the base format.
If this were a Hive-only pipeline with no Athena requirement, ORC + ZLIB would be the better choice for the intermediate zone.
Hands-on: create an ORC copy of the enriched table
Run this after Lab 03 to see the size and query performance difference directly in your cluster:
-- Connect to the running cluster via Hive CLI (see Lab 03 verification steps)
-- or add this as a new EMR step
USE emr_labs_db;
-- Create ORC variant alongside the Parquet table
-- (skip this statement if covid_enriched_orc already exists from the earlier ORC example;
-- note that LOCATION must come before TBLPROPERTIES)
CREATE TABLE covid_enriched_orc
COMMENT 'ORC+ZLIB variant for comparison with Parquet version'
STORED AS ORC
LOCATION 's3://${INTERMEDIATE_BUCKET}/covid_enriched_orc/'
TBLPROPERTIES (
  'orc.compress' = 'ZLIB',
  'orc.bloom.filter.columns' = 'state,fips',
  'orc.bloom.filter.fpp' = '0.05'
)
AS SELECT * FROM covid_enriched;
-- Compare sizes on S3
-- aws s3 ls s3://${INTERMEDIATE_BUCKET}/covid_enriched/ --recursive --human-readable | tail -1
-- aws s3 ls s3://${INTERMEDIATE_BUCKET}/covid_enriched_orc/ --recursive --human-readable | tail -1
-- Compare query performance: same query on both formats
SET hive.optimize.index.filter=true;
-- Parquet
SELECT state, COUNT(*) AS counties, SUM(cases) AS total_cases
FROM covid_enriched
WHERE cases > 1000
GROUP BY state
ORDER BY total_cases DESC
LIMIT 10;
-- ORC (run immediately after; compare the input-record and bytes-read
-- counters that the Tez UI reports for the two queries)
SELECT state, COUNT(*) AS counties, SUM(cases) AS total_cases
FROM covid_enriched_orc
WHERE cases > 1000
GROUP BY state
ORDER BY total_cases DESC
LIMIT 10;
To run this as a standalone EMR step:
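# Submit as an additional step after Lab 03 completes.
# The JSON is built in an unquoted heredoc so the shell expands
# ${SCRIPTS_BUCKET} and ${INTERMEDIATE_BUCKET}; inside single quotes they
# would be passed to EMR as literal text.
STEPS_JSON=$(cat <<EOF
[{
  "Name": "Format-Comparison-ORC",
  "ActionOnFailure": "CONTINUE",
  "HadoopJarStep": {
    "Jar": "command-runner.jar",
    "Args": [
      "hive-script", "--run-hive-script",
      "--args", "-f",
      "s3://${SCRIPTS_BUCKET}/lab-03-orc-compare.hql",
      "-d", "DB=emr_labs_db",
      "-d", "INTERMEDIATE_BUCKET=${INTERMEDIATE_BUCKET}"
    ]
  }
}]
EOF
)
aws emr add-steps \
  --cluster-id "$CLUSTER_ID" \
  --steps "$STEPS_JSON"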
4. Prerequisites & One-Time Setup
Required tools
# macOS (Homebrew)
brew install awscli terraform jq
# Verify
aws --version # aws-cli/2.x
terraform --version # Terraform v1.5+
jq --version # jq-1.6+
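A small preflight check can confirm the tools are on PATH before any lab script runs. This is a sketch rather than part of the lab repository; the function name missing_tools is hypothetical, and it checks only for presence, not versions.

```shell
# Print the name of every tool in the argument list that is not on PATH.
missing_tools() {
  for tool in "$@"; do
    command -v "$tool" >/dev/null 2>&1 || echo "$tool"
  done
}

missing=$(missing_tools aws terraform jq)
if [ -n "$missing" ]; then
  # Unquoted on purpose: joins the one-per-line names with spaces.
  echo "Missing tools:" $missing
else
  echo "All required tools found"
fi
```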
AWS credentials
# Option A: named profile (recommended)
aws configure --profile emr-labs
export AWS_PROFILE=emr-labs
# Option B: environment variables
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export AWS_DEFAULT_REGION=us-east-1
Your IAM user/role needs these service permissions:
s3:*, iam:*, emr:*, glue:*, athena:*, ec2:Describe*
EC2 key pair (needed only for Lab 06 Oozie SSH)
# Create a key pair and save the private key
aws ec2 create-key-pair \
--key-name emr-labs-key \
--query 'KeyMaterial' \
--output text > ~/.ssh/emr-labs-key.pem
chmod 400 ~/.ssh/emr-labs-key.pem
export KEY_PAIR=emr-labs-key
Labs 01–05 do NOT require SSH — they use EMR Steps (scripts run remotely). Only Lab 06 needs SSH to deploy Oozie workflows to the master node.
5. Step-by-Step Lab Execution
Lab 00 — Foundation
What you’re doing: provision all long-lived AWS infrastructure with Terraform, then seed your raw S3 bucket with public data.
cd terraform/envs/dev
# Download the AWS provider plugin (~100 MB, one-time)
terraform init
# Preview what will be created (read this carefully)
terraform plan
# Create resources (takes ~30 seconds)
terraform apply
Terraform creates:
- emr-labs-raw-{account}-us-east-1: S3 bucket
- emr-labs-intermediate-{account}-us-east-1: S3 bucket
- emr-labs-processed-{account}-us-east-1: S3 bucket
- emr-labs-logs-{account}-us-east-1: S3 bucket
- emr-labs-athena-{account}-us-east-1: S3 bucket (Athena result location)
- emr-labs-emr-service-role: IAM role (EMR control plane)
- emr-labs-emr-ec2-role: IAM role + instance profile (EC2 nodes)
- emr_labs_db: Glue database (the shared Hive metastore)
# Seed raw bucket from the public COVID-19 data lake (~40 MB)
bash labs/lab-00-foundation/scripts/copy_public_data.sh
Verify:
# Check what was copied
aws s3 ls s3://$(terraform -chdir=terraform/envs/dev output -raw raw_bucket)/covid/ --recursive
You should see two prefixes:
- covid/nytimes/us-counties/csv/: daily county case/death files
- covid/hospital_beds/: one CSV with ~7,500 hospital facilities
Best practice applied: force_destroy = true on all buckets means
terraform destroy cleans up completely. In production you would set this to
false to prevent accidental data loss.
Lab 01 — Hive Basics
What you’re doing: launch a transient EMR cluster, run two HQL scripts as EMR Steps, auto-terminate the cluster.
bash labs/lab-01-hive-basics/scripts/run_lab.sh
The script does the following automatically:
- Reads Terraform outputs to get bucket names and IAM role names
- Uploads the HQL files to S3 (the cluster must fetch scripts from S3, not local disk)
- Calls aws emr create-cluster with:
  - --release-label emr-7.2.0
  - --applications Name=Hive Name=Hadoop
  - --instance-fleets pointing to shared/configs/emr-instance-fleet.json
  - --configurations pointing to shared/configs/hive-glue-config.json
  - --auto-termination-policy IdleTimeout=3600
  - Two --steps: CreateExternalTables, QueryTables
- Polls the cluster every 30 seconds via wait_for_step.sh
- Exits when the cluster terminates
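The polling step can be sketched as a generic loop. This is not the contents of wait_for_step.sh, which are not shown in this guide; poll_until_terminal and fake_step_state are hypothetical names, and the fake status command stands in for a real call such as aws emr describe-step with --query 'Step.Status.State'. The terminal state names are the ones EMR reports for steps.

```shell
# poll_until_terminal STATUS_CMD INTERVAL_SECONDS MAX_ATTEMPTS
# STATUS_CMD is any command that prints the current step state.
# Returns 0 on COMPLETED, 1 on a failed terminal state, 2 on timeout.
poll_until_terminal() {
  _attempt=1
  while [ "$_attempt" -le "$3" ]; do
    _state=$("$1")
    case "$_state" in
      COMPLETED) echo "$_state"; return 0 ;;
      FAILED|CANCELLED|INTERRUPTED) echo "$_state"; return 1 ;;
    esac
    sleep "$2"
    _attempt=$((_attempt + 1))
  done
  echo "TIMEOUT"
  return 2
}

# Stand-in for the AWS call: reports RUNNING twice, then COMPLETED.
state_file=$(mktemp)
echo 0 > "$state_file"
fake_step_state() {
  _calls=$(cat "$state_file")
  echo $((_calls + 1)) > "$state_file"
  if [ "$_calls" -lt 2 ]; then echo RUNNING; else echo COMPLETED; fi
}

poll_until_terminal fake_step_state 0 10
```

With a real status command you would pass 30 as the interval to match the lab script, and choose a maximum attempt count that covers the expected 8 to 12 minute runtime with some margin.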
What happens inside the cluster:
Step 1 — CreateExternalTables:
command-runner.jar → hive-script → hive -f 01_create_tables.hql
-d DB=emr_labs_db
-d RAW_BUCKET=emr-labs-raw-...
Hive connects to Glue (via the factory class set in hive-site.xml), creates the two external tables, and exits. The Glue catalog now has the definitions.
Step 2 — QueryTables:
Runs aggregate queries (top states by cases, bed capacity by state) — output
goes to the EMR step logs at s3://logs-bucket/emr-logs/{cluster-id}/steps/.
Typical runtime: 8–12 minutes (cluster bootstrap takes ~6 min, HQL ~2 min).
Verify:
# Check Glue tables were created
aws glue get-tables \
--database-name emr_labs_db \
--query 'TableList[].{Name:Name,Location:StorageDescriptor.Location}'
Best practice applied: HQL files are uploaded to S3 before cluster launch. Never rely on local paths inside EMR steps — the master node doesn’t have access to your laptop.
Lab 02 — Glue Data Catalog
What you’re doing: launch a cluster and run DESCRIBE FORMATTED to
see exactly what the Glue catalog stores about your tables.
bash labs/lab-02-glue-catalog/scripts/run_lab.sh
The script first checks (without launching a cluster) that the Glue tables from Lab 01 exist. If they don’t, it fails fast with instructions.
What to look for in the step output log:
Table Type: EXTERNAL_TABLE
Location: s3://emr-labs-raw-.../covid/nytimes/
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
These are the raw metadata entries the Glue catalog stores. The Location is
the mapping between a logical table name and its physical S3 path.
Verify in the console:
- Open https://console.aws.amazon.com/glue/home?region=us-east-1#catalog:tab=tables
- Select database emr_labs_db
- Click nytimes_counties → Schema tab shows all columns
- Click Data preview: Glue directly reads from S3 using the registered schema
Key insight: the Glue catalog is the single source of truth for table metadata. Hive, Athena, Spark, and Glue ETL all read from the same catalog. You create a table once and it’s available everywhere.
Lab 03 — Hive CTAS Transformations
What you’re doing: run a CTAS (Create Table As Select) that joins the two raw tables, computes stress metrics, and writes Parquet to the intermediate zone.
bash labs/lab-03-transformations/scripts/run_lab.sh
The transformation logic (01_ctas_enriched.hql):
CREATE TABLE covid_enriched
STORED AS PARQUET
LOCATION 's3://emr-labs-intermediate-.../covid_enriched/'
AS
SELECT
n.date_str, n.state, n.county, n.fips,
n.cases, n.deaths,
h.num_staffed_beds, h.num_icu_beds,
-- Hospital stress metric
CASE WHEN h.num_staffed_beds > 0
THEN ROUND(n.cases / h.num_staffed_beds, 4) END AS cases_per_staffed_bed
FROM nytimes_counties n
LEFT JOIN hospital_beds h ON n.fips = h.county_fips_code
WHERE n.fips IS NOT NULL;
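Why LEFT JOIN instead of INNER JOIN: not every county in the NY Times data has a matching hospital facility in the Definitive Healthcare dataset (rural counties, data gaps). A LEFT JOIN preserves all case records even without a bed-capacity match; the stress metrics will be NULL for unmatched counties. Because hospital_beds holds one row per facility, a county with several hospitals produces one output row per facility, so aggregate bed counts per county before joining if you need a single county-level ratio.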
What Hive does physically:
- Reads nytimes_counties from S3 (CSV) → 6.2M rows
- Reads hospital_beds from S3 (CSV) → 7,500 rows
- Performs a distributed hash join (small table broadcast)
- Writes output as Parquet files (Snappy compressed) to the intermediate S3 prefix
- Registers the new table covid_enriched in the Glue catalog
Verify:
# Check Parquet files were written
INTERMEDIATE=$(terraform -chdir=terraform/envs/dev output -raw intermediate_bucket)
aws s3 ls "s3://$INTERMEDIATE/covid_enriched/" --recursive
# You should see several data files (Hive typically names them like 000000_0, without a .parquet extension)
Best practice applied: ActionOnFailure: TERMINATE_CLUSTER on the CTAS
step. If the join fails the cluster shuts down immediately, preventing billing
for a zombie cluster waiting on a broken step.
Lab 04 — DROP TABLE: External vs Managed
What you’re doing: understand the most operationally important Hive concept — what actually happens to your data when you drop a table.
bash labs/lab-04-drop-tables/scripts/run_lab.sh
External table (what we created in Lab 01):
DROP TABLE nytimes_counties;
-- Glue catalog entry: DELETED
-- S3 data at s3://.../covid/nytimes/: PRESERVED
Managed table (created inside the lab):
CREATE TABLE nytimes_managed_sample AS SELECT ... LIMIT 1000;
-- Data written to: s3://emr-labs-raw-.../user/hive/warehouse/emr_labs_db.db/nytimes_managed_sample/
DROP TABLE nytimes_managed_sample;
-- Glue catalog entry: DELETED
-- S3 data: ALSO DELETED (Hive owns the data)
Why this matters in production: a junior engineer accidentally runs
DROP TABLE orders on a managed production table and deletes years of order
data from S3. This is a real incident that has happened at multiple companies.
Best practice: always use EXTERNAL tables for data lake tables. Reserve
managed tables only for truly ephemeral scratch data.
Verify after the lab:
# S3 data still exists even though the table was dropped and re-created
aws s3 ls "s3://$(terraform -chdir=terraform/envs/dev output -raw raw_bucket)/covid/nytimes/" --recursive | wc -l
Lab 05 — Athena Serverless Queries
What you’re doing: run SQL queries against the covid_enriched Glue table
without launching any EMR cluster.
bash labs/lab-05-athena/scripts/run_lab.sh
The script uses aws athena start-query-execution to submit each SQL file and
polls get-query-execution until the status is SUCCEEDED or FAILED.
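The polling pattern can be sketched in Python as follows. This is not the lab script itself: `wait_for_query` is a hypothetical helper, and the state lookup is injected as a function so the sketch runs offline. CANCELLED is included because it is also a terminal Athena state.

```python
import time

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_query(get_state, query_id, interval_s=2.0, timeout_s=1800.0,
                   sleep=time.sleep):
    """Poll get_state(query_id) until a terminal state is seen or time runs out."""
    waited = 0.0
    while True:
        state = get_state(query_id)
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_s:
            raise TimeoutError(f"query {query_id} still {state} after {waited:.0f}s")
        sleep(interval_s)
        waited += interval_s


# Stand-in for the real API call so the sketch runs without AWS access.
_fake_states = iter(["QUEUED", "RUNNING", "RUNNING", "SUCCEEDED"])
final_state = wait_for_query(
    lambda qid: next(_fake_states), "demo-id", sleep=lambda s: None
)
print(final_state)
```

With boto3, `get_state` could be `lambda qid: athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]`, where `athena` is a `boto3.client("athena")`.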
Three queries run:
- 01_create_view.sql: creates view latest_county_snapshot (latest date per county)
- 02_top_states.sql: ranks states by cases_per_staffed_bed (hospital stress)
- 03_county_trend.sql: 7-day rolling growth for NY counties using LAG() window function
Athena vs Hive — when to use which:
| | Athena | Hive on EMR |
|---|---|---|
| Best for | Ad-hoc SQL, dashboards, exploration | Large batch ETL, complex UDFs, Hive-specific features |
| Cost model | $5/TB scanned (pay per query) | $0.05–0.30/hr per cluster (pay for uptime) |
| Cold start | Instant (serverless) | 6–10 min cluster bootstrap |
| Max query time | 30 minutes | Unlimited |
| UDFs | Limited | Full Java/Python UDFs |
| DDL | Full CREATE/ALTER/DROP | Full |
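To make the cost-model row concrete, here is a rough calculator for the pay-per-scan side. The $5/TB figure comes from the table; the binary-terabyte conversion and the 10 MB per-query minimum are assumptions you should check against current Athena pricing.

```python
PRICE_PER_TB_USD = 5.0              # figure quoted in the comparison table
BYTES_PER_TB = 1024 ** 4            # assumption: binary terabyte
MIN_BILLED_BYTES = 10 * 1024 ** 2   # assumption: 10 MB minimum billed per query


def athena_query_cost(bytes_scanned):
    """Approximate cost in USD of one query that scans bytes_scanned bytes."""
    billed = max(bytes_scanned, MIN_BILLED_BYTES)
    return billed / BYTES_PER_TB * PRICE_PER_TB_USD


def breakeven_tb_per_hour(cluster_usd_per_hour):
    """TB scanned per hour at which Athena costs as much as a running cluster."""
    return cluster_usd_per_hour / PRICE_PER_TB_USD


print(round(athena_query_cost(200 * 1024 ** 3), 4))  # one 200 GB scan
print(breakeven_tb_per_hour(0.30))                   # upper cluster rate from the table
```

Scan volume is what drives the Athena bill, which is why the labs convert CSV to Parquet before querying.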
Verify:
# Results are CSV files in the Athena bucket
ATHENA_BUCKET=$(terraform -chdir=terraform/envs/dev output -raw athena_bucket)
aws s3 ls "s3://$ATHENA_BUCKET/lab05-results/" | tail -10
# View the latest execution's results inline (QueryExecutionIds[0] is the most recent query)
QID=$(aws athena list-query-executions \
--work-group primary \
--query 'QueryExecutionIds[0]' --output text)
aws athena get-query-results --query-execution-id "$QID" \
--query 'ResultSet.Rows[*].Data[*].VarCharValue' --output text | head -30
Best practice applied: Athena workgroup configured with OutputLocation
pointing to a dedicated S3 bucket. Never send Athena results to a shared
location — query results can contain sensitive data.
Lab 06 — Apache Oozie Orchestration
What you’re doing: schedule the entire pipeline (Labs 01 + 03) as a recurring Oozie coordinator that runs daily at 06:00 UTC.
This lab has two parts. You need KEY_PAIR set (see Prerequisites).
Part 1 — Launch the cluster (stays running, no auto-terminate):
export KEY_PAIR=emr-labs-key
bash labs/lab-06-oozie/scripts/run_lab.sh
This launches the cluster with Name=Oozie added to the applications list
and waits for it to reach the WAITING state. The script prints the Master DNS.
Part 2 — Deploy and start the coordinator:
CLUSTER_ID=$(cat labs/lab-06-oozie/scripts/.last_cluster_id)
bash labs/lab-06-oozie/scripts/submit_oozie.sh
submit_oozie.sh does:
- scp the HQL scripts to the master and hadoop fs -put them to HDFS
- scp workflow.xml and coordinator.xml to HDFS
- Writes job.properties on the master with your bucket names and schedule
- Runs oozie job -run, which returns a coordinator ID like 0000001-...-oozie-...C
Monitor the coordinator:
# SSH tunnel to access Oozie Web UI
ssh -L 11000:localhost:11000 -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS>
# Open: http://localhost:11000/oozie → Coordinators tab
# Or via CLI
ssh -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS> \
"oozie job -oozie http://localhost:11000/oozie -info <COORDINATOR_ID>"
Oozie workflow structure (workflow.xml):
START
│
▼
create-tables (Hive action)
│ OK
▼
ctas-enriched (Hive action)
│ OK
▼
validate-output (Hive action)
│ OK
▼
END
Any action error → KILL (logs: wf:lastErrorNode + wf:errorMessage)
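The control flow in the diagram can be mimicked in a few lines of Python. This is only an analogy for how the OK and error transitions behave, not Oozie code: `run_workflow` and the stub actions are hypothetical.

```python
def run_workflow(actions):
    """Run (name, callable) pairs in order and stop at the first failure."""
    for name, action in actions:
        try:
            action()
        except Exception as exc:
            # Mirrors the error transition to the kill node in the diagram.
            return {"status": "KILLED", "last_error_node": name,
                    "error_message": str(exc)}
    return {"status": "SUCCEEDED", "last_error_node": None, "error_message": None}


def ok():
    pass


def broken_ctas():
    raise RuntimeError("join failed")


result = run_workflow([
    ("create-tables", ok),
    ("ctas-enriched", broken_ctas),
    ("validate-output", ok),
])
print(result)
```

Note that validate-output never runs once ctas-enriched fails, which is the behaviour you want: validating stale output would report a false success.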
Terminate the cluster when done (Oozie needs a live cluster):
aws emr terminate-clusters \
--cluster-ids $(cat labs/lab-06-oozie/scripts/.last_cluster_id) \
--region us-east-1
Teardown
Remove all AWS resources (S3 buckets will be emptied then deleted because
force_destroy = true):
cd terraform/envs/dev
terraform destroy
Type yes when prompted. This deletes everything Terraform created: buckets,
IAM roles, Glue database. Any running EMR clusters are NOT managed by
Terraform here — terminate them separately first.
# Terminate any still-running clusters before destroy
aws emr list-clusters --active --query 'Clusters[].Id' --output text \
| xargs -r aws emr terminate-clusters --cluster-ids
6. Best Practices Applied
Cost
| Practice | Where | Saving |
|---|---|---|
| Spot instances for core nodes | emr-instance-fleet.json | 60–80% vs on-demand |
| Fallback to on-demand after 10 min | SpotSpecification.TimeoutAction | Prevents stuck provisioning |
| Transient clusters (Labs 01–05) | --auto-termination-policy | No idle billing |
| Parquet + Snappy for intermediate zone | STORED AS PARQUET in CTAS | 80–95% less Athena scan |
| S3-IA lifecycle after 30 days | terraform/modules/s3/main.tf | ~45% vs S3 Standard |
| gp3 EBS volumes (not gp2) | emr-instance-fleet.json | ~20% cheaper per GB |
Security (OWASP / AWS Well-Architected)
| Practice | Where |
|---|---|
| Block all public access on S3 | aws_s3_bucket_public_access_block for all buckets |
| IAM least privilege — scoped S3 access | Only lab buckets + public covid lake |
| IAM least privilege — scoped Glue access | Only Glue catalog operations, not admin |
| Source account condition on EMR service role | Prevents confused-deputy attacks |
| No hardcoded credentials | All config via Terraform outputs + env vars |
Reliability
| Practice | Where |
|---|---|
| TERMINATE_CLUSTER on critical steps | Lab 03 CTAS: if the transform fails, stop billing |
| CONTINUE on exploratory steps | Labs 01, 02: failure shouldn’t abort exploration |
| Idempotent HQL: CREATE IF NOT EXISTS, DROP IF EXISTS | All HQL files |
| Glue catalog as metastore | Tables survive cluster termination |
| S3 as storage | Decoupled from compute — data persists even if cluster dies |
Observability
# EMR step logs (stdout, stderr, controller)
aws s3 ls s3://$LOGS_BUCKET/emr-logs/$CLUSTER_ID/steps/ --recursive
# View a step's stdout (EMR stores step logs gzipped in S3)
aws s3 cp s3://$LOGS_BUCKET/emr-logs/$CLUSTER_ID/steps/s-XXXX/stdout.gz - | gunzip | head -50
# CloudWatch: EMR cluster metrics (CPU, HDFS, YARN)
# Console → CloudWatch → Metrics → EMR
7. What Production Companies Actually Do
Netflix
Netflix processes petabytes/day on a data platform built around Apache Iceberg (an open table format with ACID transactions on S3) + Apache Spark for transformation. They moved away from Hive because:
- Classic (non-transactional) Hive tables give no ACID guarantees (concurrent writes can corrupt data)
- Hive metastore becomes a bottleneck at scale (millions of partitions)
- Iceberg provides time travel (query data as of any past timestamp) and schema evolution without rewriting files
Netflix open-sourced Metacat — a unified metadata service that sits in front of Hive Metastore, Glue, Druid, and Elasticsearch.
Uber
Uber built Hudi (now Apache Hudi) to solve the “update problem” in data lakes: how do you update a single row in a 10 TB Parquet dataset? Apache Hudi (Hadoop Upserts Deletes and Incrementals) can write changes to log files that are merged into base Parquet files on compaction, which supports record-level upserts and deletes (required for GDPR right-to-erasure).
Uber uses Presto/Trino (the open-source engine behind Athena) for ad-hoc SQL at scale, with a custom federated query layer.
Airbnb
Airbnb’s data team invented Apache Airflow (2015) — now the industry standard for data pipeline orchestration. Airflow DAGs are Python code, not XML (unlike Oozie), enabling version control, unit testing, and dynamic graph generation. AWS offers a managed version: Amazon MWAA (Managed Workflows for Apache Airflow).
Airbnb also built Apache Superset (open-source BI tool, now Apache top-level project) for querying Hive/Presto results.
AWS / Databricks
AWS Lake Formation (built on Glue) adds fine-grained access control to the
Glue catalog: column-level security, row-level filtering, and data masking —
without changing the underlying S3 data. This lets different teams query the
same covid_enriched table but see different columns based on their IAM
permissions.
Databricks built Delta Lake (open-source), which is similar to Iceberg/Hudi but tightly integrated with Spark. It adds ACID, time travel, Z-order clustering, and OPTIMIZE (small-file compaction). Delta Lake is now converging with Iceberg via UniForm (writes both Delta and Iceberg metadata simultaneously).
8. Alternative Approaches
Instead of Hive on EMR → Apache Spark on EMR
Spark can be 10–100× faster than Hive on MapReduce for complex transformations because it keeps intermediate data in memory across the cluster instead of writing it to disk between stages:
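# PySpark equivalent of the Lab 03 CTAS
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
intermediate = "<intermediate-bucket>"  # placeholder: the intermediate_bucket Terraform output

counties = spark.table("emr_labs_db.nytimes_counties")
beds = spark.table("emr_labs_db.hospital_beds")

# Left join keeps counties with no hospital record; the ratio stays NULL when beds <= 0
enriched = counties.join(
    beds, counties.fips == beds.county_fips_code, "left"
).withColumn(
    "cases_per_staffed_bed",
    F.when(F.col("num_staffed_beds") > 0,
           F.round(F.col("cases") / F.col("num_staffed_beds"), 4))
)

# saveAsTable() returns None, so the write is kept out of the assignment above
enriched.write.format("parquet").mode("overwrite") \
    .option("path", f"s3://{intermediate}/covid_enriched/") \
    .saveAsTable("emr_labs_db.covid_enriched")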
Use Spark when: complex window functions, ML feature engineering, streaming. Use Hive when: pure SQL teams, existing HQL codebase, MapReduce compatibility.
Instead of EMR on EC2 → EMR Serverless
EMR Serverless (launched 2022) eliminates cluster management entirely:
- No instance selection, no spot fleet config, no bootstrap
- Pay per vCPU-second and GB-second of memory used
- Cold start: ~30 seconds vs 6–10 min for EMR on EC2
# Submit a Hive job to EMR Serverless
aws emr-serverless start-job-run \
--application-id $APP_ID \
--execution-role-arn $ROLE_ARN \
--job-driver '{
"hive": {
"query": "s3://.../01_ctas_enriched.hql",
"parameters": "-d DB=emr_labs_db"
}
}'
Trade-off: EMR Serverless has no persistent Oozie server, so Lab 06 as written requires EMR on EC2. For modern orchestration with EMR Serverless, use AWS Step Functions or MWAA (Airflow).
Instead of Oozie → Apache Airflow (MWAA)
Airflow DAGs are Python — versioned, tested, dynamic:
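from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator

# start_date is required for scheduling (the date here is only an example);
# catchup=False avoids backfilling every missed day since start_date
dag = DAG(
    "covid_pipeline",
    schedule_interval="0 6 * * *",  # daily at 06:00 UTC; newer Airflow releases call this `schedule`
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

create_tables = EmrAddStepsOperator(
    task_id="create_tables",
    job_flow_id="{{ var.value.emr_cluster_id }}",
    steps=[{
        "Name": "CreateTables",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["hive-script", "--run-hive-script", "--args",
                     "-f", "s3://.../01_create_tables.hql"]
        }
    }],
    dag=dag
)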
Airflow is the industry default for 2024+. Oozie is stable and battle-tested but no longer actively developed. If you’re building a new pipeline, use Airflow or Step Functions.
Instead of plain Parquet → Apache Iceberg
Iceberg adds ACID, time travel, hidden partitioning, and schema evolution:
-- Iceberg DDL in Athena (or Hive on EMR)
CREATE TABLE emr_labs_db.covid_enriched_ice (
date_str STRING, state STRING, county STRING,
cases BIGINT, cases_per_staffed_bed DOUBLE
)
LOCATION 's3://emr-labs-intermediate-.../covid_enriched_ice/'
TBLPROPERTIES ('table_type'='ICEBERG');
-- Time travel: query data as of 7 days ago
-- (Athena engine version 3 syntax; engine version 2 used FOR SYSTEM_TIME AS OF)
SELECT * FROM covid_enriched_ice
FOR TIMESTAMP AS OF (current_timestamp - interval '7' day);
-- ACID update (not possible with plain Parquet/Hive)
UPDATE covid_enriched_ice SET cases = 99999
WHERE county = 'New York' AND date_str = '2021-01-01';
Instead of Athena → Amazon Redshift Spectrum
Redshift Spectrum reads from S3/Glue (like Athena) but is part of Redshift — better for complex multi-table joins, VACUUM, and ANALYZE on large datasets. Use Athena for ad-hoc; use Spectrum when you need Redshift’s query planner and are already on Redshift.
Instead of Glue catalog → AWS Lake Formation
Lake Formation wraps the Glue catalog with:
- Column-level access control (e.g., hide deaths column from read-only users)
- Row-level filtering (e.g., WHERE state = 'California' enforced at catalog level)
- Governed tables (ACID transactions, automatic compaction)
- Cross-account data sharing without copying data
Adoption path: start with Glue catalog (as in these labs) → layer Lake Formation on top when you need multi-team governance.
9. Cost Reference
Per-lab cost (us-east-1, 2026 pricing)
| Lab | Cluster? | Duration | Compute | Storage | Athena | Total |
|---|---|---|---|---|---|---|
| 00 | No | 2 min | $0.00 | < $0.01 | — | ~$0.01 |
| 01 | Transient | 12 min | ~$0.06 | < $0.01 | — | ~$0.07 |
| 02 | Transient | 10 min | ~$0.05 | — | — | ~$0.05 |
| 03 | Transient | 15 min | ~$0.07 | < $0.01 | — | ~$0.08 |
| 04 | Transient | 12 min | ~$0.06 | — | — | ~$0.06 |
| 05 | No | 3 min | $0.00 | < $0.01 | < $0.005 | ~$0.01 |
| 06 | Persistent | 30 min* | ~$0.15 | — | — | ~$0.15 |
*Terminate immediately after the coordinator run completes.
Total for all 7 labs: ~$0.43 – $0.60
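The lower bound of that total is just the sum of the per-lab column, which a few lines of Python can confirm. The linear scaling of Lab 06 with uptime is an assumption made purely to show how quickly a forgotten cluster dominates the bill.

```python
# Per-lab totals copied from the table above (USD, approximate).
lab_totals = {
    "00": 0.01, "01": 0.07, "02": 0.05, "03": 0.08,
    "04": 0.06, "05": 0.01, "06": 0.15,
}
total = round(sum(lab_totals.values()), 2)
print(f"all labs: ~${total:.2f}")


# Assumption for illustration: Lab 06 cost scales linearly with uptime.
def lab06_cost(minutes, usd_per_30_min=0.15):
    return round(usd_per_30_min * minutes / 30, 2)


print(f"Lab 06 left running for 8 hours: ~${lab06_cost(8 * 60):.2f}")
```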
Cost drivers to watch
# List any running EMR clusters (check after each lab)
aws emr list-clusters --active --region us-east-1 \
--query 'Clusters[].{Id:Id,Name:Name,State:Status.State}'
# S3 storage cost: check total size
aws s3 ls s3://$RAW_BUCKET --recursive --summarize | tail -2
Never leave an EMR cluster running overnight. A single m5.xlarge on-demand master costs about $4.61/day in EC2 charges alone ($0.192/hr × 24), before the EMR per-instance fee. Set a CloudWatch billing alarm:
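# Billing metrics exist only in us-east-1 and require billing alerts to be enabled for the account
aws cloudwatch put-metric-alarm \
--region us-east-1 \
--alarm-name emr-labs-spend-alert \
--metric-name EstimatedCharges \
--namespace AWS/Billing \
--dimensions Name=Currency,Value=USD \
--statistic Maximum \
--period 86400 \
--evaluation-periods 1 \
--threshold 5.00 \
--comparison-operator GreaterThanThreshold \
--alarm-actions arn:aws:sns:us-east-1:ACCOUNT:your-topic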
10. Observability: Results, Logs, UIs & DAGs
Every lab produces artifacts in several places simultaneously. This section explains every tool available to you — what it shows, how to open it, and how to interpret what you see. Use this as a reference while running the labs.
Tool Map: What to Use When
| Situation | Best tool |
|---|---|
| Did my EMR step succeed or fail? | AWS CLI describe-cluster / list-steps |
| What exactly went wrong in a step? | S3 step logs (stderr) |
| Why is my Hive query slow? | Tez UI → DAG detail |
| Which tasks are bottlenecking the job? | Tez UI → Task Attempts tab |
| What is the cluster doing right now? | YARN ResourceManager UI |
| How much HDFS space is used? | HDFS NameNode UI |
| Interactive Hive / HDFS browser in a UI | Hue |
| Is my Oozie workflow running? | Oozie Web UI / Oozie CLI |
| Did the Oozie coordinator trigger today? | Oozie Web UI → Coordinators |
| What query ran in Athena last hour? | Athena query history (console / CLI) |
| Do my Glue tables have the right schema? | Glue console → Tables |
| Is the cluster out of memory / CPU? | CloudWatch EMR metrics |
| Row-count / data-quality assertions | Hive CLI or Athena SQL |
EMR Step Logs (S3)
This is the first place to look after any failure. Every EMR step writes four log streams to S3 under the logs bucket:
s3://{logs-bucket}/emr-logs/{cluster-id}/steps/{step-id}/
stdout ← Hive query output, PRINT statements
stderr ← errors, stack traces, Tez task progress
controller ← EMR step runner lifecycle events
syslog ← JVM / OS-level messages
Find the step ID and stream logs (fastest path after a failure):
# Load bucket name
source shared/scripts/get_tf_outputs.sh
# List clusters and find your cluster ID
CLUSTER_ID=$(aws emr list-clusters --active --region us-east-1 \
--query 'Clusters[0].Id' --output text)
# Or use the last saved cluster ID
CLUSTER_ID=$(cat labs/lab-01-hive-basics/scripts/.last_cluster_id 2>/dev/null)
# List steps and their states
aws emr list-steps --cluster-id "$CLUSTER_ID" \
--query 'Steps[].{Id:Id,Name:Name,State:Status.State,Reason:Status.FailureDetails.Reason}'
# Get the step ID of the failed step
STEP_ID=$(aws emr list-steps --cluster-id "$CLUSTER_ID" \
--query 'Steps[?Status.State==`FAILED`].Id | [0]' --output text)
# Stream stderr (where errors and stack traces go)
aws s3 cp \
"s3://$LOGS_BUCKET/emr-logs/$CLUSTER_ID/steps/$STEP_ID/stderr.gz" - \
| gunzip | tail -100
# Stream stdout (Hive query output, SELECT results)
aws s3 cp \
"s3://$LOGS_BUCKET/emr-logs/$CLUSTER_ID/steps/$STEP_ID/stdout.gz" - \
| gunzip | head -200
Reading stderr for a Hive error: look for lines starting with FAILED:
or Error near the bottom of the file:
FAILED: SemanticException [Error 10002]: Line 3:... Invalid column reference ...
The line number in the error refers to the HQL file, not the log file.
Logs are not immediately available — EMR flushes logs to S3 every 5 minutes. If the step just finished, wait a moment before fetching. For real-time logs, SSH to the master and read YARN container logs directly (see the YARN section below).
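If you check step logs often, the manual scan for FAILED: lines can be automated. The sketch below assumes you have already downloaded a gzipped stderr file as bytes; `find_hive_failures` is a hypothetical helper, and the sample log line is modelled on the error format shown above with made-up details.

```python
import gzip
import re

ERROR_CODE_RE = re.compile(r"\[Error (\d+)\]")
HQL_LINE_RE = re.compile(r"\bLine (\d+):")


def find_hive_failures(gz_bytes):
    """Return one dict per line starting with FAILED: in a gzipped step log."""
    text = gzip.decompress(gz_bytes).decode("utf-8", errors="replace")
    failures = []
    for line in text.splitlines():
        line = line.strip()
        if not line.startswith("FAILED:"):
            continue
        code = ERROR_CODE_RE.search(line)
        hql_line = HQL_LINE_RE.search(line)
        failures.append({
            "code": int(code.group(1)) if code else None,
            # Line number inside the HQL script, not inside the log file
            "hql_line": int(hql_line.group(1)) if hql_line else None,
            "message": line,
        })
    return failures


# Made-up log content for demonstration.
sample = gzip.compress(
    b"INFO  : Compiling command\n"
    b"FAILED: SemanticException [Error 10002]: Line 3:7 Invalid column reference 'foo'\n"
)
print(find_hive_failures(sample))
```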
Tez UI — DAG Visualiser
The Tez UI shows the execution graph of every Hive query that ran via Tez. It is the single best tool for understanding query performance.
Open the Tez UI (requires a running cluster):
# SSH tunnel — maps localhost:8080 → master:8080
ssh -L 8080:localhost:8080 \
-i ~/.ssh/emr-labs-key.pem \
hadoop@<MASTER_DNS> \
-N -f # -N: no command, -f: background
# Open in browser (on EMR the Tez UI is served under /tez-ui)
open http://localhost:8080/tez-ui
The Master DNS is printed by every run_lab.sh script and also available via:
aws emr describe-cluster --cluster-id "$CLUSTER_ID" \
--query 'Cluster.MasterPublicDnsName' --output text
Navigating the Tez UI:
Tez UI home → All DAGs tab
│
└─► Select a DAG (named after the HQL file or query text)
│
├─► DAG Details
│ ├── Status: SUCCEEDED / FAILED / RUNNING
│ ├── Start/End time, Duration
│ ├── Vertices (each = one Map or Reduce stage)
│ └── Counters (rows read/written, bytes, GC time)
│
├─► Graphical View ← visual DAG with data flow arrows
│ Each box = a vertex. Hover for row/byte counts.
│ Thick arrows = large data shuffles (potential bottleneck)
│
├─► Vertex Details (click a vertex in the graph)
│ ├── Tasks: list of all parallel tasks for this vertex
│ ├── Task Attempts: shows retried/failed attempts
│ └── Counters per task (find the slow outlier task here)
│
└─► Timeline View
Gantt chart — shows which vertices ran in parallel vs serial
and where wall-clock time was actually spent
What to look for:
- Skew: one task takes 10× longer than others → data not evenly partitioned
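- Spill to disk: SPILLED_RECORDS counter > 0 → insufficient task memory; increase hive.tez.container.size (or tez.runtime.io.sort.mb) or add nodes. Note that tez.am.resource.memory.mb sizes only the Tez ApplicationMaster, not the tasks that spill
- Slow vertex: look at the Timeline view; the longest bar is your bottleneck. Click it to see whether it’s Map I/O, shuffle, or Reduce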
- Failed task attempts: yellow/red tasks in Vertex Details → expand to see the attempt error message
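The skew rule of thumb above can be turned into a quick check once you have per-task durations, for example copied from the Vertex Details page. The function name, the median baseline, and the sample durations below are all illustrative assumptions.

```python
from statistics import median


def find_skewed_tasks(durations_s, factor=10.0):
    """Return task IDs whose duration exceeds factor x the median duration."""
    typical = median(durations_s.values())
    return sorted(
        task for task, seconds in durations_s.items()
        if typical > 0 and seconds > factor * typical
    )


# Made-up durations (seconds) for the tasks of one vertex.
durations = {"task_0": 12.0, "task_1": 11.5, "task_2": 13.1, "task_3": 190.0}
print(find_skewed_tasks(durations))
```

The median is used rather than the mean so that the outlier itself does not raise the baseline it is compared against.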
Force a query to show in Tez UI (run interactively on master):
ssh -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS>
hive
SET hive.execution.engine=tez;
SET tez.queue.name=default;
-- This query will appear in the Tez UI under "All DAGs"
SELECT state, COUNT(*) AS county_count, SUM(cases) AS total_cases
FROM emr_labs_db.nytimes_counties
GROUP BY state
ORDER BY total_cases DESC
LIMIT 15;
YARN ResourceManager UI
YARN manages cluster resources (CPU cores and memory) across all running jobs. The ResourceManager UI shows live application status.
# SSH tunnel
ssh -L 8088:localhost:8088 \
-i ~/.ssh/emr-labs-key.pem \
hadoop@<MASTER_DNS> \
-N -f
open http://localhost:8088
Key sections:
| Tab | What it shows |
|---|---|
| Cluster → Applications | All running, finished, and failed YARN apps (each Hive/Tez job = one app) |
| Cluster → Nodes | Each core node’s available vCores and memory |
| Application detail | Attempt history, AM logs, tracking URL (Tez UI link) |
| Scheduler | Queue utilisation (capacity, used, pending) |
CLI equivalent (useful when no tunnel is open):
# List running applications
yarn application -list 2>/dev/null # run on master via SSH
# Or from your laptop via the EMR CLI
aws emr ssh --cluster-id "$CLUSTER_ID" --key-pair-file ~/.ssh/emr-labs-key.pem \
--command "yarn application -list"
# Application logs for a specific app (replace application_XXX)
aws emr ssh --cluster-id "$CLUSTER_ID" --key-pair-file ~/.ssh/emr-labs-key.pem \
--command "yarn logs -applicationId application_1234567890_0001 | tail -200"
YARN container logs — most granular output, available in real-time during a running job (unlike S3 logs which are flushed every 5 min):
# SSH to master
ssh -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS>
# Find the application ID from YARN
yarn application -list -appStates ALL | grep -i hive
# Stream all container logs (large — pipe through grep or tail)
yarn logs -applicationId application_1234567890_0001 2>&1 | grep -i error | head -50
HDFS NameNode UI
HDFS NameNode tracks the distributed filesystem used by Hive for temp data, Oozie workflows, and Hive warehouse (managed tables).
# Port 50070 applies to Hadoop 2.x (EMR 5.x); on EMR 6.x and later the NameNode UI listens on 9870
ssh -L 50070:localhost:50070 \
-i ~/.ssh/emr-labs-key.pem \
hadoop@<MASTER_DNS> \
-N -f
open http://localhost:50070
What to check here:
- Overview: total capacity vs used. If > 85% full, Tez spill writes will fail with No space left on device
- Utilities → Browse the file system: navigate to /user/oozie/ or /user/hive/warehouse/ to verify that Oozie workflow files were uploaded and that any managed Hive tables exist as expected
CLI equivalent:
# Check HDFS usage
hadoop fs -df -h /
# List Oozie workflow files
hadoop fs -ls /user/oozie/emr-labs/
# Verify HQL files are on HDFS (needed by Oozie)
hadoop fs -ls /user/oozie/emr-labs/hql/
Hue — Browser IDE
Hue (Hadoop User Experience) is a web application that provides a browser-based interface for Hive, HDFS, Oozie, and more. It is available on EMR but must be added explicitly at cluster launch.
Enable Hue — add it to the --applications list in run_lab.sh:
--applications Name=Hive Name=Hadoop Name=Hue Name=Tez Name=Oozie
Access Hue (port 8888):
ssh -L 8888:localhost:8888 \
-i ~/.ssh/emr-labs-key.pem \
hadoop@<MASTER_DNS> \
-N -f
open http://localhost:8888
# On first login Hue asks you to create the superuser account (there is no preset password); choose a strong one
What Hue gives you:
| Feature | Path in Hue | What you can do |
|---|---|---|
| Hive Editor | Editor → Hive | Write and run HQL interactively, see results in a table |
| Query history | Editor → My queries | Re-open past queries, see execution time |
| HDFS Browser | Files | Browse, upload, download, preview files on HDFS and S3 |
| Oozie Job Browser | Jobs → Workflows | See workflow run status, action-by-action |
| Oozie Coordinator | Jobs → Coordinators | See scheduled runs, next trigger, pause/kill |
| Table Browser | Tables | Browse Glue/Hive databases, preview sample rows |
| Metastore Manager | Tables → Database | View column schemas, partitions, statistics |
Important: Hue is a persistent-cluster feature. Transient clusters (Labs 01–05) terminate before you could use Hue. It is most useful with the Lab 06 persistent cluster.
Oozie Web UI
Oozie has its own web UI that shows workflow and coordinator status in detail.
# SSH tunnel to Oozie port
ssh -L 11000:localhost:11000 \
-i ~/.ssh/emr-labs-key.pem \
hadoop@<MASTER_DNS> \
-N -f
open http://localhost:11000/oozie
Navigating the Oozie UI:
Oozie Web Console
├── Workflow Jobs tab
│ Shows each individual workflow run:
│ Status: RUNNING / SUCCEEDED / KILLED / FAILED
│ Click a job → Actions tab → see each action (create-tables,
│ ctas-enriched, validate-output) with OK / ERROR state
│ Click an action → External ID → links to YARN app for that Hive job
│
├── Coordinator Jobs tab
│ Shows the coordinator (daily schedule)
│ Status: RUNNING / PAUSED / KILLED
│ Next Materialization: when the next run will trigger
│ Click → Jobs tab → list of each daily run instance
│
└── System Info tab
Oozie server version, build time, configuration values
Oozie CLI (more reliable than the UI for scripting):
# SSH to master first
ssh -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS>
OOZIE=http://localhost:11000/oozie
# List running coordinators
oozie jobs -oozie $OOZIE -jobtype coordinator -filter status=RUNNING
# Get coordinator status (replace 0000001-...-oozie-...C)
oozie job -oozie $OOZIE -info 0000001-250101000000000-oozie-oozi-C
# List the individual workflow runs spawned by the coordinator
oozie job -oozie $OOZIE -info 0000001-250101000000000-oozie-oozi-C -len 5
# Get details of a specific workflow run
oozie job -oozie $OOZIE -info 0000002-250101000000000-oozie-oozi-W
# View full log of a failed workflow
oozie job -oozie $OOZIE -log 0000002-250101000000000-oozie-oozi-W
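# Rerun a failed workflow from the failed action (not from the start):
# oozie.wf.rerun.failnodes=true skips the actions that already succeeded
oozie job -oozie $OOZIE -rerun 0000002-250101000000000-oozie-oozi-W \
-config job.properties -D oozie.wf.rerun.failnodes=true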
# Kill a running coordinator
oozie job -oozie $OOZIE -kill 0000001-250101000000000-oozie-oozi-C
# Suspend (pause) a coordinator without killing it
oozie job -oozie $OOZIE -suspend 0000001-250101000000000-oozie-oozi-C
# Resume a suspended coordinator
oozie job -oozie $OOZIE -resume 0000001-250101000000000-oozie-oozi-C
Understanding workflow action states:
| State | Meaning |
|---|---|
| OK | Action completed successfully |
| ERROR | Action failed; workflow moved to the KILL action |
| RUNNING | Action is currently executing (Hive job in progress) |
| PREP | Action is waiting for the previous action to finish |
| KILLED | Workflow was manually killed before this action ran |
| END | Workflow reached the END node (success) |
Hive CLI — Interactive Verification
For direct interactive queries against the cluster while it is running, SSH to the master and launch Hive or Beeline.
ssh -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS>
Option A: Hive CLI (simpler, but deprecated in favour of Beeline):
hive
-- Confirm execution engine
SET hive.execution.engine;
-- hive.execution.engine=tez
-- Switch to the lab database
USE emr_labs_db;
-- List tables registered in Glue
SHOW TABLES;
-- Verify row counts after Lab 01
SELECT COUNT(*) FROM nytimes_counties; -- expect ~1.8M rows
SELECT COUNT(*) FROM hospital_beds; -- expect ~7,500 rows
-- Spot-check a few raw rows
SELECT * FROM nytimes_counties LIMIT 5;
-- Verify enriched table after Lab 03
SELECT COUNT(*) FROM covid_enriched; -- expect ~1.5M (rows with non-null FIPS)
SELECT * FROM covid_enriched LIMIT 3;
-- Check partition/file info
DESCRIBE FORMATTED covid_enriched;
-- Look for: Location (should be s3://intermediate-bucket/...)
-- Look for: InputFormat (should be ParquetInputFormat)
-- Validate no nulls in key columns
SELECT
COUNT(*) AS total_rows,
COUNT(CASE WHEN fips IS NULL THEN 1 END) AS null_fips,
COUNT(CASE WHEN cases < 0 THEN 1 END) AS negative_cases,
MIN(date_str), MAX(date_str)
FROM covid_enriched;
-- Check join quality: what % of rows matched a hospital record?
SELECT
COUNT(*) AS total,
SUM(CASE WHEN num_staffed_beds IS NOT NULL THEN 1 ELSE 0 END) AS matched,
ROUND(100.0 * SUM(CASE WHEN num_staffed_beds IS NOT NULL THEN 1 ELSE 0 END)
/ COUNT(*), 2) AS match_pct
FROM covid_enriched;
-- Expect: ~65–75% match rate (rural counties have no hospital record)
Option B: Beeline (JDBC client, connects to HiveServer2, recommended):
beeline -u jdbc:hive2://localhost:10000 -n hadoop
Beeline shows query progress (% complete) in the terminal and is the production-grade Hive client used by most BI tools and Hue.
-- Beeline has the same SQL dialect as the Hive CLI
-- Progress bar appears during execution:
-- INFO : DAG Status: Status=RUNNING, Progress=50.00%
SELECT state, SUM(cases) AS total FROM emr_labs_db.nytimes_counties
GROUP BY state ORDER BY total DESC LIMIT 5;
Exit: type quit; or press Ctrl+D.
Athena Query History & Results
Athena stores every query execution (success and failure) for 45 days.
AWS Console:
- Open https://console.aws.amazon.com/athena/home?region=us-east-1
- Click Query editor → History tab
- Each row: query text (truncated), state, run time, data scanned, timestamp
- Click a row → View details → full query text, execution ID, output S3 location
- Click the S3 output location to download the CSV result file
AWS CLI:
# List recent query executions
aws athena list-query-executions \
--work-group primary \
--query 'QueryExecutionIds[:10]' --output text
# Get status and stats for a specific execution
QID=<paste-execution-id-here>
aws athena get-query-execution --query-execution-id "$QID" \
--query 'QueryExecution.{State:Status.State,
Reason:Status.StateChangeReason,
Scanned:Statistics.DataScannedInBytes,
Runtime:Statistics.TotalExecutionTimeInMillis,
Output:ResultConfiguration.OutputLocation}'
# Stream the result CSV directly
OUTPUT=$(aws athena get-query-execution --query-execution-id "$QID" \
--query 'QueryExecution.ResultConfiguration.OutputLocation' --output text)
aws s3 cp "$OUTPUT" -
# Paginate results via the API (no S3 download)
aws athena get-query-results \
--query-execution-id "$QID" \
--query 'ResultSet.Rows[*].Data[*].VarCharValue'
Key Athena stats to check:
- DataScannedInBytes: if this equals the full file size, Parquet predicate push-down is NOT working; check that the WHERE clause uses a partitioned column or that statistics exist
- TotalExecutionTimeInMillis: wall-clock time including queue wait; subtract QueryQueueTimeInMillis to get pure execution time
- StateChangeReason: when State=FAILED, this contains the SQL error message
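A small helper can turn these statistics into the two numbers you usually care about. This is a sketch: `summarise_athena_stats` is a hypothetical name, the input mirrors the `QueryExecution.Statistics` structure returned by `get-query-execution`, and the sample values are made up.

```python
def summarise_athena_stats(stats, table_bytes=None):
    """Derive execution time and scan ratio from an Athena Statistics dict."""
    total_ms = stats["TotalExecutionTimeInMillis"]
    queue_ms = stats.get("QueryQueueTimeInMillis", 0)
    scanned = stats["DataScannedInBytes"]
    summary = {
        "execution_ms": total_ms - queue_ms,   # wall-clock time minus queue wait
        "scanned_mb": round(scanned / 1024 ** 2, 2),
        "scan_ratio": None,
    }
    if table_bytes:
        # A ratio close to 1.0 suggests the query read the whole table.
        summary["scan_ratio"] = round(scanned / table_bytes, 3)
    return summary


# Made-up numbers in the shape of QueryExecution.Statistics.
stats = {
    "TotalExecutionTimeInMillis": 4200,
    "QueryQueueTimeInMillis": 300,
    "DataScannedInBytes": 52_428_800,
}
print(summarise_athena_stats(stats, table_bytes=524_288_000))
```

The `table_bytes` value would come from the `--summarize` output of `aws s3 ls` on the table's prefix.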
Athena workgroup settings — check the result output location:
aws athena get-work-group --work-group primary \
--query 'WorkGroup.Configuration.ResultConfiguration.OutputLocation'
Output should be s3://emr-labs-athena-{account}-us-east-1/.
Glue Console — Schema & Data Preview
The Glue console is the canonical view of what table definitions exist in your metastore. Use it to validate that Lab 01 and Lab 03 wrote schemas correctly.
In the AWS Console:
- Go to https://console.aws.amazon.com/glue/home?region=us-east-1
- Data Catalog → Databases → click emr_labs_db
- Click any table name → Schema tab → verify column names, types, SerDe
- Properties tab → Location (S3 path), InputFormat, Classification
- Click Action → View data → opens Athena pre-loaded with a SELECT * LIMIT 10
AWS CLI:
# List all tables in the lab database
aws glue get-tables --database-name emr_labs_db \
--query 'TableList[].{Name:Name,
Location:StorageDescriptor.Location,
Format:StorageDescriptor.InputFormat,
Cols:StorageDescriptor.Columns[*].Name}'
# Inspect a single table's full definition
aws glue get-table \
--database-name emr_labs_db \
--name covid_enriched \
| jq '.Table.StorageDescriptor | {Location, InputFormat, SerdeInfo}'
# Check table statistics (populated after ANALYZE TABLE or a Glue Crawler run)
aws glue get-column-statistics-for-table \
--database-name emr_labs_db \
--table-name covid_enriched \
--column-names cases deaths fips 2>/dev/null
What correct output looks like for covid_enriched after Lab 03:
{
"Location": "s3://emr-labs-intermediate-.../covid_enriched/",
"InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
"SerdeInfo": {
"SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
}
}
If InputFormat still shows TextInputFormat, the CTAS did not write Parquet —
check that the CTAS step completed without errors.
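The same check can be scripted against the `StorageDescriptor` that `aws glue get-table` returns. The sketch below is one possible validation, not part of the labs; `check_parquet_table` is a hypothetical helper, and the bucket name in the sample is a placeholder.

```python
PARQUET_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
PARQUET_SERDE = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"


def check_parquet_table(storage_descriptor):
    """Return a list of problems; an empty list means the table looks like Parquet."""
    problems = []
    input_format = storage_descriptor.get("InputFormat")
    if input_format != PARQUET_INPUT_FORMAT:
        problems.append(f"InputFormat is {input_format}")
    serde = storage_descriptor.get("SerdeInfo", {}).get("SerializationLibrary")
    if serde != PARQUET_SERDE:
        problems.append(f"SerDe is {serde}")
    if not str(storage_descriptor.get("Location", "")).startswith("s3://"):
        problems.append("Location is not an S3 path")
    return problems


# Sample descriptors; the bucket name is a placeholder.
good = {
    "Location": "s3://example-intermediate-bucket/covid_enriched/",
    "InputFormat": PARQUET_INPUT_FORMAT,
    "SerdeInfo": {"SerializationLibrary": PARQUET_SERDE},
}
bad = dict(good, InputFormat="org.apache.hadoop.mapred.TextInputFormat")
print(check_parquet_table(good))
print(check_parquet_table(bad))
```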
CloudWatch — Cluster Metrics & Alarms
EMR automatically publishes ~50 metrics to CloudWatch under the AWS/EMR
namespace. No agent or configuration is required.
Useful EMR metrics:
| Metric | What it measures | Alert threshold |
|---|---|---|
| YARNMemoryAvailablePercentage | % of cluster memory not yet allocated | < 15% → add nodes |
| ContainerAllocated | Running YARN containers | Spike = new job started |
| HDFSUtilization | % of HDFS disk used | > 85% → risk of spill failures |
| MRActiveNodes | Active core/task nodes | Drop = node failure |
| S3BytesRead | Bytes read from S3 per second | Useful for estimating job I/O |
| S3BytesWritten | Bytes written to S3 per second | Spike = CTAS or output write |
View metrics in the console:
- Go to https://console.aws.amazon.com/cloudwatch/home?region=us-east-1
- Metrics → All metrics → AWS/EMR
- Select your cluster by JobFlowId
- Add to a graph: YARNMemoryAvailablePercentage + ContainerAllocated
CLI (fetch a metric for the last 30 minutes):
CLUSTER_ID=$(cat labs/lab-03-transformations/scripts/.last_cluster_id)
# date -v is BSD/macOS syntax; on GNU/Linux use: date -u -d '30 minutes ago' +%FT%TZ
# EMR publishes metrics at 5-minute intervals, so a 300-second period matches the data
aws cloudwatch get-metric-statistics \
--namespace AWS/EMR \
--metric-name YARNMemoryAvailablePercentage \
--dimensions Name=JobFlowId,Value="$CLUSTER_ID" \
--start-time $(date -u -v-30M +%FT%TZ) \
--end-time $(date -u +%FT%TZ) \
--period 300 \
--statistics Average \
--query 'sort_by(Datapoints, &Timestamp)[*].{T:Timestamp,Avg:Average}' \
--output table
Create an alarm that fires when memory is critically low (the period is 300 seconds because EMR publishes metrics every 5 minutes):
aws cloudwatch put-metric-alarm \
--alarm-name emr-low-memory-$CLUSTER_ID \
--namespace AWS/EMR \
--metric-name YARNMemoryAvailablePercentage \
--dimensions Name=JobFlowId,Value="$CLUSTER_ID" \
--statistic Average \
--period 300 \
--threshold 10 \
--comparison-operator LessThanThreshold \
--evaluation-periods 2 \
--alarm-description "EMR cluster low on YARN memory" \
--treat-missing-data notBreaching
Reading & Validating Results
After each lab, run these checks to confirm the pipeline produced correct output.
Lab 01 — External tables created
# Tables must exist in Glue
aws glue get-tables --database-name emr_labs_db \
--query 'TableList[].Name' --output text
# Expected: hospital_beds nytimes_counties
# S3 data must be untouched (external table DROP test)
aws s3 ls "s3://$RAW_BUCKET/covid/nytimes/us-counties/csv/" | wc -l
# Expected: > 0 files
Lab 03 — Parquet enriched table
# Parquet files written to intermediate bucket
aws s3 ls "s3://$INTERMEDIATE_BUCKET/covid_enriched/" --recursive | wc -l
# Expected: several .parquet files (exact count varies by parallelism)
# Row count via Athena (fast, no cluster needed)
aws athena start-query-execution \
--query-string "SELECT COUNT(*) FROM emr_labs_db.covid_enriched" \
--work-group primary \
--result-configuration "OutputLocation=s3://$ATHENA_BUCKET/validation/" \
--query 'QueryExecutionId' --output text
# Then fetch result as shown in the Athena section above
# Expected: ~1,500,000 – 1,800,000 rows depending on dataset version
# Confirm format is Parquet via Glue
aws glue get-table --database-name emr_labs_db --name covid_enriched \
--query 'Table.StorageDescriptor.InputFormat' --output text
# Expected: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
Lab 05 — Athena view and window function results
# Verify the view exists
aws glue get-table --database-name emr_labs_db \
--name latest_county_snapshot --query 'Table.TableType' --output text
# Expected: VIRTUAL_VIEW
# Check the trend query produced output (non-empty result file)
aws s3 ls "s3://$ATHENA_BUCKET/lab05-results/" \
--recursive --human-readable | grep -v metadata
# At least 3 output files (one per SQL statement)
Lab 06 — Oozie coordinator triggered
ssh -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS> \
"oozie jobs -oozie http://localhost:11000/oozie \
-jobtype coordinator -filter status=RUNNING"
# Expected: one coordinator row with your application path
# Confirm at least one workflow run reached END state
ssh -i ~/.ssh/emr-labs-key.pem hadoop@<MASTER_DNS> \
"oozie job -oozie http://localhost:11000/oozie \
-info 0000001-...-oozie-oozi-C -len 3"
# Look for Status=SUCCEEDED in the Actions column
General data quality pattern
-- Run in Hive CLI after any transformation (in Athena, replace STRING with VARCHAR)
SELECT
'total_rows' AS check_name, CAST(COUNT(*) AS STRING) AS result
FROM emr_labs_db.covid_enriched
UNION ALL
SELECT 'null_fips', CAST(SUM(CASE WHEN fips IS NULL THEN 1 ELSE 0 END) AS STRING)
FROM emr_labs_db.covid_enriched
UNION ALL
SELECT 'negative_cases', CAST(SUM(CASE WHEN cases < 0 THEN 1 ELSE 0 END) AS STRING)
FROM emr_labs_db.covid_enriched
UNION ALL
SELECT 'date_range',
CONCAT(MIN(date_str), ' → ', MAX(date_str))
FROM emr_labs_db.covid_enriched;
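The query above only reports numbers; a pipeline gate has to act on them. The following is a minimal sketch, assuming the query output is captured as tab-separated `check_name<TAB>result` lines (the layout `hive -S -f` prints). The function name `assert_quality` and the file name `checks.sql` are illustrative and not part of the labs.

```bash
# Hypothetical gate: read "check_name<TAB>result" lines on stdin and fail
# when any row-level check reports a problem. Unknown checks are ignored.
assert_quality() {
  awk -F'\t' '
    $1 == "total_rows"     && $2 + 0 == 0 { bad = bad " total_rows=0" }
    $1 == "null_fips"      && $2 + 0 > 0  { bad = bad " null_fips=" $2 }
    $1 == "negative_cases" && $2 + 0 > 0  { bad = bad " negative_cases=" $2 }
    END {
      if (bad != "") {
        print "QUALITY CHECK FAILED:" bad > "/dev/stderr"
        exit 1
      }
    }'
}
```

One possible use is `hive -S -f checks.sql | assert_quality || exit 1` as the last line of a lab script, so a clean exit code also implies clean data.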
Quick CLI Reference Card
Keep these commands handy while working through the labs.
# ── Cluster status ─────────────────────────────────────────────────────────
aws emr describe-cluster --cluster-id $CID \
--query 'Cluster.{State:Status.State,Master:MasterPublicDnsName}'
aws emr list-steps --cluster-id $CID \
--query 'Steps[].{Name:Name,State:Status.State,Id:Id}'
# ── Step logs ──────────────────────────────────────────────────────────────
aws s3 cp s3://$LOGS_BUCKET/emr-logs/$CID/steps/$STEP/stderr.gz - | gunzip | tail -80
aws s3 cp s3://$LOGS_BUCKET/emr-logs/$CID/steps/$STEP/stdout.gz - | gunzip | head -100
# ── SSH tunnels (run each in a separate terminal, Ctrl+C to close) ─────────
ssh -L 8080:localhost:8080 -i ~/.ssh/emr-labs-key.pem hadoop@$MASTER -N # Tez UI
ssh -L 8088:localhost:8088 -i ~/.ssh/emr-labs-key.pem hadoop@$MASTER -N # YARN
ssh -L 50070:localhost:50070 -i ~/.ssh/emr-labs-key.pem hadoop@$MASTER -N # HDFS (NameNode UI moves to port 9870 on EMR 6.x and later)
ssh -L 8888:localhost:8888 -i ~/.ssh/emr-labs-key.pem hadoop@$MASTER -N # Hue
ssh -L 11000:localhost:11000 -i ~/.ssh/emr-labs-key.pem hadoop@$MASTER -N # Oozie
# ── Glue ───────────────────────────────────────────────────────────────────
aws glue get-tables --database-name emr_labs_db --query 'TableList[].Name'
aws glue get-table --database-name emr_labs_db --name <TABLE> | jq '.Table.StorageDescriptor'
# ── Athena ─────────────────────────────────────────────────────────────────
aws athena list-query-executions --work-group primary --query 'QueryExecutionIds[:5]'
aws athena get-query-execution --query-execution-id $QID \
--query 'QueryExecution.{State:Status.State,Scanned:Statistics.DataScannedInBytes}'
# ── S3 data checks ─────────────────────────────────────────────────────────
aws s3 ls s3://$RAW_BUCKET/covid/ --recursive --human-readable | tail -5
aws s3 ls s3://$INTERMEDIATE_BUCKET/ --recursive --human-readable | tail -5
aws s3 ls s3://$LOGS_BUCKET/emr-logs/$CID/ --recursive | wc -l
# ── YARN (run on master via SSH) ───────────────────────────────────────────
yarn application -list -appStates ALL
yarn logs -applicationId application_XXXXXXXX_XXXX 2>&1 | grep -i 'error\|fatal' | head -30
# ── Oozie (run on master via SSH) ──────────────────────────────────────────
OOZIE=http://localhost:11000/oozie
oozie jobs -oozie $OOZIE -jobtype coordinator -filter status=RUNNING
oozie job -oozie $OOZIE -info <COORD_ID>
oozie job -oozie $OOZIE -log <WF_ID>
oozie job -oozie $OOZIE -rerun <COORD_ID> -action <ACTION_NUMBER>
11. Production Monitoring & Troubleshooting: Lessons from Large Companies
The following section draws on published engineering blog posts, open-source tools, and conference talks (Spark Summit, QCon, re:Invent, VLDB) from companies that operate data platforms at the scale of billions of events per day. Each subsection identifies the core problem they faced, the technical solution they built or adopted, and the concrete practice you can apply in your own pipelines (including these labs).
Netflix — Instrument Everything, Trust Nothing
Background
Netflix processes ~700 billion events per day across its data platform
(2024 figures from their tech blog). At that scale, a silent data-quality
bug — a column that starts emitting NULL instead of a user ID — can corrupt
weeks of A/B test results before anyone notices.
Problem 1: Silent failures in distributed jobs
A Spark/Hive job exits with code 0 (success) but writes 0 rows because a filter predicate silently excluded everything. The job succeeded; the data did not.
Netflix’s solution — output record count assertions:
Every job emits a metric pipeline.output_records{job=X, date=Y}. A
separate anomaly-detection job compares today’s count to a rolling 14-day
median. If today’s count deviates by more than 3σ, PagerDuty fires.
Apply this now:
# After Lab 03 CTAS, assert minimum row count before the cluster terminates
# Add this as a final step in run_lab.sh:
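QID=$(aws athena start-query-execution \
  --query-string "SELECT COUNT(*) FROM emr_labs_db.covid_enriched" \
  --work-group primary \
  --result-configuration "OutputLocation=s3://$ATHENA_BUCKET/qa/" \
  --output text --query QueryExecutionId)
# Poll until the query reaches a terminal state
while :; do
  STATE=$(aws athena get-query-execution --query-execution-id "$QID" \
    --query 'QueryExecution.Status.State' --output text)
  case "$STATE" in
    SUCCEEDED) break ;;
    FAILED|CANCELLED) echo "Athena query $STATE" >&2; exit 1 ;;
  esac
  sleep 2
done
# Rows[0] is the header row, so the count is in Rows[1]
COUNT=$(aws athena get-query-results --query-execution-id "$QID" \
  --query 'ResultSet.Rows[1].Data[0].VarCharValue' --output text)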
if [ "$COUNT" -lt 1000000 ]; then
echo "ASSERTION FAILED: expected >= 1M rows, got $COUNT" >&2
exit 1
fi
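A fixed 1M-row floor catches an empty output but not a gradual drop. The rolling-baseline idea described above can be approximated without a metrics system. This is a sketch under stated assumptions, not Netflix's implementation: it assumes a hypothetical history file with one integer row count per line (oldest first), and it flags today's count when it sits more than 3 standard deviations from the median of the last 14 entries.

```bash
# Hypothetical helper: exit 0 ("anomalous") when today's count deviates from
# the median of the last 14 recorded counts by more than 3 standard deviations.
# $1 = history file (one count per line, oldest first), $2 = today's count.
is_anomalous() {
  local history_file=$1 today=$2
  tail -n 14 "$history_file" | sort -n | awk -v today="$today" '
    { v[NR] = $1; sum += $1; sumsq += $1 * $1 }
    END {
      if (NR < 2) exit 1                      # not enough history to judge
      # input is sorted, so the median is the middle element (or middle pair)
      median = (NR % 2) ? v[(NR + 1) / 2] : (v[NR / 2] + v[NR / 2 + 1]) / 2
      mean = sum / NR
      var = sumsq / NR - mean * mean          # population variance
      sd = (var > 0) ? sqrt(var) : 0
      dev = today - median
      if (dev < 0) dev = -dev
      exit ((dev > 3 * sd) ? 0 : 1)
    }'
}
```

A script could call `is_anomalous .row_counts.log "$COUNT" && exit 1` and then append `$COUNT` to the file; the file name is an assumption. With fewer than two recorded counts the check stays silent rather than guessing.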
Problem 2: Metastore as a single point of failure
The original Netflix pipeline used a shared Hive Metastore (MySQL-backed). A schema migration on the metastore database caused 6 hours of pipeline downtime across 300+ Spark jobs that all connected to the same instance.
Netflix’s solution — Metacat: Metacat is a federated metadata service that presents a single API over multiple underlying metastores (Hive, Glue, Druid, Elasticsearch). It adds:
- Partition pruning cache: avoids hammering the metastore with SHOW PARTITIONS for every job start
- Metadata tagging: annotate tables with owner, sla, sensitivity (PII/non-PII) as searchable tags
- Change events: every table schema change emits a Kafka event so downstream consumers can react
Lesson for AWS: use Glue (as these labs do) rather than a self-managed Hive Metastore. Glue is multi-AZ, serverless, and does not become a capacity bottleneck. Add Glue tags for ownership:
aws glue tag-resource \
--resource-arn arn:aws:glue:us-east-1:ACCOUNT:table/emr_labs_db/covid_enriched \
--tags-to-add owner=data-eng,sla=daily-06:00,sensitivity=public
Problem 3: Slow query regression
A schema change (adding a new column) caused a previously 2-minute query to take 45 minutes because the new column triggered a different join strategy.
Netflix’s solution — query fingerprinting + performance baselining: Every query is hashed to a fingerprint (normalized, stripping literal values). Historical execution time per fingerprint is stored in an internal time-series DB. An SLA checker flags any fingerprint whose p95 runtime exceeds 2× its 30-day median.
Apply this with Athena:
# Log Athena query stats to a metrics file after every run
STATS=$(aws athena get-query-execution --query-execution-id $QID \
--query 'QueryExecution.Statistics')
echo "$(date -u +%FT%TZ) qid=$QID \
runtime=$(echo $STATS | jq .TotalExecutionTimeInMillis) \
scanned=$(echo $STATS | jq .DataScannedInBytes)" \
>> .athena_query_stats.log
# Alert if runtime exceeds 3× historical average for the same query
# (implement with a simple awk/python check on the log file)
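The awk check mentioned in the comment above could look like the sketch below. It assumes the log contains runs of a single query (one line per run, each with a `runtime=<millis>` field as written by the snippet above); with several queries in one file you would first filter by some label, which the log format shown does not yet include. The function name is illustrative.

```bash
# Hypothetical check: exit 0 ("regressed") when the newest runtime in the log
# is more than 3x the average of all earlier runs. $1 = stats log file.
runtime_regressed() {
  awk '
    match($0, /runtime=[0-9]+/) {
      if (seen) { sum += last; cnt++ }        # fold the previous run into history
      last = substr($0, RSTART + 8, RLENGTH - 8) + 0   # skip the 8 chars of "runtime="
      seen = 1
    }
    END {
      if (cnt == 0) exit 1                    # no history yet, nothing to compare
      exit ((last > 3 * sum / cnt) ? 0 : 1)
    }' "$1"
}
```

For example, `runtime_regressed .athena_query_stats.log && echo "runtime regression" >&2` after each logged run. Lines without a numeric runtime (for instance a `null` from jq) are skipped.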
Uber — Data Quality as a First-Class Citizen
Background
Uber’s data platform (built around Hudi + Presto + Spark) processes ride, earnings, and fraud signals. In 2019 they published their experience with silent data corruption — jobs that ran cleanly but produced wrong results due to upstream schema drift.
Problem 1: Schema drift at the source
A backend team renamed a JSON field from driver_id to driverId (camelCase
migration). All downstream Hive jobs silently started getting NULL for
driver_id because the schema was never updated. The bug was discovered 11
days later by a product analyst noticing anomalous metrics.
Uber’s solution — Schemaless + schema registry enforcement:
- All events must be registered in a schema registry before any consumer can read them
- Schema evolution rules are enforced at the producer: only backward-compatible changes allowed (add optional fields, never rename or remove)
- For Hive/Glue: use TBLPROPERTIES ('schema.evolution'='true') with Hudi or Iceberg and enable the schema compatibility checker
Apply this with Glue schema registry:
# Create a schema registry for event schemas
aws glue create-registry --registry-name emr-labs-registry
# Register a schema version — rejects incompatible changes
aws glue create-schema \
--registry-id RegistryName=emr-labs-registry \
--schema-name county-cases \
--data-format AVRO \
--compatibility BACKWARD \
--schema-definition file://schemas/county_cases.avsc
Problem 2: Duplicates from exactly-once failures
When an ingestion job failed mid-run and was retried, some partitions were
written twice. Hive INSERT OVERWRITE is idempotent; INSERT INTO is not.
Downstream COUNT queries returned inflated figures.
Uber’s solution — Hudi upsert semantics:
Hudi (Hadoop Upserts Deletes and Incrementals) uses a record key +
partition path combination to guarantee exactly-once semantics even on
retry. It maintains a .hoodie/ commit timeline log. If a job is retried,
Hudi detects that the commit already exists and skips the write.
Apply this for idempotent Hive jobs (without Hudi):
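-- Use INSERT OVERWRITE (not INSERT INTO) for idempotency
-- Dynamic partitioning on state requires: SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE emr_labs_db.covid_enriched
PARTITION (state)
SELECT ...
FROM source_table
WHERE date_str = '${DATE}';
-- Retrying this is safe: each partition the SELECT produces is overwritten, not appended.
-- The SELECT must return a partition's full contents, because rows it omits are removed.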
Problem 3: Data freshness SLA violations
A daily pipeline was supposed to complete by 08:00 UTC. Intermittent Spot instance reclamation caused it to finish at 11:30 UTC with no alert fired because the job did eventually succeed.
Uber’s solution — freshness SLA metrics:
pipeline.completion_delay_minutes{job=X, date=Y}
If completion_delay_minutes > 120, PagerDuty fires even if the job
ultimately succeeded.
Apply this with CloudWatch:
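# Emit a custom metric for pipeline completion time
# START_EPOCH must be captured with $(date +%s) when the job starts
END_EPOCH=$(date +%s)
DELAY_MIN=$(( (END_EPOCH - START_EPOCH) / 60 ))
# Publish with the JobName dimension only: an alarm matches a metric only when
# the dimension sets are identical, so a per-run Date dimension would hide the data
aws cloudwatch put-metric-data \
--namespace EMRLabs/Pipelines \
--metric-name CompletionDelayMinutes \
--dimensions JobName=lab03-ctas \
--value "$DELAY_MIN" \
--unit Count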
# Alarm: alert if completion takes > 45 minutes
aws cloudwatch put-metric-alarm \
--alarm-name lab03-sla-breach \
--namespace EMRLabs/Pipelines \
--metric-name CompletionDelayMinutes \
--dimensions Name=JobName,Value=lab03-ctas \
--statistic Maximum \
--period 300 \
--threshold 45 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 1
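The snippet above measures how long the run took. The SLA scenario described earlier (due at 08:00 UTC, finished at 11:30 UTC) is about lateness against a deadline instead. A minimal sketch of that calculation, assuming the run finishes on the same UTC day as its deadline; the function name and arguments are illustrative:

```bash
# Hypothetical helper: minutes past a daily UTC deadline (0 if on time).
# $1 = deadline as HH:MM in UTC, $2 = completion time in epoch seconds.
sla_delay_minutes() {
  local deadline=$1 end_epoch=$2
  local hh=${deadline%%:*} mm=${deadline##*:}
  # 10# forces base 10 so "08" and "09" are not parsed as invalid octal
  local deadline_sec=$(( 10#$hh * 3600 + 10#$mm * 60 ))
  local finished_sec=$(( end_epoch % 86400 ))   # seconds since midnight UTC
  local delay=$(( (finished_sec - deadline_sec) / 60 ))
  (( delay < 0 )) && delay=0
  echo "$delay"
}

# Example: deadline 08:00, finished 2024-01-01T11:30:00Z
sla_delay_minutes 08:00 1704108600   # prints 210
```

Passing the result as `--value "$(sla_delay_minutes 08:00 "$(date +%s)")"` would make the alarm threshold mean "minutes late" rather than "minutes of runtime".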
LinkedIn — The Origin of Metadata-Driven Monitoring
Background
LinkedIn is where many foundational big data tools were born: Kafka, Samza, Azkaban (workflow scheduler), and the WhereHows and DataHub metadata catalogs (data lineage). Their data team wrote the 2012 paper “Building LinkedIn’s Real-time Activity Data Pipeline”, which influenced how the industry thinks about data observability.
Problem 1: You can’t monitor what you can’t trace
When a dashboard showed wrong numbers, the analyst knew which table was wrong but had no way to know which upstream job wrote to it, which source that job read from, or when the last successful run was.
LinkedIn’s solution (WhereHows, later DataHub): a metadata catalog that captures read/write relationships between datasets, with column-level lineage in recent DataHub releases. When you trace a metric backwards, you get a DAG showing every transformation it passed through. Apache Atlas, which originated at Hortonworks rather than LinkedIn, fills the same role in the Hadoop ecosystem.
AWS equivalent — Glue lineage via DataBrew or Lake Formation:
# Register Lake Formation admins and default permissions (governance setup; this call does not itself record lineage)
aws lakeformation put-data-lake-settings --data-lake-settings '{
"DataLakeAdmins": [{"DataLakePrincipalIdentifier": "arn:aws:iam::ACCOUNT:role/emr-service"}],
"CreateTableDefaultPermissions": []
}'
# Query Glue Data Catalog for table-level lineage hints via tags
aws glue get-tables --database-name emr_labs_db \
--query 'TableList[].{Name:Name,Tags:Parameters}'
For full column-level lineage, use OpenLineage (an open standard supported by Airflow, Spark, and dbt):
# In a PySpark job, emit lineage events automatically
from openlineage.client import OpenLineageClient
# Marquez (open-source) or Atlan collects these events
# and renders a lineage graph
Problem 2: Job dependency explosion
With 3,000+ daily Hive/Spark jobs, a single upstream delay cascades through dependencies causing 50+ downstream jobs to be late. Without a dependency graph, the on-call engineer cannot identify the root cause quickly.
LinkedIn’s solution — Azkaban DAG + SLA waterfall: Azkaban (LinkedIn’s open-source workflow scheduler, predating Airflow) stores the full dependency graph. When a job is late, it shows exactly which upstream job is blocking it and for how long.
Concrete diagnostic pattern (applicable to Oozie and any scheduler):
When an alert fires:
1. Identify the failed/delayed job (the leaf symptom)
2. Walk the dependency graph UPSTREAM to find the root cause job
3. Check that root job's last run time and error
4. Fix the root cause first — fixing the leaf job will fail again
if the upstream data is still wrong or missing
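Steps 1 to 3 of this pattern can be automated once the dependency graph and job states are available as text. The sketch below assumes two hypothetical files exported from the scheduler: `deps.txt` with one `<job> <upstream_job>` edge per line, and `status.txt` with one `<job> <STATUS>` pair per line. It follows unhealthy upstream jobs until it reaches one whose own upstreams all succeeded.

```bash
# Hypothetical helper: print the furthest-upstream job that is not SUCCEEDED,
# starting from a failed or late job. Assumes an acyclic graph and a
# non-empty deps file. $1 = deps file, $2 = status file, $3 = starting job.
find_root_cause() {
  awk -v start="$3" '
    NR == FNR { up[$1] = up[$1] " " $2; next }   # first file: dependency edges
    { status[$1] = $2 }                          # second file: job statuses
    END {
      job = start
      while (1) {
        culprit = ""
        n = split(up[job], parents, " ")
        for (i = 1; i <= n; i++)
          if (status[parents[i]] != "SUCCEEDED") { culprit = parents[i]; break }
        if (culprit == "") break                 # every upstream is healthy
        job = culprit
      }
      print job
    }' "$1" "$2"
}
```

A job with no recorded status is treated as unhealthy, which errs on the side of investigating it. When several upstreams are unhealthy, only the first one listed is followed.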
Problem 3: Runaway jobs consuming all cluster capacity
A single mis-written GROUP BY query with no LIMIT read 2 TB of data, consumed 95% of cluster memory for 6 hours, and starved all other jobs.
LinkedIn’s solution — YARN capacity scheduler + resource quotas:
# Add YARN queues with a capped ad-hoc queue
# In capacity-scheduler.xml (add as EMR configuration at cluster launch; queue capacities under root must sum to 100):
{
"Classification": "capacity-scheduler",
"Properties": {
"yarn.scheduler.capacity.root.queues": "default,batch,adhoc",
"yarn.scheduler.capacity.root.default.capacity": "10",
"yarn.scheduler.capacity.root.batch.capacity": "60",
"yarn.scheduler.capacity.root.adhoc.capacity": "30",
"yarn.scheduler.capacity.root.adhoc.maximum-capacity": "30",
"yarn.scheduler.capacity.root.adhoc.user-limit-factor": "1"
}
}
-- Set per-query resource limits in Hive before running ad-hoc queries
SET hive.exec.reducers.max=20; -- cap parallelism
SET mapreduce.map.memory.mb=2048; -- cap per-task memory
SET tez.am.resource.memory.mb=4096; -- cap Tez AM
SET hive.limit.pushdown.safe=true; -- push LIMIT into map phase
Airbnb — Lineage, SLAs and Incident Playbooks
Background
Airbnb’s data engineering team (the inventors of Apache Airflow) has published extensively on their “Minerva” metrics platform and “Dataportal” data discovery tool. Their monitoring philosophy centres on three questions: Is the data fresh? Is it complete? Is it correct?
Problem 1: Different teams defining the same metric differently
“Active listings” was defined differently by the Growth team (any listing created in the last 30 days), the Payments team (any listing with a booking in the last 30 days), and the Supply team (any listing that is currently published). Dashboards showed three different numbers for the same metric.
Airbnb’s solution — Minerva (single metric store): Minerva is a company-wide metric definition layer. Metrics are defined once in YAML and computed by a central pipeline. Every dashboard, report, and experiment uses the same computed value.
Apply this pattern:
-- Define metrics in a single Hive/Athena view that all consumers query
-- Instead of letting each team write their own COUNT:
CREATE OR REPLACE VIEW emr_labs_db.metrics_county_stress AS
SELECT
date_str,
state,
-- Metric definition is here, not in each consumer's query
SUM(cases) AS cumulative_cases,
SUM(deaths) AS cumulative_deaths,
SUM(num_staffed_beds) AS total_staffed_beds,
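-- CAST avoids integer division in Athena when both sums are integer-typed
ROUND(CAST(SUM(cases) AS DOUBLE) / NULLIF(SUM(num_staffed_beds), 0), 4) AS cases_per_bed,
ROUND(CAST(SUM(deaths) AS DOUBLE) / NULLIF(SUM(num_icu_beds), 0), 4) AS deaths_per_icu_bed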
FROM emr_labs_db.covid_enriched
GROUP BY date_str, state;
-- All downstream consumers query this view, not the raw table
Problem 2: No way to know when the data was last updated
An analyst queried a table, got results, and built a report — without knowing that the pipeline had been broken for 5 days and the data was stale.
Airbnb’s solution — freshness metadata in the table itself:
-- Add a pipeline metadata table
CREATE TABLE IF NOT EXISTS emr_labs_db.pipeline_runs (
pipeline_name STRING,
run_date STRING,
status STRING, -- STARTED / SUCCEEDED / FAILED
rows_written BIGINT,
duration_sec INT,
cluster_id STRING,
ts TIMESTAMP
)
STORED AS PARQUET
LOCATION 's3://${PROCESSED_BUCKET}/pipeline_runs/';
-- Insert a record at the START and END of every pipeline run
-- (from your run_lab.sh shell scripts):
# START:
hive -e "INSERT INTO emr_labs_db.pipeline_runs VALUES \
('lab03-ctas','$(date +%F)','STARTED',0,0,'$CLUSTER_ID',current_timestamp)"
# END (after step completes):
hive -e "INSERT INTO emr_labs_db.pipeline_runs VALUES \
('lab03-ctas','$(date +%F)','SUCCEEDED',$ROW_COUNT,$DURATION,'$CLUSTER_ID',current_timestamp)"
-- Any analyst can now check freshness:
SELECT pipeline_name, MAX(run_date) AS last_successful_run,
MAX(ts) AS last_run_ts
FROM emr_labs_db.pipeline_runs
WHERE status = 'SUCCEEDED'
GROUP BY pipeline_name;
Problem 3: Cascading incident with no runbook
When the daily pipeline failed at 03:00 UTC, the on-call engineer spent 90 minutes finding the error, understanding the impact, and deciding whether to re-run, roll back, or skip. The same incident had happened 4 months earlier but the knowledge was in a Slack thread.
Airbnb’s solution — runbook-as-code in the DAG:
Every Airflow DAG definition includes a doc_md field with:
- What the pipeline does
- Upstream dependencies
- Downstream consumers
- How to re-run it safely
- Known failure modes and their fixes
See the Runbook Template subsection below.
Meta (Facebook) — Scuba and Real-Time Drill-Down
Background
Meta’s data infrastructure team published the Scuba paper (VLDB 2013): “Scuba: Diving into Data at Facebook”. Scuba is an in-memory time-series store that ingests ~1 million events/sec and allows engineers to drill down from a high-level metric anomaly to the specific server, user cohort, or feature flag causing it — in under 10 seconds.
Key techniques applicable to EMR/Hive pipelines
Technique 1 — Stratified sampling for fast debugging
Meta engineers don’t wait for a full 6-hour Hive job to reproduce a bug. They run a 0.1% sample first:
-- Fast debugging run: 0.1% sample using Hive's TABLESAMPLE (block sampling, so random rather than stratified)
SELECT state, COUNT(*), SUM(cases)
FROM emr_labs_db.nytimes_counties
TABLESAMPLE (0.1 PERCENT)
GROUP BY state;
-- Runs in seconds on a small cluster. Confirms logic before running at scale.
-- Or sample by bucket (deterministic, reproducible):
SELECT *
FROM emr_labs_db.nytimes_counties
TABLESAMPLE (BUCKET 1 OUT OF 100 ON fips)
LIMIT 1000;
-- The same 1% is returned every time — useful for regression testing
Technique 2 — Structured logs with consistent fields
Meta mandates that every log line is valid JSON with a fixed set of fields:
{"ts": "2025-01-01T06:00:00Z", "job": "lab03-ctas", "level": "INFO",
"event": "step_complete", "step": "ctas-enriched",
"rows_written": 1543210, "duration_ms": 187432, "cluster_id": "j-XXX"}
This makes logs grep-able, dashboardable (ingest into CloudWatch Logs
Insights), and alertable (filter on level=ERROR).
Apply this in shell scripts:
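log() {
  local level=$1; shift
  local msg="$*"
  msg=${msg//\\/\\\\}   # escape backslashes first, then double quotes, so the line stays valid JSON
  msg=${msg//\"/\\\"}
  printf '{"ts":"%s","job":"%s","level":"%s","msg":"%s"}\n' \
    "$(date -u +%FT%TZ)" "${JOB_NAME:-unknown}" "$level" "$msg"
}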
log INFO "Starting CTAS step"
log INFO "Cluster launched" cluster_id="$CLUSTER_ID"
log ERROR "Step failed" step="$STEP_ID" reason="$REASON" # triggers alert
Technique 3 — Canary partitions
Before writing the full output, write one partition (e.g., one state) and validate it. If the canary partition looks wrong, abort before writing 100%.
-- Canary: run the CTAS for a single state first
CREATE TABLE emr_labs_db.covid_enriched_canary
STORED AS PARQUET
LOCATION 's3://${INTERMEDIATE_BUCKET}/covid_enriched_canary/'
AS
SELECT ... FROM ... WHERE n.state = 'New York';
-- Validate the canary
SELECT COUNT(*), AVG(cases_per_staffed_bed), MAX(cases)
FROM emr_labs_db.covid_enriched_canary;
-- If numbers look reasonable, run the full CTAS
-- If not, DROP TABLE covid_enriched_canary and fix the query
Databricks — Structured Logging and Delta Log Auditing
Background
Databricks open-sourced Delta Lake in 2019 and has since published extensively
on reliability patterns for lakehouse architectures. Their key contribution
to the monitoring space is the Delta transaction log — a write-ahead log
(_delta_log/) that records every operation on a table.
Problem 1: “What changed and when?”
With plain Parquet/Hive, there is no record of when a partition was overwritten or by which job. If corrupt data is discovered 3 days later, you cannot determine exactly when it was introduced.
Databricks’s solution — Delta transaction log (ACID audit trail):
Every INSERT, UPDATE, DELETE, MERGE, and OPTIMIZE on a Delta table
appends a JSON entry to _delta_log/0000000000000000N.json:
{"commitInfo": {
"timestamp": 1704067200000,
"operation": "WRITE",
"operationParameters": {"mode": "Overwrite"},
"userMetadata": "job=lab03-ctas,cluster=j-XXX"
}}
You can audit this from Spark or Databricks SQL:
-- Delta table history (in Spark / Databricks SQL)
DESCRIBE HISTORY covid_enriched;
-- Shows: version, timestamp, operation, operationParameters, userMetadata
-- Time travel to before the bad write
SELECT * FROM covid_enriched VERSION AS OF 42;
SELECT * FROM covid_enriched TIMESTAMP AS OF '2025-01-10 06:00:00';
Apply this with Glue table properties (without Delta):
# Stamp every pipeline run's metadata onto the Glue table
aws glue update-table \
--database-name emr_labs_db \
--table-input "$(aws glue get-table \
--database-name emr_labs_db --name covid_enriched \
--query 'Table' | \
jq '.Parameters["last_updated"] = "'$(date -u +%FT%TZ)'" |
.Parameters["last_job"] = "lab03-ctas" |
.Parameters["last_cluster"]= "'$CLUSTER_ID'" |
del(.DatabaseName,.CreateTime,.UpdateTime,.CreatedBy,.IsRegisteredWithLakeFormation,
.CatalogId,.VersionId,.IsMultiDialectView)'
)"
# Read the metadata back
aws glue get-table --database-name emr_labs_db --name covid_enriched \
--query 'Table.Parameters.{last_updated:last_updated,last_job:last_job}'
Problem 2: Small files degrading query performance over time
Every incremental Hive job appends new files to a partition. After 6 months,
a partition has 10,000 files of 1 MB each instead of 10 files of 1 GB each.
Every Athena query spends most of its time on S3 LIST and file-open overhead.
Databricks’s solution — OPTIMIZE (auto-compaction):
Delta’s OPTIMIZE command rewrites small files into large ones in the
background. Delta also has Auto Optimize and Auto Compaction that
trigger automatically.
Apply this with plain Parquet + Hive (manual compaction):
-- Compact a partition by rewriting it via INSERT OVERWRITE
-- Run this as a weekly maintenance job
INSERT OVERWRITE TABLE emr_labs_db.covid_enriched
PARTITION (state = 'New York')
SELECT date_str, county, fips, cases, deaths,
num_staffed_beds, num_icu_beds, cases_per_staffed_bed, deaths_per_icu_bed
FROM emr_labs_db.covid_enriched
WHERE state = 'New York';
-- Hive reads all small files in the partition and rewrites them as fewer, larger Parquet files
-- Detect partitions with many files (run from Athena)
-- "$path" is a hidden column holding the S3 object each row was read from
SELECT state AS partition_key,
COUNT(DISTINCT "$path") AS file_count
FROM emr_labs_db.covid_enriched
GROUP BY state
HAVING COUNT(DISTINCT "$path") > 100;
-- File sizes are not exposed by this query; check them with aws s3 ls --recursive --human-readable
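File counts and average sizes per partition can also be read straight from an S3 listing, with no query engine involved. The sketch below parses the output of `aws s3 ls --recursive` (date, time, size in bytes, key) and groups objects by their parent prefix. The function name and default thresholds are illustrative assumptions.

```bash
# Hypothetical report: read `aws s3 ls <prefix> --recursive` output on stdin and
# print "prefix<TAB>file_count<TAB>avg_size_mb" for every prefix that has more
# than $1 files (default 100) or an average file size below $2 MB (default 10).
small_file_report() {
  awk -v max_files="${1:-100}" -v min_mb="${2:-10}" '
    {
      size = $3
      key = $0
      sub(/^[^ ]+ +[^ ]+ +[0-9]+ /, "", key)   # drop the date, time and size columns
      dir = key
      sub("/[^/]*$", "", dir)                  # drop the file name, keep the prefix
      files[dir]++
      bytes[dir] += size
    }
    END {
      for (d in files) {
        avg_mb = bytes[d] / files[d] / 1048576
        if (files[d] > max_files || avg_mb < min_mb)
          printf "%s\t%d\t%.2f\n", d, files[d], avg_mb
      }
    }'
}
```

For example: `aws s3 ls "s3://$INTERMEDIATE_BUCKET/covid_enriched/" --recursive | small_file_report 100 10`. Keys containing spaces (such as `state=New York`) are kept intact because only the first three columns are stripped.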
Problem 3: Concurrent writers corrupting data
Two Airflow tasks writing to the same Hive table partition simultaneously (due to a retry race condition) resulted in partial writes: each job wrote half the partition’s files, and both believed they succeeded.
Databricks’s solution — optimistic concurrency control in Delta:
Delta’s transaction log provides serializable isolation. If two writers
attempt to overwrite the same partition concurrently, one wins and the other
receives a ConcurrentAppendException and must retry.
Apply this pattern with Hive (no Delta):
# Use S3 conditional writes or a DynamoDB lock table for mutual exclusion
# Simple advisory lock pattern:
LOCK_KEY="emr_labs_db/covid_enriched/state=NY"
aws dynamodb put-item \
--table-name pipeline-locks \
--item "{\"lock_key\": {\"S\": \"$LOCK_KEY\"},
\"owner\": {\"S\": \"$CLUSTER_ID\"},
\"ttl\": {\"N\": \"$(( $(date +%s) + 3600 ))\"}}" \
--condition-expression "attribute_not_exists(lock_key)" \
--return-values NONE 2>&1
# If this fails with ConditionalCheckFailedException, another job owns the lock
Synthesised Best Practices You Can Apply Now
The following tables distil the above lessons into practices you can apply directly to these labs and to any Hive/EMR pipeline you build.
Monitoring
| Practice | Implementation | Source |
|---|---|---|
| Assert output row count after every job | CloudWatch custom metric + alarm | Netflix |
| Emit pipeline completion delay as a metric | put-metric-data CompletionDelayMinutes | Uber |
| Write pipeline run metadata to a Glue table | pipeline_runs table with status/rows/duration | Airbnb |
| Tag Glue tables with owner, SLA, sensitivity | aws glue tag-resource | Netflix/Metacat |
| Stamp last_updated/last_job on Glue table Parameters | aws glue update-table | Databricks |
| Structured JSON logs in all shell scripts | log() function with ts/job/level/msg | Meta |
| CloudWatch dashboard with YARN memory + S3 bytes | CloudWatch metrics AWS/EMR namespace | all |
Troubleshooting
| Practice | Implementation | Source |
|---|---|---|
| Reproduce bugs on 0.1% sample before full rerun | TABLESAMPLE (0.1 PERCENT) | Meta |
| Walk dependency graph upstream to find root cause | Oozie/Airflow DAG view | LinkedIn |
| Use canary partition before writing full output | Single-state CTAS, validate, then full CTAS | Meta |
| Check stderr log first, not stdout | s3 cp .../stderr.gz - \| gunzip \| tail | all |
| Preserve failed partition data before retry | Never DROP TABLE before investigating | Uber |
| Distinguish job success from data correctness | Row count assertion step after every write | Netflix |
Reliability
| Practice | Implementation | Source |
|---|---|---|
| Use INSERT OVERWRITE not INSERT INTO | Idempotent partition writes | Uber |
| Detect and compact small files weekly | INSERT OVERWRITE partition compaction job | Databricks |
| Define metrics once in a shared view | metrics_* Glue view, not per-consumer query | Airbnb/Minerva |
| Enforce schema compatibility at registration | Glue Schema Registry BACKWARD compat | Uber |
| Use YARN queue capacity limits for ad-hoc queries | adhoc queue capped at 30% capacity | LinkedIn |
| Maintain a runbook per pipeline (see below) | Markdown in DAG or wiki, link from alert | Airbnb |
Failure Mode Taxonomy
When something goes wrong, categorise the failure before acting. The fix depends on the failure mode, not just the symptom.
| Failure Mode | Symptom | Typical Root Cause |
|---|---|---|
| Silent zero output | Job succeeds, 0 rows written | Wrong filter, missing partition, schema mismatch on join key |
| Partial write | Row count lower than expected, no error | Spot instance reclamation mid-write, OOM kill |
| Schema drift | NULL values in column that was non-null before | Upstream renamed a field, or changed delimiter |
| Data duplication | Row count 2× expected | INSERT INTO instead of INSERT OVERWRITE on retry |
| SLA miss (late but OK) | Job completes after deadline, no failure | Spot capacity shortage, cluster bootstrap slow |
| Cascading delay | 10+ downstream jobs late | Single upstream job blocked on data that never arrived |
| Skew / hot partition | 1 reducer task runs 50× longer than others | Non-uniform join key (e.g., NULL fips = millions) |
| Small file explosion | Athena queries get slower over months | Daily incremental appends never compacted |
| OOM / GC pressure | Tez task killed with java.lang.OutOfMemoryError | Insufficient container memory, large shuffle |
| Metastore corruption | Hive: Table not found despite S3 data existing | Concurrent DDL, Derby DB corruption (not Glue) |
Diagnostic decision tree:
Job failed?
├── Yes → Check stderr log first
│ ├── OOM / GC? → Increase hive.tez.container.size (or tez.am.resource.memory.mb if the AM itself died), add nodes
│ ├── FileNotFound? → Check S3 path, Lab 00 data copy, IAM policy
│ ├── NullPointer / ClassCast? → Schema mismatch, check DESCRIBE FORMATTED
│ └── Timeout? → Query too large for cluster, add TABLESAMPLE to debug
│
└── Job succeeded but data is wrong?
├── 0 rows? → Check WHERE clause, join key nulls, partition filter
├── 2× rows? → INSERT INTO on retry → switch to INSERT OVERWRITE
├── NULLs in key column? → Upstream schema drift, check source schema
└── Stale data? → Check pipeline_runs table, check Oozie coordinator status
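The "job failed" branch of the tree can be turned into a first-pass triage helper that reads a step's stderr. This is a rough sketch: the patterns are common Java and AWS error strings chosen for illustration, the labels are made up, and the first matching line wins, so treat the result as a hint and still read the log.

```bash
# Hypothetical triage: read step stderr on stdin and print a label for the
# first line that matches one of the decision-tree branches.
classify_failure() {
  awk '
    /OutOfMemoryError|GC overhead limit exceeded/        { print "oom"; found = 1; exit }
    /FileNotFoundException|AccessDenied|Access Denied/   { print "missing-input-or-iam"; found = 1; exit }
    /NullPointerException|ClassCastException/            { print "schema-mismatch"; found = 1; exit }
    /[Tt]imed out|TimeoutException/                      { print "timeout"; found = 1; exit }
    END { if (!found) print "unknown" }                  # END still runs after exit
  '
}
```

Combined with the step-log command from the reference card, this could be used as `aws s3 cp s3://$LOGS_BUCKET/emr-logs/$CID/steps/$STEP/stderr.gz - | gunzip | classify_failure`.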
Runbook Template
Copy this template for each lab/pipeline. Store it in a runbooks/ directory
or as a comment in the Oozie workflow.xml / Airflow DAG file.
## Runbook: lab03-ctas (covid_enriched CTAS)
### What it does
Reads nytimes_counties + hospital_beds from the raw S3 zone, joins on FIPS,
computes cases_per_staffed_bed, writes Parquet to the intermediate zone,
registers covid_enriched in the Glue catalog.
### Schedule
Manual (triggered by Lab 03 run_lab.sh) or daily via Oozie coordinator at 06:00 UTC.
### Upstream dependencies
- lab00: S3 raw data must exist
- lab01: nytimes_counties and hospital_beds must be registered in Glue
### Downstream consumers
- lab05-athena: reads covid_enriched for all SQL queries
- lab06-oozie: re-runs this as part of the daily workflow
- Dashboards: any BI tool connected to Athena / Glue
### Expected output
- Table: emr_labs_db.covid_enriched
- Location: s3://{intermediate-bucket}/covid_enriched/
- Format: Parquet (Snappy)
- Row count: 1,500,000 – 1,800,000
- Columns: 10 (date_str, state, county, fips, cases, deaths,
num_staffed_beds, num_icu_beds, cases_per_staffed_bed, deaths_per_icu_bed)
### How to re-run safely
```bash
bash labs/lab-03-transformations/scripts/run_lab.sh
```
The CTAS uses DROP TABLE IF EXISTS + CREATE TABLE, so it is fully idempotent. Safe to re-run at any time. Previous data is overwritten.
### Known failure modes
| Error | Cause | Fix |
|---|---|---|
| FAILED: SemanticException Table not found nytimes_counties | Lab 01 not run | Run lab01 first |
| 0 rows written | FIPS column is NULL in source | Check raw CSV headers |
| Spot capacity unavailable | AWS availability zone | Re-run (script retries in 10 min) |
| Access Denied on s3:// | IAM policy not applied | Re-run terraform apply |
### Rollback procedure
If corrupt data was written:
# Drop the bad table (Glue entry removed, S3 data preserved because EXTERNAL)
hive -e "DROP TABLE IF EXISTS emr_labs_db.covid_enriched;"
# Remove the bad Parquet files from S3
aws s3 rm s3://{intermediate-bucket}/covid_enriched/ --recursive
# Re-run from scratch
bash labs/lab-03-transformations/scripts/run_lab.sh
### On-call contact
Data Engineering team — #data-oncall Slack channel
---
## Project Structure
emr-labs/
├── terraform/
│   ├── modules/s3/                 S3 buckets (5), lifecycle, public-access block
│   ├── modules/iam/                EMR service role + EC2 instance profile + Glue policy
│   └── envs/dev/                   Root module: wires S3 + IAM + Glue DB
├── labs/
│   ├── lab-00-foundation/          terraform apply + aws s3 sync
│   ├── lab-01-hive-basics/         CREATE EXTERNAL TABLE → Glue
│   ├── lab-02-glue-catalog/        DESCRIBE FORMATTED, catalog inspection
│   ├── lab-03-transformations/     CTAS JOIN → Parquet intermediate
│   ├── lab-04-drop-tables/         External (data safe) vs managed (data gone)
│   ├── lab-05-athena/              Serverless SQL, views, LAG() window functions
│   └── lab-06-oozie/               Oozie workflow.xml + coordinator.xml + submit
└── shared/
    ├── configs/hive-glue-config.json     Hive→Glue metastore wiring
    ├── configs/emr-instance-fleet.json   Spot fleet (cost-optimised)
    └── scripts/
        ├── get_tf_outputs.sh       Terraform → env vars
        └── wait_for_step.sh        EMR step poller