EMR Flink Labs — COVID-19 Analytics with Apache Flink SQL + PyFlink
Seven hands-on labs that build a COVID-19 analytics pipeline using Apache Flink on Amazon EMR 7.2.0. You will use the Flink SQL Client, the Flink Hive connector (backed by the AWS Glue Data Catalog), event-time windowing (tumbling, sliding, session), the PyFlink DataStream API, Amazon Athena queries over the shared Glue catalog, and Oozie scheduling via shell actions.
Table of Contents
- Architecture
- Prerequisites
- Repository Layout
- EMR Configuration
- Lab 00 — Data Foundation
- Lab 01 — Flink SQL Basics
- Lab 02 — Hive Catalog Inspection
- Lab 03 — Enrichment Join
- Lab 04 — Windowing
- Lab 05 — Athena Queries
- Lab 06 — Oozie Scheduling
- Key Concepts
- Troubleshooting
1. Architecture
s3://covid19-lake (public)
│
│ lab-00: sync CSV
▼
s3://emr-labs-raw/covid/
│
│ lab-01: CREATE CATALOG glue_hive
│ CREATE TABLE with connector=filesystem/format=csv
▼
AWS Glue Data Catalog (emr_labs_db)
├─ nytimes_counties ← Flink filesystem/CSV source table
└─ hospital_beds ← Flink filesystem/CSV source table
│
│ lab-03: Flink SQL LEFT JOIN → Parquet sink
▼
s3://emr-labs-intermediate/covid/enriched/flink-parquet/
(Parquet, snappy, partitioned by state)
│
│ lab-04: windowing (TUMBLE / HOP / SESSION / PyFlink DataStream)
▼
s3://emr-labs-intermediate/covid/analytics/
├─ weekly-trend/ ← tumbling 7-day
├─ pyflink-weekly/ ← DataStream API
└─ ...
lab-05: Athena lab-06: Oozie shell action
2. Prerequisites
| Requirement | Notes |
|---|---|
| Terraform applied | terraform/envs/dev/ provisioned |
| AWS CLI ≥ 2.15 | Profile with EMR + S3 + Glue + Athena permissions |
| Bash ≥ 4.0 | brew install bash on macOS |
| jq | For get_tf_outputs.sh |
| KEY_PAIR env var (optional) | EC2 key pair for SSH to the master node |
export AWS_DEFAULT_REGION=us-east-1
3. Repository Layout
emr-flink-labs/
├── README.md
├── shared/
│ └── configs/
│ └── flink-config.json ← Flink EMR classifications
└── labs/
├── lab-00-foundation/
│ └── scripts/copy_public_data.sh
├── lab-01-flink-basics/
│ ├── sql/
│ │ ├── 01_create_source_tables.sql
│ │ └── 02_query_tables.sql
│ └── scripts/run_lab.sh
├── lab-02-hive-catalog/
│ ├── sql/01_inspect_catalog.sql
│ └── scripts/run_lab.sh
├── lab-03-flink-sql/
│ ├── sql/
│ │ ├── 01_enrichment_join.sql
│ │ └── 02_validate_output.sql
│ └── scripts/run_lab.sh
├── lab-04-windowing/
│ ├── sql/
│ │ ├── 01_tumbling_window.sql
│ │ ├── 02_sliding_window.sql
│ │ └── 03_session_window.sql
│ ├── pyflink/04_datastream_window.py
│ └── scripts/run_lab.sh
├── lab-05-athena/
│ ├── sql/
│ │ ├── 01_create_view.sql
│ │ ├── 02_top_states.sql
│ │ └── 03_county_trend.sql
│ └── scripts/run_lab.sh
└── lab-06-oozie-flink/
├── oozie/
│ ├── workflow.xml
│ ├── coordinator.xml
│ ├── job.properties
│ └── run_flink_sql.sh ← helper script for Oozie shell action
└── scripts/
├── run_lab.sh
└── submit_oozie.sh
4. EMR Configuration
shared/configs/flink-config.json
Applied to every cluster via --configurations file://....
Key settings:
| Classification | Property | Value | Purpose |
|---|---|---|---|
| flink-conf | jobmanager.memory.process.size | 2048m | Total JobManager process memory (not just heap) |
| flink-conf | taskmanager.memory.process.size | 4096m | Total TaskManager process memory (not just heap) |
| flink-conf | taskmanager.numberOfTaskSlots | 2 | Slots per TaskManager |
| flink-conf | parallelism.default | 4 | Default job parallelism |
| flink-conf | state.backend | rocksdb | RocksDB state backend |
| flink-conf | state.checkpoints.dir | s3://<PLACEHOLDER>/flink/checkpoints/ | Checkpoint location (replaced at runtime) |
| flink-conf | execution.checkpointing.mode | EXACTLY_ONCE | Exactly-once checkpointing guarantee |
| flink-conf | restart-strategy.fixed-delay.attempts | 3 | Restart attempts for the fixed-delay restart strategy |
| hadoop-env / export | FLINK_HOME | /usr/lib/flink | Flink installation path |
The state.checkpoints.dir placeholder is replaced at cluster launch in each run_lab.sh script via a --conf override.
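The settings above live in EMR's configuration-classification JSON. The following minimal sketch assumes shared/configs/flink-config.json uses the standard EMR layout: a list of objects with Classification, Properties and optional nested Configurations. It shows that shape and one way to fill in the checkpoint placeholder before the file is passed to --configurations. render_config is a hypothetical helper and is not necessarily what run_lab.sh does:

```python
import json

# Standard EMR classification layout; values mirror the table above.
# render_config below is a hypothetical helper, not part of the repository.
FLINK_CONFIG = [
    {
        "Classification": "flink-conf",
        "Properties": {
            "jobmanager.memory.process.size": "2048m",
            "taskmanager.memory.process.size": "4096m",
            "taskmanager.numberOfTaskSlots": "2",
            "parallelism.default": "4",
            "state.backend": "rocksdb",
            "state.checkpoints.dir": "s3://<PLACEHOLDER>/flink/checkpoints/",
            "execution.checkpointing.mode": "EXACTLY_ONCE",
            "restart-strategy.fixed-delay.attempts": "3",
        },
    },
    {
        # hadoop-env carries no properties itself; the nested export
        # sub-classification sets environment variables.
        "Classification": "hadoop-env",
        "Properties": {},
        "Configurations": [
            {"Classification": "export", "Properties": {"FLINK_HOME": "/usr/lib/flink"}}
        ],
    },
]


def render_config(checkpoint_bucket: str) -> str:
    """Return the classification JSON with the checkpoint bucket filled in."""
    text = json.dumps(FLINK_CONFIG, indent=2)
    return text.replace("<PLACEHOLDER>", checkpoint_bucket)
```

Writing the returned string to a file and pointing --configurations at it keeps account-specific bucket names out of the committed JSON.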
5. Lab 00 — Data Foundation
Goal: Copy COVID-19 CSV data from the public AWS Data Lake to your raw S3 bucket.
./labs/lab-00-foundation/scripts/copy_public_data.sh
This step is the same as in the Hive and Spark flavours and needs no EMR cluster. Run it once and reuse the data across all three flavours.
6. Lab 01 — Flink SQL Basics
Goal: Connect to the Glue Data Catalog from Flink SQL, register filesystem-backed source tables, and run analytical queries.
./labs/lab-01-flink-basics/scripts/run_lab.sh
Cluster config: Flink + Hadoop + Hive on emr-7.2.0.
01_create_source_tables.sql
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'tableau';
CREATE CATALOG glue_hive WITH (
'type' = 'hive',
'hive-conf-dir' = '/etc/hive/conf/',
'hadoop-conf-dir' = '/etc/hadoop/conf/'
);
USE CATALOG glue_hive;
USE `{{DB}}`;
CREATE TABLE nytimes_counties (
report_date STRING,
county STRING,
state STRING,
fips_code STRING,
cumulative_cases BIGINT,
cumulative_deaths BIGINT
) WITH (
'connector' = 'filesystem',
'path' = 's3://{{RAW_BUCKET}}/covid/nytimes/',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
);
02_query_tables.sql
-- Row counts
SELECT 'nytimes_counties' AS tbl, COUNT(*) AS cnt FROM nytimes_counties
UNION ALL
SELECT 'hospital_beds', COUNT(*) FROM hospital_beds;
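-- Top 10 states by summed cumulative_cases (the NYT counts are running totals, so this adds up every daily snapshot rather than giving current totals)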
SELECT state, SUM(cumulative_cases) AS total_cases
FROM nytimes_counties
GROUP BY state
ORDER BY total_cases DESC
LIMIT 10;
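The NYT figures are running totals, so SUM(cumulative_cases) over all report dates counts each case once for every day it remains in the total. The following stdlib sketch uses made-up numbers for a single hypothetical county. It contrasts that sum with the total on the latest report_date:

```python
# Made-up cumulative counts for one hypothetical county over three report dates.
rows = [
    {"report_date": "2020-04-01", "state": "StateA", "cumulative_cases": 10},
    {"report_date": "2020-04-02", "state": "StateA", "cumulative_cases": 15},
    {"report_date": "2020-04-03", "state": "StateA", "cumulative_cases": 20},
]


def sum_all_dates(rows, state):
    """What SUM(cumulative_cases) ... GROUP BY state computes over every date."""
    return sum(r["cumulative_cases"] for r in rows if r["state"] == state)


def latest_total(rows, state):
    """Total as of the most recent report_date (ISO dates sort correctly as strings)."""
    state_rows = [r for r in rows if r["state"] == state]
    latest = max(r["report_date"] for r in state_rows)
    return sum(r["cumulative_cases"] for r in state_rows if r["report_date"] == latest)
```

In SQL, the latest-snapshot variant amounts to filtering on the maximum report_date before grouping by state.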
EMR step pattern
# Step 1: flink-yarn-session (detached YARN session)
aws emr add-steps --cluster-id "$CLUSTER_ID" --steps '[{
"Name": "FlinkYarnSession",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": ["bash","-c",
"/usr/lib/flink/bin/yarn-session.sh -nm FlinkSession --detached"]
}
}]'
# Step 2: sql-client via process substitution
aws emr add-steps --cluster-id "$CLUSTER_ID" --steps '[{
"Name": "CreateSourceTables",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": ["bash","-c",
"/usr/lib/flink/bin/sql-client.sh -f <(aws s3 cp s3://.../01_create_source_tables.sql -)"]
}
}]'
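<(...) is bash process substitution. It expands to a path such as /dev/fd/63 from which the output of aws s3 cp ... - can be read, so sql-client.sh -f reads the streamed SQL as if it were a regular file. The SQL Client runs the statements in order and exits at the end of the file; the file itself selects batch mode. INSERT INTO statements are submitted asynchronously by default, so set 'table.dml-sync' = 'true' if a later step depends on the insert having finished.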
Key Concepts — Flink SQL Client
The Flink SQL Client (bin/sql-client.sh) is an interactive SQL shell (or batch file executor) that:
- Connects to an existing YARN session or Flink cluster
- Executes DDL (CREATE TABLE, CREATE CATALOG) and DML (INSERT INTO, SELECT) statements
- In batch mode (SET 'execution.runtime-mode' = 'batch'): the job reads all currently available (bounded) input and then finishes
- In streaming mode: the job runs continuously, processing new records as they arrive
Key Concepts — Flink’s Hive Connector
The Hive connector allows Flink to:
- Read Hive/Glue metastore table metadata (schema, partition spec, location)
- Discover partitions at query time
- Write Parquet/ORC output that is registered back in Glue
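The data files Flink writes can be read by any engine. Whether Spark, Hive, and Athena can use the table definition itself depends on how the table was created:
- Tables created with the Hive dialect (Hive-compatible tables) share the same Glue metadata across engines.
- Tables created with 'connector' = 'filesystem' in the default dialect, as in these labs, are stored as Flink-specific generic tables. Other engines generally cannot interpret their metadata.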
7. Lab 02 — Hive Catalog Inspection
Goal: Explore the Glue catalog metadata from within Flink SQL.
./labs/lab-02-hive-catalog/scripts/run_lab.sh
01_inspect_catalog.sql
SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;
DESCRIBE nytimes_counties;
SHOW CREATE TABLE hospital_beds;
SELECT COUNT(*) FROM nytimes_counties;
Key Concepts — Catalog Hierarchy
Flink’s catalog model:
CATALOG (glue_hive)
└─ DATABASE (emr_labs_db)
├─ TABLE (nytimes_counties)
└─ TABLE (hospital_beds)
USE CATALOG glue_hive sets the current catalog, and USE emr_labs_db sets the current database (the SQL files quote the name with backticks). After that, unqualified table names such as nytimes_counties resolve to glue_hive.emr_labs_db.nytimes_counties.
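The lookup rule can be written as a small function. This sketch expands 1-, 2- and 3-part identifiers the way the catalog hierarchy above describes. resolve is a hypothetical helper, and it ignores backtick quoting and dots inside quoted names:

```python
def resolve(identifier: str, current_catalog: str, current_database: str) -> str:
    """Expand a table identifier to catalog.database.table.

    Simplification: splits on dots and ignores backtick quoting.
    """
    parts = identifier.split(".")
    if len(parts) == 1:  # table -> current catalog and database
        return f"{current_catalog}.{current_database}.{parts[0]}"
    if len(parts) == 2:  # database.table -> current catalog
        return f"{current_catalog}.{parts[0]}.{parts[1]}"
    if len(parts) == 3:  # catalog.database.table is already fully qualified
        return identifier
    raise ValueError(f"not a valid table identifier: {identifier!r}")
```

Before USE CATALOG runs, the current catalog and database are Flink's built-in default_catalog and default_database. An unqualified nytimes_counties would therefore be looked up there rather than in Glue.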
8. Lab 03 — Enrichment Join
Goal: Join COVID county data with hospital beds, write enriched Parquet to S3, validate output.
./labs/lab-03-flink-sql/scripts/run_lab.sh
Two EMR steps: EnrichmentJoin and ValidateOutput.
01_enrichment_join.sql
-- Sink table
CREATE TABLE enriched_covid_flink (
report_date STRING,
state STRING,
county STRING,
fips_code STRING,
cumulative_cases BIGINT,
cumulative_deaths BIGINT,
case_fatality_rate DOUBLE,
num_staffed_beds DOUBLE,
num_icu_beds DOUBLE,
cases_per_staffed_bed DOUBLE
) PARTITIONED BY (state)
WITH (
'connector' = 'filesystem',
'path' = 's3://{{INTERMEDIATE_BUCKET}}/covid/enriched/flink-parquet/',
'format' = 'parquet',
'parquet.compression' = 'SNAPPY'
);
INSERT INTO enriched_covid_flink
SELECT
n.report_date, n.state, n.county, n.fips_code,
n.cumulative_cases, n.cumulative_deaths,
n.cumulative_deaths * 100.0 / NULLIF(n.cumulative_cases, 0) AS case_fatality_rate,
h.num_staffed_beds, h.num_icu_beds,
n.cumulative_cases * 1.0 / NULLIF(h.num_staffed_beds, 0) AS cases_per_staffed_bed
FROM nytimes_counties n
LEFT JOIN hospital_beds h ON n.fips_code = h.county_fips_code;
Key Concepts — Flink SQL DML
In batch mode, INSERT INTO <sink> SELECT ... reads all source data and writes it to the sink once. The connector=filesystem sink writes Parquet files into one state=<value>/ subdirectory per state under the specified S3 path.
NULLIF(x, 0) avoids division by zero — returns NULL if the denominator is 0. Flink SQL follows standard SQL NULL semantics: arithmetic with NULL produces NULL.
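The join and its NULL handling can be mirrored in plain Python, with None standing in for SQL NULL. This is a sketch of the semantics, not of how Flink executes the join. It assumes at most one bed row per FIPS code, whereas a SQL join emits one output row per match. nullif, ratio and enrich are hypothetical helpers:

```python
def nullif(x, y):
    """SQL NULLIF(x, y): NULL (None) when x equals y, otherwise x."""
    return None if x == y else x


def ratio(numerator, denominator, scale=1.0):
    """numerator * scale / denominator with SQL NULL propagation."""
    if numerator is None or denominator is None:
        return None
    return numerator * scale / denominator


def enrich(county_rows, bed_rows):
    """LEFT JOIN county rows to bed rows on FIPS code and derive the two ratios.

    Simplification: assumes at most one bed row per FIPS code.
    """
    beds_by_fips = {b["county_fips_code"]: b for b in bed_rows}
    enriched = []
    for n in county_rows:
        h = beds_by_fips.get(n["fips_code"])  # no match -> bed columns are NULL
        staffed = h["num_staffed_beds"] if h is not None else None
        enriched.append({
            "fips_code": n["fips_code"],
            "case_fatality_rate": ratio(n["cumulative_deaths"], nullif(n["cumulative_cases"], 0), 100.0),
            "num_staffed_beds": staffed,
            "cases_per_staffed_bed": ratio(n["cumulative_cases"], nullif(staffed, 0)),
        })
    return enriched
```

A county with zero cases or zero staffed beds gets NULL ratios instead of a division error. A county with no matching bed row keeps its case data but has NULL bed columns.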
9. Lab 04 — Windowing
Goal: Apply tumbling, sliding, and session windows to the enriched COVID dataset. Also demonstrates the PyFlink DataStream API.
./labs/lab-04-windowing/scripts/run_lab.sh
Four EMR steps: TumblingWindow, SlidingWindow, SessionWindow, PyFlinkDataStream.
Window Types Compared
| Window | TVF Syntax | Use case |
|---|---|---|
| Tumbling | TUMBLE(TABLE src, DESCRIPTOR(ts), INTERVAL '7' DAY) | Non-overlapping fixed periods |
| Sliding (Hop) | HOP(TABLE src, DESCRIPTOR(ts), INTERVAL '1' DAY, INTERVAL '7' DAY) | Rolling averages, overlapping windows |
| Session | SESSION(TABLE src PARTITION BY key, DESCRIPTOR(ts), INTERVAL '14' DAY) | Grouping bursts of activity |
01_tumbling_window.sql — 7-day tumbling
CREATE TEMPORARY VIEW enriched_with_ts AS
SELECT TO_TIMESTAMP(report_date, 'yyyy-MM-dd') AS event_ts, state, county,
cumulative_cases, cumulative_deaths
FROM enriched_covid_flink;
INSERT INTO weekly_covid_trend
SELECT window_start, window_end, state,
SUM(cumulative_cases), SUM(cumulative_deaths), COUNT(DISTINCT county)
FROM TABLE(
TUMBLE(TABLE enriched_with_ts, DESCRIPTOR(event_ts), INTERVAL '7' DAY)
)
GROUP BY window_start, window_end, state;
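Flink aligns tumbling windows to the epoch (1970-01-01 00:00) unless an offset is configured. For TIMESTAMP values, these 7-day windows therefore run Thursday to Thursday rather than Monday to Sunday. The following stdlib sketch illustrates that assignment rule; it is not Flink's own code:

```python
from datetime import date, timedelta
from typing import Tuple

EPOCH = date(1970, 1, 1)  # a Thursday; window boundaries are multiples of the size from here


def tumble_window(d: date, size_days: int = 7) -> Tuple[date, date]:
    """Return [window_start, window_end) of the epoch-aligned tumbling window containing d."""
    days_since_epoch = (d - EPOCH).days
    start = EPOCH + timedelta(days=days_since_epoch - days_since_epoch % size_days)
    return start, start + timedelta(days=size_days)
```

For example, tumble_window(date(2020, 3, 16)) takes a Monday and returns the window from Thursday 2020-03-12 up to (but not including) Thursday 2020-03-19.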
02_sliding_window.sql — 30-day/7-day HOP
SELECT window_start, window_end, state,
SUM(cumulative_cases) AS rolling_30d_cases
FROM TABLE(
HOP(TABLE enriched_with_ts, DESCRIPTOR(event_ts),
INTERVAL '7' DAY, -- slide step
INTERVAL '30' DAY) -- window size
)
GROUP BY window_start, window_end, state
ORDER BY state, window_start LIMIT 50;
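In a sliding (HOP) window, each row is assigned to every window whose range covers its timestamp. That is exactly size / slide windows when the size is a multiple of the slide. Here 30 / 7 ≈ 4.3, so each row falls into 4 or 5 windows depending on its date. Because windows overlap, HOP suits rolling aggregates. Note that cumulative_cases is a running total, so SUM over a window adds up daily snapshots rather than counting new cases. A per-county MAX(cumulative_cases) - MIN(cumulative_cases), as in the session query, approximates new cases within a window.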
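Which windows a row joins can be computed directly. The following stdlib sketch is not Flink code. It assumes integer day timestamps and window starts aligned to multiples of the slide from the epoch, as in Flink's HOP. With a 7-day slide and 30-day size, a row falls into 4 or 5 windows depending on its timestamp, because 30 is not a multiple of 7:

```python
def hop_windows(ts: int, slide: int, size: int):
    """Return every [start, end) hopping window that contains ts.

    Window starts are multiples of `slide` counted from the epoch (0). Units are
    arbitrary (days here).
    """
    last_start = ts - ts % slide  # latest window start at or before ts
    windows = []
    start = last_start
    while start > ts - size:  # window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)
```

When slide equals size, the function degenerates to a tumbling window and returns exactly one window per timestamp.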
03_session_window.sql — 14-day gap SESSION
SELECT window_start, window_end, state, county,
MAX(cumulative_cases) - MIN(cumulative_cases) AS cases_in_session
FROM TABLE(
SESSION(TABLE enriched_with_ts PARTITION BY county,
DESCRIPTOR(event_ts), INTERVAL '14' DAY)
)
GROUP BY window_start, window_end, state, county
HAVING MAX(cumulative_cases) - MIN(cumulative_cases) > 1000
ORDER BY cases_in_session DESC LIMIT 20;
A session window closes when no event for the same key arrives within the gap. With daily COVID reports, a county's rows form one session until it goes 14 days without a report. A new session for the same county therefore indicates that reporting stopped and later resumed.
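The gap rule can be sketched in plain Python. This sketch assumes integer timestamps (days) and models the result of SESSION ... PARTITION BY key. Each key's events are sorted. An event extends the current session if it arrives before the session's end (last event + gap); otherwise it opens a new session. The sketch does not try to match how an engine treats an event that arrives exactly one gap after the previous one:

```python
from collections import defaultdict


def session_windows(events, gap):
    """Group (key, timestamp) events into per-key session windows.

    Returns {key: [(session_start, session_end), ...]}, where session_end is the
    last event's timestamp plus the gap (the point at which the session closes).
    """
    timestamps_by_key = defaultdict(list)
    for key, ts in events:
        timestamps_by_key[key].append(ts)

    result = {}
    for key, stamps in timestamps_by_key.items():
        sessions = []
        for ts in sorted(stamps):
            if sessions and ts < sessions[-1][1]:
                # Arrived before the current session timed out: extend it.
                sessions[-1] = (sessions[-1][0], ts + gap)
            else:
                # First event, or the gap elapsed: open a new session.
                sessions.append((ts, ts + gap))
        result[key] = sessions
    return result
```

Partitioning is what makes gaps visible. Without PARTITION BY county, every county's daily reports share one timeline, so one county's silence is masked by other counties that are still reporting.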
04_datastream_window.py — PyFlink DataStream API
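from pyflink.common.time import Time
from pyflink.datastream import RuntimeExecutionMode, StreamExecutionEnvironment
from pyflink.datastream.window import TumblingEventTimeWindows
# source, watermark_strategy, sink and WeeklyAggregator (a ProcessWindowFunction) are
# defined elsewhere in 04_datastream_window.py. Even in BATCH mode, watermark_strategy
# must assign event timestamps (e.g. from report_date), or event-time windows cannot
# place records.
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)  # bounded input: run as a batch job
stream = (
    env.from_source(source, watermark_strategy, "EnrichedCovid-S3")
    .key_by(lambda r: r.get("state", "unknown"))        # one keyed partition per state
    .window(TumblingEventTimeWindows.of(Time.days(7)))  # 7-day event-time windows
    .process(WeeklyAggregator())                        # custom per-window Python logic
)
stream.sink_to(sink)
env.execute("PyFlink-WeeklyTumblingWindow")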
The DataStream API offers lower-level control than Flink SQL. A custom ProcessWindowFunction can run arbitrary Python logic per window, such as statistical anomaly detection or aggregations that SQL aggregate functions cannot express.
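The pipeline above can be pictured without Flink. Records are grouped by (key, window), and the window function is called once per group with the key, the window bounds and every element. This mirrors the (key, context, elements) arguments that PyFlink passes to ProcessWindowFunction.process. The following plain-Python emulation assumes records are dicts with report_date, state, county and cumulative_cases fields. weekly_aggregator and its max-minus-min "new cases" logic are hypothetical and are not the repository's WeeklyAggregator:

```python
from collections import defaultdict
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def weekly_aggregator(key, window, elements):
    """Hypothetical per-window function: sees the key, the window bounds and ALL elements.

    Approximates new cases per county as max - min of the running total inside the
    window, then sums over counties.
    """
    cases_by_county = defaultdict(list)
    for element in elements:
        cases_by_county[element["county"]].append(element["cumulative_cases"])
    new_cases = sum(max(v) - min(v) for v in cases_by_county.values())
    return {"state": key, "window_start": window[0], "window_end": window[1],
            "new_cases_approx": new_cases}


def run_keyed_tumbling(records, size_days=7):
    """Emulate key_by(state) -> 7-day tumbling event-time windows -> process()."""
    panes = defaultdict(list)  # (key, window_start) -> elements
    for record in records:
        days = (date.fromisoformat(record["report_date"]) - EPOCH).days
        start = EPOCH + timedelta(days=days - days % size_days)
        panes[(record.get("state", "unknown"), start)].append(record)
    return [
        weekly_aggregator(key, (start, start + timedelta(days=size_days)), elements)
        for (key, start), elements in sorted(panes.items())
    ]
```

Because the function receives the whole window's elements at once, any Python logic can run per window. The trade-off is that all of a window's elements must be buffered before the function is called.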
Key Concepts — Event Time vs Processing Time
| | Event Time | Processing Time |
|---|---|---|
| Clock | Embedded in the data (e.g., report_date) | System wall clock when the record is processed |
| Correctness | Deterministic; out-of-order data is handled within the watermark bound | Fast, but results depend on arrival timing |
| Requires | Event timestamps, plus watermarks in streaming mode | Nothing extra |
| Use for | COVID data (historical dates) | Real-time monitoring where approximate results are OK |
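In batch mode (SET 'execution.runtime-mode' = 'batch'), Flink knows the input is bounded, so each window can be evaluated once all input has been read. In batch mode, window TVFs accept a plain TIMESTAMP column such as event_ts in enriched_with_ts, so no WATERMARK definition is needed. In streaming mode, the descriptor column must be an event-time attribute declared with a WATERMARK.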
Key Concepts — Watermarks
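A watermark is a special record that flows with the stream and declares that event time has reached W: no more events with timestamp ≤ W are expected. Flink uses watermarks to decide when a window is complete and can be evaluated. Events that arrive after their window has been evaluated are late.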
-- In a streaming scenario (not batch):
CREATE TABLE nytimes_stream (
report_date STRING,
event_ts AS TO_TIMESTAMP(report_date, 'yyyy-MM-dd'),
WATERMARK FOR event_ts AS event_ts - INTERVAL '1' DAY
) WITH (...);
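With a 1-day watermark delay, the watermark trails the largest event_ts seen so far by one day. A window ending at T therefore fires only once an event about one day past T has been seen. Records that arrive after their window has fired are late. Flink SQL drops them, while the DataStream API can route them to a side output (or accept them via allowed lateness).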
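The watermark rule can be traced with a small simulation. This is a simplified model, not Flink code, and it makes these assumptions:
- Timestamps are whole days.
- The watermark is the largest timestamp seen minus a fixed delay, like WATERMARK FOR event_ts AS event_ts - INTERVAL '1' DAY.
- The watermark is updated after every record rather than periodically.
- A tumbling window fires once the watermark reaches its end.

```python
def simulate_watermarks(timestamps, size, delay):
    """Tumbling event-time windows of `size` with a bounded-delay watermark.

    Returns (fired_windows, dropped_late_records); fired windows are (start, end, count).
    """
    watermark = None
    open_windows = {}  # window start -> buffered timestamps
    fired, dropped = [], []
    for ts in timestamps:
        start = ts - ts % size
        if watermark is not None and start + size <= watermark:
            dropped.append(ts)  # its window has already fired: late record
            continue
        open_windows.setdefault(start, []).append(ts)
        candidate = ts - delay
        watermark = candidate if watermark is None else max(watermark, candidate)
        for s in sorted(open_windows):
            if s + size <= watermark:
                fired.append((s, s + size, len(open_windows.pop(s))))
    # End of bounded input: the remaining windows fire.
    for s in sorted(open_windows):
        fired.append((s, s + size, len(open_windows[s])))
    return fired, dropped
```

With timestamps [0, 1, 2, 8, 9, 3, 16], size 7 and delay 1, the record with timestamp 3 arrives after the watermark has reached 8. That is past the end of its window [0, 7), so the record is dropped as late.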
10. Lab 05 — Athena Queries
Goal: Run Athena SQL against Flink-written Parquet data in the Glue catalog.
./labs/lab-05-athena/scripts/run_lab.sh
No EMR cluster needed. The SQL files are identical to the Spark and Hive flavours — Athena only cares about the data format (Parquet + Glue schema), not which engine wrote it.
See emr-hive-tez-labs/README.md Lab 05 for full Athena query explanations.
11. Lab 06 — Oozie Scheduling
Goal: Schedule the Flink pipeline with Oozie using <shell> actions.
# Launch persistent cluster
./labs/lab-06-oozie-flink/scripts/run_lab.sh
# Submit coordinator
./labs/lab-06-oozie-flink/scripts/submit_oozie.sh <CLUSTER_ID>
# Terminate after lab
aws emr terminate-clusters --cluster-ids <CLUSTER_ID>
Why <shell> action?
Oozie has native actions for Hive (<hive>), Spark (<spark>), and others, but no native Flink action. The <shell> action runs an arbitrary shell script on the cluster, which in turn invokes flink run or sql-client.sh.
workflow.xml structure
<action name="flink-create-tables">
<shell xmlns="uri:oozie:shell-action:0.3">
<exec>run_flink_sql.sh</exec>
<argument>s3://.../01_create_source_tables.sql</argument>
<argument>${glueDb}</argument>
<argument>${rawBucket}</argument>
<argument>${intermediateBucket}</argument>
<file>s3://.../run_flink_sql.sh#run_flink_sql.sh</file>
</shell>
<ok to="flink-enrich"/>
<error to="kill"/>
</action>
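The <file> element distributes run_flink_sql.sh to the YARN container's working directory, <exec> names the script to run, and each <argument> becomes one positional argument ($1, $2, ...). Oozie runs the shell action on whichever cluster node YARN schedules it. The aws CLI and /usr/lib/flink/bin/sql-client.sh must therefore be available on that node.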
run_flink_sql.sh — the Oozie helper
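#!/usr/bin/env bash
set -euo pipefail   # fail the Oozie action if any command fails (e.g. a missing SQL file)
SQL_S3="$1"; DB="$2"; RAW_BUCKET="$3"; INTERMEDIATE_BUCKET="$4"
# Fetch the SQL file and substitute the {{...}} placeholders
aws s3 cp "$SQL_S3" /tmp/flink_sql.sql
sed -i "s|{{DB}}|$DB|g; ..." /tmp/flink_sql.sql
/usr/lib/flink/bin/sql-client.sh -f /tmp/flink_sql.sql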
This script is uploaded to S3 alongside the SQL files. Oozie distributes it to the container before execution.
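The sed call substitutes the {{...}} placeholders that appear in the SQL files ({{DB}}, {{RAW_BUCKET}}, {{INTERMEDIATE_BUCKET}}). The following Python sketch of the same idea uses a hypothetical render_sql helper. It adds one safeguard: it refuses to produce SQL that still contains an unreplaced placeholder, which could otherwise surface later as a confusing table-not-found or S3 path error:

```python
import re

# Matches placeholders such as {{DB}}, {{RAW_BUCKET}} and {{INTERMEDIATE_BUCKET}}.
PLACEHOLDER = re.compile(r"\{\{([A-Z_]+)\}\}")


def render_sql(template: str, values: dict) -> str:
    """Substitute {{NAME}} placeholders, refusing to leave any unreplaced."""
    missing = sorted(set(PLACEHOLDER.findall(template)) - set(values))
    if missing:
        raise KeyError(f"no value for placeholder(s): {', '.join(missing)}")
    return PLACEHOLDER.sub(lambda m: values[m.group(1)], template)
```

Passing a function to re.sub instead of a replacement string means values are inserted literally. Backslashes are not interpreted, and unlike sed, & is not treated as the matched text.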
12. Key Concepts
Batch vs Streaming Mode
-- Batch: reads all existing data, terminates
SET 'execution.runtime-mode' = 'batch';
-- Streaming: reads indefinitely, requires WATERMARK for windowing
SET 'execution.runtime-mode' = 'streaming';
All labs run in batch mode because COVID data is historical and finite. To adapt to streaming (e.g., receiving live updates from Kinesis), change to streaming mode and add a WATERMARK definition to source tables.
Flink SQL vs PyFlink DataStream API
| | Flink SQL | PyFlink DataStream |
|---|---|---|
| Abstraction level | High (declarative) | Low (imperative) |
| Complex logic | Limited by SQL expressiveness | Arbitrary Python |
| Performance | Optimised and compiled to JVM code by the Flink table planner | Extra overhead from executing Python functions |
| Best for | JOINs, GROUP BY, windowed aggregations | Custom stateful logic, per-record ML inference |
| Learning curve | SQL knowledge is sufficient | Requires Flink/streaming knowledge |
RocksDB State Backend
state.backend = rocksdb stores keyed state in RocksDB on the TaskManagers' local disks rather than on the JVM heap. Benefits:
- Large state: handles state larger than available memory
- Incremental checkpoints: when state.backend.incremental is enabled, only changed state is uploaded to S3 per checkpoint
- Recovery: as with any state backend, state is restored from the last completed checkpoint on failure
The batch COVID labs do not use the configured state backend, because Flink's batch execution does not rely on it. RocksDB is configured for completeness; it becomes essential when extending these labs to streaming.
Exactly-Once Semantics
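Flink achieves end-to-end exactly-once via:
- Checkpointing: Flink periodically injects checkpoint barriers into the stream. Each operator snapshots its state when the barrier passes, producing a consistent checkpoint that is stored in S3.
- Replay on recovery: after a failure, state is restored from the last completed checkpoint, and sources rewind to the positions recorded in it.
- Transactional (two-phase commit) sinks: the filesystem/Parquet sink writes part files and publishes them only when the checkpoint covering them completes. Output from an unfinished checkpoint never becomes visible.
This is set in flink-config.json: execution.checkpointing.mode = EXACTLY_ONCE. Checkpoints are only taken when execution.checkpointing.interval is also set, and they apply to streaming jobs. Batch jobs recover by re-executing failed tasks instead.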
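The sink behaviour listed above can be modelled in a few lines. The class below is a hypothetical toy, not Flink's FileSink API. It follows the part-file states that Flink documents for bulk formats such as Parquet:
- In-progress: the file is being written.
- Pending: a checkpoint has rolled the file.
- Finished (visible): the checkpoint that rolled the file has completed.

On failure, everything not covered by a completed checkpoint is discarded:

```python
class ToyPartFileSink:
    """Toy model of checkpoint-driven commits for a bulk-format (e.g. Parquet) file sink."""

    def __init__(self):
        self.in_progress = []  # rows in the part file currently being written
        self.pending = {}      # checkpoint id -> part files rolled at that checkpoint
        self.finished = []     # committed part files, the only ones readers see
        self._next_part = 0

    def write(self, row):
        self.in_progress.append(row)

    def snapshot(self, checkpoint_id):
        """Checkpoint barrier reached: bulk formats roll the in-progress file to pending."""
        if self.in_progress:
            part = (f"part-{self._next_part}", self.in_progress)
            self._next_part += 1
            self.pending.setdefault(checkpoint_id, []).append(part)
            self.in_progress = []

    def _commit_up_to(self, checkpoint_id):
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            self.finished.extend(self.pending.pop(cid))

    def notify_checkpoint_complete(self, checkpoint_id):
        """Second phase: publish files belonging to completed checkpoints."""
        self._commit_up_to(checkpoint_id)

    def fail_and_recover(self, last_completed_checkpoint):
        """Commit what the last completed checkpoint covers; discard everything newer."""
        self._commit_up_to(last_completed_checkpoint)
        self.pending = {}
        self.in_progress = []
```

Rows discarded by fail_and_recover are not lost. The source rewinds to the position stored in the last completed checkpoint and produces them again, which is why replayable sources are part of the exactly-once recipe.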
Flink Dashboard
When a YARN session is running, the Flink Dashboard is accessible at:
http://<MASTER_PUBLIC_DNS>:8081
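It shows running jobs, parallelism, operator backpressure, checkpoint history, JM/TM memory, and task slots. A YARN session's JobManager runs inside a YARN container, so its host and port can differ from the address above. If that address does not respond, open the FlinkSession application's Tracking UI link in the YARN ResourceManager UI (port 8088 on the master node).
To reach the dashboard from your workstation, either allow port 8081 in the cluster's security group or use an SSH tunnel: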
ssh -L 8081:localhost:8081 hadoop@<MASTER_DNS>
# then open http://localhost:8081
13. Troubleshooting
sql-client.sh exits with ClassNotFoundException
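The Hive connector JARs must be on the Flink classpath. On EMR 7.2.0 they are at /usr/lib/flink/lib/. If running a custom Flink version, copy them into $FLINK_HOME/lib/ (every JAR in that directory is on the Flink classpath), then restart the YARN session:
cp flink-connector-hive*.jar hive-exec*.jar "$FLINK_HOME/lib/"   # run from the directory holding the JARs
Listing wildcard paths such as $FLINK_HOME/lib/flink-connector-hive*.jar in a quoted classpath variable does not work, because neither the shell (inside quotes) nor Java expands that pattern.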
CREATE CATALOG fails: Unable to instantiate HiveMetaStoreClient
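The Hive configuration must point to Glue. Check that hive.metastore.client.factory.class is set to com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory in /etc/hive/conf/hive-site.xml on the EMR cluster. EMR writes this setting when the cluster is launched with the Glue Data Catalog configured as the Hive metastore, either through the hive-site classification or the corresponding console option. Installing the Hive application alone does not set it.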
Process substitution <(...) fails in EMR step
Process substitution is a bash feature, not POSIX sh. The command-runner.jar step uses bash -c "..." which supports it. If you see “syntax error near unexpected token”, verify the step Args starts with bash, -c.
Flink SQL: Table 'X' was not found
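After USE CATALOG glue_hive and USE emr_labs_db, the table should be visible. If not:
- Check that Lab 01 completed (tables were created)
- Each sql-client.sh -f run is a new session, and by default CREATE CATALOG is not persisted between sessions, so every SQL file must begin with its own CREATE CATALOG and USE statements
- Run SHOW TABLES in sql-client to list available tables
- Verify that the Glue DB name matches $GLUE_DB (exported by get_tf_outputs.sh)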
OOM in TaskManager
Increase taskmanager.memory.process.size in flink-config.json (e.g., 6144m) and re-launch the cluster. Alternatively, reduce parallelism: SET 'parallelism.default' = '2' in the SQL file header.
Session window produces no results
SESSION windows are computed per key only when the TVF includes PARTITION BY: SESSION(TABLE t PARTITION BY county, ...). Without PARTITION BY, the events of all counties share one timeline, so with daily reports they typically merge into a single session. Also verify that the gap interval suits your data density.
PyFlink: ModuleNotFoundError
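PyFlink requires the apache-flink package and a Python 3 version supported by the bundled Flink release (see the PyFlink installation docs for your version). On EMR 7.2.0, PyFlink is pre-installed. Verify:
python3 -c "from pyflink.version import __version__; print(__version__)"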
If missing, the EMR bootstrap action or release label may not include PyFlink. Ensure Flink is in the --applications list when creating the cluster.