EMR Flink Labs — COVID-19 Analytics with Apache Flink SQL + PyFlink

Seven hands-on labs that build a COVID-19 analytics pipeline using Apache Flink on Amazon EMR 7.2.0. You will use the Flink SQL Client, the Flink Hive connector (backed by the AWS Glue Data Catalog), event-time windowing (tumbling, sliding, session), the PyFlink DataStream API, Athena queries over the Glue catalog, and Oozie scheduling via shell actions.


Table of Contents

  1. Architecture
  2. Prerequisites
  3. Repository Layout
  4. EMR Configuration
  5. Lab 00 — Data Foundation
  6. Lab 01 — Flink SQL Basics
  7. Lab 02 — Hive Catalog Inspection
  8. Lab 03 — Enrichment Join
  9. Lab 04 — Windowing
  10. Lab 05 — Athena Queries
  11. Lab 06 — Oozie Scheduling
  12. Key Concepts
  13. Troubleshooting

1. Architecture

s3://covid19-lake  (public)
        │
        │  lab-00: sync CSV
        ▼
s3://emr-labs-raw/covid/
        │
        │  lab-01: CREATE CATALOG glue_hive
        │          CREATE TABLE with connector=filesystem/format=csv
        ▼
AWS Glue Data Catalog (emr_labs_db)
  ├─ nytimes_counties   ← Flink filesystem/CSV source table
  └─ hospital_beds      ← Flink filesystem/CSV source table
        │
        │  lab-03: Flink SQL LEFT JOIN → Parquet sink
        ▼
s3://emr-labs-intermediate/covid/enriched/flink-parquet/
  (Parquet, snappy, partitioned by state)
        │
        │  lab-04: windowing (TUMBLE / HOP / SESSION / PyFlink DataStream)
        ▼
s3://emr-labs-intermediate/covid/analytics/
  ├─ weekly-trend/        ← tumbling 7-day
  ├─ pyflink-weekly/      ← DataStream API
  └─ ...

   lab-05: Athena    lab-06: Oozie shell action

2. Prerequisites

| Requirement | Notes |
|---|---|
| Terraform applied | terraform/envs/dev/ provisioned |
| AWS CLI ≥ 2.15 | Profile with EMR + S3 + Glue + Athena permissions |
| Bash ≥ 4.0 | brew install bash on macOS |
| jq | Used by get_tf_outputs.sh |
| KEY_PAIR env var (optional) | EC2 key pair for SSH to the master node |

export AWS_DEFAULT_REGION=us-east-1

3. Repository Layout

emr-flink-labs/
├── README.md
├── shared/
│   └── configs/
│       └── flink-config.json           ← Flink EMR classifications
└── labs/
    ├── lab-00-foundation/
    │   └── scripts/copy_public_data.sh
    ├── lab-01-flink-basics/
    │   ├── sql/
    │   │   ├── 01_create_source_tables.sql
    │   │   └── 02_query_tables.sql
    │   └── scripts/run_lab.sh
    ├── lab-02-hive-catalog/
    │   ├── sql/01_inspect_catalog.sql
    │   └── scripts/run_lab.sh
    ├── lab-03-flink-sql/
    │   ├── sql/
    │   │   ├── 01_enrichment_join.sql
    │   │   └── 02_validate_output.sql
    │   └── scripts/run_lab.sh
    ├── lab-04-windowing/
    │   ├── sql/
    │   │   ├── 01_tumbling_window.sql
    │   │   ├── 02_sliding_window.sql
    │   │   └── 03_session_window.sql
    │   ├── pyflink/04_datastream_window.py
    │   └── scripts/run_lab.sh
    ├── lab-05-athena/
    │   ├── sql/
    │   │   ├── 01_create_view.sql
    │   │   ├── 02_top_states.sql
    │   │   └── 03_county_trend.sql
    │   └── scripts/run_lab.sh
    └── lab-06-oozie-flink/
        ├── oozie/
        │   ├── workflow.xml
        │   ├── coordinator.xml
        │   ├── job.properties
        │   └── run_flink_sql.sh        ← helper script for Oozie shell action
        └── scripts/
            ├── run_lab.sh
            └── submit_oozie.sh

4. EMR Configuration

Applied to every cluster via --configurations file://... (the classifications live in shared/configs/flink-config.json).

Key settings:

| Classification | Property | Value | Purpose |
|---|---|---|---|
| flink-conf | jobmanager.memory.process.size | 2048m | Total JobManager process memory |
| flink-conf | taskmanager.memory.process.size | 4096m | Total TaskManager process memory |
| flink-conf | taskmanager.numberOfTaskSlots | 2 | Slots per TaskManager |
| flink-conf | parallelism.default | 4 | Default job parallelism |
| flink-conf | state.backend | rocksdb | RocksDB state backend |
| flink-conf | state.checkpoints.dir | s3://<PLACEHOLDER>/flink/checkpoints/ | Checkpoint location (replaced at runtime) |
| flink-conf | execution.checkpointing.mode | EXACTLY_ONCE | Exactly-once checkpointing |
| flink-conf | restart-strategy.fixed-delay.attempts | 3 | Restart attempts on failure |
| hadoop-env / export | FLINK_HOME | /usr/lib/flink | Flink installation path |

The state.checkpoints.dir placeholder is replaced at cluster launch in each run_lab.sh script via a --conf override.
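As an alternative way to fill in the checkpoint placeholder before calling aws emr create-cluster, a launcher could render the classifications JSON itself. The sketch below is an assumption, not the lab's mechanism (run_lab.sh uses a --conf override): FLINK_CONFIG is a hypothetical excerpt of shared/configs/flink-config.json and the bucket name is made up.

```python
import copy
import json

# Hypothetical excerpt of shared/configs/flink-config.json (EMR classification format).
FLINK_CONFIG = [
    {
        "Classification": "flink-conf",
        "Properties": {
            "parallelism.default": "4",
            "state.backend": "rocksdb",
            "state.checkpoints.dir": "s3://<PLACEHOLDER>/flink/checkpoints/",
        },
    },
    {
        "Classification": "hadoop-env",
        "Properties": {},
        "Configurations": [
            {"Classification": "export", "Properties": {"FLINK_HOME": "/usr/lib/flink"}}
        ],
    },
]


def render_configurations(config, bucket):
    """Return a copy of `config` with every <PLACEHOLDER> replaced by `bucket`."""
    rendered = copy.deepcopy(config)  # leave the template untouched

    def walk(classifications):
        for item in classifications:
            props = item.get("Properties", {})
            for key in list(props):
                props[key] = props[key].replace("<PLACEHOLDER>", bucket)
            # Recurse into nested classifications such as hadoop-env -> export
            walk(item.get("Configurations", []))

    walk(rendered)
    return rendered


if __name__ == "__main__":
    # Save this output and pass it as --configurations file://rendered.json
    print(json.dumps(render_configurations(FLINK_CONFIG, "my-checkpoint-bucket"), indent=2))
```

Deep-copying keeps the template reusable, so the same file can be rendered for several clusters with different checkpoint buckets.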


5. Lab 00 — Data Foundation

Goal: Copy COVID-19 CSV data from the public AWS Data Lake to your raw S3 bucket.

./labs/lab-00-foundation/scripts/copy_public_data.sh

This step is identical in the Hive and Spark flavours of these labs and needs no EMR cluster. Run it once; all three flavours share the copied data.


6. Lab 01 — Flink SQL Basics

Goal: Connect to the Glue Data Catalog from Flink SQL, register filesystem-backed source tables, and run analytical queries.

./labs/lab-01-flink-basics/scripts/run_lab.sh

Cluster config: Flink + Hadoop + Hive on emr-7.2.0.

01_create_source_tables.sql

SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'tableau';

CREATE CATALOG glue_hive WITH (
    'type'             = 'hive',
    'hive-conf-dir'    = '/etc/hive/conf/',
    'hadoop-conf-dir'  = '/etc/hadoop/conf/'
);
USE CATALOG glue_hive;
USE `{{DB}}`;

CREATE TABLE nytimes_counties (
    report_date      STRING,
    county           STRING,
    state            STRING,
    fips_code        STRING,
    cumulative_cases BIGINT,
    cumulative_deaths BIGINT
) WITH (
    'connector'        = 'filesystem',
    'path'             = 's3://{{RAW_BUCKET}}/covid/nytimes/',
    'format'           = 'csv',
    'csv.ignore-parse-errors' = 'true'
);

-- hospital_beds is registered the same way (statement omitted here)

02_query_tables.sql

-- Row counts
SELECT 'nytimes_counties' AS tbl, COUNT(*) AS cnt FROM nytimes_counties
UNION ALL
SELECT 'hospital_beds', COUNT(*) FROM hospital_beds;

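-- Top 10 states by cumulative cases
-- cumulative_cases is a running total, so read only the latest report_date
SELECT state, SUM(cumulative_cases) AS total_cases
FROM nytimes_counties
WHERE report_date = (
    SELECT MAX(report_date) FROM nytimes_counties
    WHERE cumulative_cases IS NOT NULL  -- skip rows whose numeric fields failed to parse
)
GROUP BY state
ORDER BY total_cases DESC
LIMIT 10;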
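Because cumulative_cases is a running total, summing it across report dates counts the same case once for every day it is reported. The sketch below contrasts reading the latest snapshot with a naive sum on made-up rows. It only illustrates the arithmetic, not how Flink executes the query.

```python
from collections import defaultdict

# Made-up rows: (report_date, state, county, cumulative_cases)
ROWS = [
    ("2021-01-01", "NY", "Kings", 100),
    ("2021-01-02", "NY", "Kings", 150),
    ("2021-01-01", "CA", "Los Angeles", 120),
    ("2021-01-02", "CA", "Los Angeles", 130),
    ("2021-01-02", "CA", "Orange", 40),
]


def top_states(rows, n):
    """Rank states by cumulative cases as of the latest report_date."""
    latest = max(r[0] for r in rows)  # ISO dates compare correctly as strings
    totals = defaultdict(int)
    for report_date, state, _county, cases in rows:
        if report_date == latest:
            totals[state] += cases
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:n]


def naive_sum(rows):
    """Sum cumulative counts across all dates (over-counts repeated reports)."""
    totals = defaultdict(int)
    for _date, state, _county, cases in rows:
        totals[state] += cases
    return dict(totals)


print(top_states(ROWS, 2))
print(naive_sum(ROWS))
```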

EMR step pattern

# Step 1: flink-yarn-session (detached YARN session)
aws emr add-steps --cluster-id "$CLUSTER_ID" --steps '[{
  "Name": "FlinkYarnSession",
  "HadoopJarStep": {
    "Jar": "command-runner.jar",
    "Args": ["bash","-c",
      "/usr/lib/flink/bin/yarn-session.sh -nm FlinkSession --detached"]
  }
}]'

# Step 2: sql-client via process substitution
aws emr add-steps --cluster-id "$CLUSTER_ID" --steps '[{
  "Name": "CreateSourceTables",
  "HadoopJarStep": {
    "Jar": "command-runner.jar",
    "Args": ["bash","-c",
      "/usr/lib/flink/bin/sql-client.sh -f <(aws s3 cp s3://.../01_create_source_tables.sql -)"]
  }
}]'

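<(...) is bash process substitution: bash runs aws s3 cp ... - (which writes the file to stdout) and hands sql-client.sh a path such as /dev/fd/63 that reads from that output, so sql-client.sh -f can treat it like a regular file. With -f, the SQL Client executes the statements in the file and exits when they finish; batch mode comes from the SET 'execution.runtime-mode' statement at the top of the file.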
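Hand-writing step JSON inside single quotes is error-prone. A minimal sketch, assuming Python 3 is available wherever you call aws emr add-steps, builds the same two step definitions with json.dumps; the S3 URI is a hypothetical example.

```python
import json
import shlex

FLINK_BIN = "/usr/lib/flink/bin"


def command_runner_step(name, bash_command):
    """An EMR step that runs `bash -c <bash_command>` via command-runner.jar."""
    return {
        "Name": name,
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            # bash (not sh) is required for process substitution <(...)
            "Args": ["bash", "-c", bash_command],
        },
    }


def sql_client_step(name, sql_s3_uri):
    """Stream a SQL file from S3 into sql-client.sh -f via process substitution."""
    cmd = f"{FLINK_BIN}/sql-client.sh -f <(aws s3 cp {shlex.quote(sql_s3_uri)} -)"
    return command_runner_step(name, cmd)


if __name__ == "__main__":
    steps = [
        command_runner_step(
            "FlinkYarnSession",
            f"{FLINK_BIN}/yarn-session.sh -nm FlinkSession --detached",
        ),
        # Hypothetical S3 location of the rendered SQL file
        sql_client_step("CreateSourceTables", "s3://my-bucket/lab-01/01_create_source_tables.sql"),
    ]
    print(json.dumps(steps))
```

The printed JSON can be passed directly, for example aws emr add-steps --cluster-id "$CLUSTER_ID" --steps "$(python3 make_steps.py)", where make_steps.py is a hypothetical file name. shlex.quote only adds quotes when the URI contains shell-special characters.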

The Flink SQL Client (bin/sql-client.sh) is an interactive SQL shell (or batch file executor) that:

  • Connects to an existing YARN session or Flink cluster
  • Executes DDL (CREATE TABLE, CREATE CATALOG) and DML (INSERT INTO, SELECT) statements
  • In batch mode (SET 'execution.runtime-mode' = 'batch'): reads all available data and exits
  • In streaming mode: runs continuously, reading new records as they arrive

The Hive connector allows Flink to:

  1. Read Hive/Glue metastore table metadata (schema, partition spec, location)
  2. Discover partitions at query time
  3. Write Parquet/ORC output that is registered back in Glue

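Hive-compatible tables (for example, tables created with Flink's Hive dialect) are therefore visible to Spark, Hive, and Athena through the same Glue catalog. Tables created in Flink's default dialect with an explicit 'connector' option, such as the filesystem tables in these labs, are stored in Glue as Flink generic tables: Flink can use them, but other engines cannot read the data through those definitions and need a Hive-compatible table over the same S3 location.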


7. Lab 02 — Hive Catalog Inspection

Goal: Explore the Glue catalog metadata from within Flink SQL.

./labs/lab-02-hive-catalog/scripts/run_lab.sh

01_inspect_catalog.sql

SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;
DESCRIBE nytimes_counties;
SHOW CREATE TABLE hospital_beds;
SELECT COUNT(*) FROM nytimes_counties;

Key Concepts — Catalog Hierarchy

Flink’s catalog model:

CATALOG (glue_hive)
  └─ DATABASE (emr_labs_db)
       ├─ TABLE (nytimes_counties)
       └─ TABLE (hospital_beds)

USE CATALOG glue_hive sets the current catalog. USE `emr_labs_db` sets the current database. After that, unqualified table names resolve to tables in that Glue database.
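The resolution rule can be modelled in a few lines. This is a simplified illustration of the catalog.database.table lookup, not Flink's implementation, and it ignores backtick-quoted names that contain dots. It also shows that a fully qualified name such as glue_hive.emr_labs_db.nytimes_counties works regardless of the current catalog.

```python
def resolve(identifier, current_catalog, current_database):
    """Expand a table identifier to (catalog, database, table).

    Simplified: ignores backtick-quoted names that themselves contain dots.
    """
    parts = identifier.split(".")
    if len(parts) == 1:                      # table
        return (current_catalog, current_database, parts[0])
    if len(parts) == 2:                      # database.table
        return (current_catalog, parts[0], parts[1])
    if len(parts) == 3:                      # catalog.database.table
        return (parts[0], parts[1], parts[2])
    raise ValueError(f"invalid identifier: {identifier!r}")


# State after: USE CATALOG glue_hive; USE `emr_labs_db`;
print(resolve("nytimes_counties", "glue_hive", "emr_labs_db"))
```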


8. Lab 03 — Enrichment Join

Goal: Join COVID county data with hospital beds, write enriched Parquet to S3, validate output.

./labs/lab-03-flink-sql/scripts/run_lab.sh

Two EMR steps: EnrichmentJoin and ValidateOutput.

01_enrichment_join.sql

-- Sink table
CREATE TABLE enriched_covid_flink (
    report_date        STRING,
    state              STRING,
    county             STRING,
    fips_code          STRING,
    cumulative_cases   BIGINT,
    cumulative_deaths  BIGINT,
    case_fatality_rate DOUBLE,
    num_staffed_beds   DOUBLE,
    num_icu_beds       DOUBLE,
    cases_per_staffed_bed DOUBLE
) PARTITIONED BY (state)
WITH (
    'connector'           = 'filesystem',
    'path'                = 's3://{{INTERMEDIATE_BUCKET}}/covid/enriched/flink-parquet/',
    'format'              = 'parquet',
    'parquet.compression' = 'SNAPPY'
);

INSERT INTO enriched_covid_flink
SELECT
    n.report_date, n.state, n.county, n.fips_code,
    n.cumulative_cases, n.cumulative_deaths,
    n.cumulative_deaths * 100.0 / NULLIF(n.cumulative_cases, 0),
    h.num_staffed_beds, h.num_icu_beds,
    n.cumulative_cases * 1.0 / NULLIF(h.num_staffed_beds, 0)
FROM nytimes_counties n
LEFT JOIN hospital_beds h ON n.fips_code = h.county_fips_code;

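In batch mode, INSERT INTO <sink> SELECT ... reads all source data once, writes the result to the sink, and the job finishes. The filesystem connector writes Parquet files into one state=<value>/ subdirectory per state under the specified S3 path.

NULLIF(x, 0) returns NULL when x is 0, so the division yields NULL instead of failing or producing an infinite value. Flink SQL follows standard SQL NULL semantics, where arithmetic with NULL produces NULL; the same rule applies to counties with no matching hospital_beds row, whose bed columns and cases_per_staffed_bed are NULL after the LEFT JOIN.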
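To see how NULLs flow through the derived columns, the sketch below mirrors the SELECT list in plain Python on made-up rows, using None for SQL NULL. It assumes at most one hospital_beds row per FIPS code, which the SQL does not guarantee: a county with several matching rows would appear several times after the join.

```python
def nullif(x, y):
    """SQL NULLIF(x, y): NULL (None) when x = y, otherwise x."""
    return None if x == y else x


def sql_div(a, b):
    """Division with SQL NULL propagation: a NULL operand gives NULL."""
    if a is None or b is None:
        return None
    return a / b


def enrich(counties, beds_by_fips):
    """LEFT JOIN counties to beds on FIPS code and derive the ratio columns.

    Assumes at most one hospital_beds row per FIPS code (dict lookup).
    """
    enriched = []
    for c in counties:
        beds = beds_by_fips.get(c["fips_code"])  # None = no match in the LEFT JOIN
        staffed = beds["num_staffed_beds"] if beds is not None else None
        icu = beds["num_icu_beds"] if beds is not None else None
        enriched.append({
            **c,
            "case_fatality_rate": sql_div(c["cumulative_deaths"] * 100.0,
                                          nullif(c["cumulative_cases"], 0)),
            "num_staffed_beds": staffed,
            "num_icu_beds": icu,
            "cases_per_staffed_bed": sql_div(c["cumulative_cases"] * 1.0,
                                             nullif(staffed, 0)),
        })
    return enriched
```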


9. Lab 04 — Windowing

Goal: Apply tumbling, sliding, and session windows to the enriched COVID dataset. Also demonstrates the PyFlink DataStream API.

./labs/lab-04-windowing/scripts/run_lab.sh

Four EMR steps: TumblingWindow, SlidingWindow, SessionWindow, PyFlinkDataStream.

Window Types Compared

| Window | TVF syntax | Use case |
|---|---|---|
| Tumbling | TUMBLE(TABLE src, DESCRIPTOR(ts), INTERVAL '7' DAY) | Non-overlapping fixed periods |
| Sliding (Hop) | HOP(TABLE src, DESCRIPTOR(ts), INTERVAL '1' DAY, INTERVAL '7' DAY) | Rolling averages, overlapping windows |
| Session | SESSION(TABLE src PARTITION BY key, DESCRIPTOR(ts), INTERVAL '14' DAY) | Grouping bursts of activity |

01_tumbling_window.sql — 7-day tumbling

CREATE TEMPORARY VIEW enriched_with_ts AS
SELECT TO_TIMESTAMP(report_date, 'yyyy-MM-dd') AS event_ts, state, county,
       cumulative_cases, cumulative_deaths
FROM enriched_covid_flink;

INSERT INTO weekly_covid_trend
SELECT window_start, window_end, state,
       SUM(cumulative_cases), SUM(cumulative_deaths), COUNT(DISTINCT county)
FROM TABLE(
    TUMBLE(TABLE enriched_with_ts, DESCRIPTOR(event_ts), INTERVAL '7' DAY)
)
GROUP BY window_start, window_end, state;
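The window assignment that TUMBLE performs can be sketched in Python. This assumes windows aligned to 1970-01-01 with zero offset, which we believe is Flink's default; under that assumption 7-day windows start on Thursdays, because the epoch fell on a Thursday. Like the SQL, it sums cumulative_cases over every day in the window; the rows are made up.

```python
from collections import defaultdict
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)  # assumed alignment point (zero window offset)


def tumble_start(d, size_days=7):
    """First day of the size_days tumbling window that contains date d."""
    return d - timedelta(days=(d - EPOCH).days % size_days)


def weekly_trend(rows, size_days=7):
    """Mimic the TUMBLE query: group by (window_start, window_end, state).

    rows: (report_date 'yyyy-MM-dd', state, county, cumulative_cases) tuples.
    Returns {(window_start, window_end, state): (sum_cases, distinct_counties)}.
    """
    sums = defaultdict(int)
    counties = defaultdict(set)
    for report_date, state, county, cases in rows:
        start = tumble_start(date.fromisoformat(report_date), size_days)
        key = (start, start + timedelta(days=size_days), state)
        sums[key] += cases
        counties[key].add(county)
    return {k: (sums[k], len(counties[k])) for k in sums}
```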

02_sliding_window.sql — 30-day/7-day HOP

SELECT window_start, window_end, state,
       SUM(cumulative_cases) AS rolling_30d_cases
FROM TABLE(
    HOP(TABLE enriched_with_ts, DESCRIPTOR(event_ts),
        INTERVAL '7' DAY,   -- slide step
        INTERVAL '30' DAY)  -- window size
)
GROUP BY window_start, window_end, state
ORDER BY state, window_start LIMIT 50;

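In a sliding window, each row falls into several overlapping windows: with a 7-day slide and a 30-day size, every row lands in 4 or 5 windows (30 / 7 ≈ 4.3), depending on where its date falls relative to the slide boundaries. Because the results overlap, sliding windows suit rolling aggregates.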
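How many windows a row lands in can be checked by enumerating HOP windows directly. This sketch measures time in whole days since 1970-01-01 and assumes window starts aligned to multiples of the slide (zero offset); it is an illustration, not Flink's window assigner.

```python
def hop_window_starts(day, slide=7, size=30):
    """Start days of every HOP window [start, start + size) that contains `day`.

    Window starts are assumed to be multiples of `slide` (zero offset).
    """
    latest = day - day % slide       # latest window start at or before `day`
    starts = []
    start = latest
    while start > day - size:        # window still covers `day`
        starts.append(start)
        start -= slide
    return sorted(starts)


counts = {len(hop_window_starts(d)) for d in range(0, 70)}
print(counts)
```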

03_session_window.sql — 14-day gap SESSION

SELECT window_start, window_end, state, county,
       MAX(cumulative_cases) - MIN(cumulative_cases) AS cases_in_session
FROM TABLE(
    SESSION(TABLE enriched_with_ts PARTITION BY county,
            DESCRIPTOR(event_ts), INTERVAL '14' DAY)
)
GROUP BY window_start, window_end, state, county
HAVING MAX(cumulative_cases) - MIN(cumulative_cases) > 1000
ORDER BY cases_in_session DESC LIMIT 20;

A session window for a given key closes when no event for that key arrives within the gap duration. COVID data is reported daily, so a new 14-day session for a county starts only when that county stops reporting for about two weeks and then resumes.
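Gap-based sessionization can be sketched in Python over whole-day timestamps. Each event opens a window [day, day + gap) and overlapping windows merge, so a session ends gap days after its last event. How events exactly one gap apart are handled is an assumption here, so the example avoids that boundary. The second function sessionizes per key, the way PARTITION BY county does.

```python
from collections import defaultdict


def session_windows(days, gap):
    """Merge event days into sessions; each event opens [day, day + gap).

    Returns (window_start, window_end) pairs, where window_end = last event + gap.
    Assumption: windows that merely touch (events exactly `gap` apart) are merged.
    """
    sessions = []
    for d in sorted(days):
        if sessions and d <= sessions[-1][1]:
            start, _end = sessions[-1]
            sessions[-1] = (start, d + gap)   # extend the current session
        else:
            sessions.append((d, d + gap))     # gap exceeded: start a new session
    return sessions


def sessions_by_key(events, gap):
    """Sessionize (key, day) events separately per key, like PARTITION BY county."""
    per_key = defaultdict(list)
    for key, day in events:
        per_key[key].append(day)
    return {k: session_windows(v, gap) for k, v in per_key.items()}
```

Partitioning matters: if one county pauses reporting while another reports daily, sessionizing them together lets the second county's events bridge the first county's gap.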

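04_datastream_window.py (PyFlink DataStream, excerpt)

from pyflink.common.time import Time
from pyflink.datastream import RuntimeExecutionMode, StreamExecutionEnvironment
from pyflink.datastream.window import TumblingEventTimeWindows

# source, watermark_strategy, sink and WeeklyAggregator (a ProcessWindowFunction)
# are defined in parts of the script not shown in this excerpt
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)

stream = (
    env.from_source(source, watermark_strategy, "EnrichedCovid-S3")
       .key_by(lambda r: r.get("state", "unknown"))        # records are dicts here
       .window(TumblingEventTimeWindows.of(Time.days(7)))  # 7-day event-time windows
       .process(WeeklyAggregator())
)
stream.sink_to(sink)
env.execute("PyFlink-WeeklyTumblingWindow")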

The DataStream API offers lower-level control than Flink SQL: custom ProcessWindowFunction allows arbitrary Python logic per window (e.g., statistical anomaly detection, complex aggregations beyond SQL aggregate functions).
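As an illustration of logic that is awkward to express in SQL, the function below flags days whose new cases spike within one window, using a z-score. It is plain Python with hypothetical names and thresholds, not the lab's WeeklyAggregator; a ProcessWindowFunction's process() method could call it on the window's elements, sorted by date.

```python
from statistics import mean, pstdev


def flag_anomalous_days(cumulative, z_threshold=2.0):
    """Return indexes (into `cumulative`) of days with unusually many new cases.

    `cumulative` holds one window's cumulative case counts in date order.
    New cases are day-over-day differences; a day is flagged when its z-score
    within the window exceeds `z_threshold`.
    """
    new_cases = [b - a for a, b in zip(cumulative, cumulative[1:])]
    if len(new_cases) < 2:
        return []
    mu, sigma = mean(new_cases), pstdev(new_cases)
    if sigma == 0:               # perfectly steady growth: nothing stands out
        return []
    return [i + 1 for i, x in enumerate(new_cases) if (x - mu) / sigma > z_threshold]


print(flag_anomalous_days([0, 10, 20, 30, 40, 50, 60, 200]))
```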

Key Concepts — Event Time vs Processing Time

| | Event Time | Processing Time |
|---|---|---|
| Clock | Embedded in data (e.g., report_date) | System wall clock when the record is processed |
| Correctness | Deterministic; handles out-of-order data up to the watermark delay | Fast, but results depend on arrival order and timing |
| Requires | Watermarks (in streaming mode) | Nothing extra |
| Use for | COVID data (historical dates) | Real-time monitoring where approximate results are acceptable |

In batch mode (SET 'execution.runtime-mode' = 'batch'), the input is bounded, so Flink can evaluate every window once all data has been read. The lab views define no WATERMARK; the window TVFs run directly on the TIMESTAMP column event_ts. Watermarks become necessary when you switch to streaming mode.

Key Concepts — Watermarks

A watermark is a special event in the stream that declares “no more events with timestamp < W will arrive.” Flink uses watermarks to decide when a window is complete and can be evaluated.

-- In a streaming scenario (not batch):
CREATE TABLE nytimes_stream (
    report_date  STRING,
    event_ts     AS TO_TIMESTAMP(report_date, 'yyyy-MM-dd'),
    WATERMARK FOR event_ts AS event_ts - INTERVAL '1' DAY
) WITH (...);

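With a 1-day watermark delay, the watermark trails the largest event_ts seen so far by one day, so a window [start, end) is evaluated once an event with event_ts at roughly end + 1 day or later has arrived. Records for a window that has already been evaluated are late: Flink SQL drops them, while the DataStream API can route them to a side output instead.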
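The watermark behaviour can be replayed in a small simulation. It uses whole days as timestamps, tumbling 7-day windows, and the 1-day delay from the DDL above. The firing rule is simplified (see the docstring), so treat it as an illustration of the idea rather than Flink's exact timing.

```python
def simulate_watermarks(event_days, window_days=7, delay_days=1):
    """Replay event timestamps (whole days) in arrival order through tumbling windows.

    Watermark = max(event time seen) - delay, as in
    WATERMARK FOR event_ts AS event_ts - INTERVAL '1' DAY.
    Simplification: a window [start, end) fires when watermark >= end
    (Flink compares against end minus one millisecond).
    Returns (fired, dropped, still_open).
    """
    watermark = float("-inf")
    open_windows = {}   # window start -> number of events buffered
    fired = {}
    dropped = []
    for t in event_days:
        start = t - t % window_days
        if start + window_days <= watermark:
            dropped.append(t)          # late: its window has already been evaluated
            continue
        open_windows[start] = open_windows.get(start, 0) + 1
        watermark = max(watermark, t - delay_days)
        for s in sorted(open_windows):
            if s + window_days <= watermark:
                fired[s] = open_windows.pop(s)
    return fired, dropped, open_windows


print(simulate_watermarks([0, 1, 2, 8, 9, 3, 15, 4]))
```

In a bounded (batch) run, Flink advances the watermark to the maximum at end of input, so windows left in still_open would also fire at that point.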


10. Lab 05 — Athena Queries

Goal: Run Athena SQL against Flink-written Parquet data in the Glue catalog.

./labs/lab-05-athena/scripts/run_lab.sh

No EMR cluster needed. The SQL files are identical to the Spark and Hive flavours — Athena only cares about the data format (Parquet + Glue schema), not which engine wrote it.

See emr-hive-tez-labs/README.md Lab 05 for full Athena query explanations.


11. Lab 06 — Oozie Scheduling

Goal: Schedule the Flink pipeline with Oozie using <shell> actions.

# Launch persistent cluster
./labs/lab-06-oozie-flink/scripts/run_lab.sh

# Submit coordinator
./labs/lab-06-oozie-flink/scripts/submit_oozie.sh <CLUSTER_ID>

# Terminate after lab
aws emr terminate-clusters --cluster-ids <CLUSTER_ID>

Why <shell> action?

Oozie has native actions for Hive (<hive>), Spark (<spark>), and others, but no native Flink action. The <shell> action runs an arbitrary shell script on the cluster, which in turn invokes flink run or sql-client.sh.

workflow.xml structure

<action name="flink-create-tables">
  <shell xmlns="uri:oozie:shell-action:0.3">
    <exec>run_flink_sql.sh</exec>
    <argument>s3://.../01_create_source_tables.sql</argument>
    <argument>${glueDb}</argument>
    <argument>${rawBucket}</argument>
    <argument>${intermediateBucket}</argument>
    <file>s3://.../run_flink_sql.sh#run_flink_sql.sh</file>
  </shell>
  <ok to="flink-enrich"/>
  <error to="kill"/>
</action>

The <file> element distributes run_flink_sql.sh to the YARN container’s working directory. <exec> names the script to run. <argument> passes positional arguments.

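run_flink_sql.sh (excerpt)

set -euo pipefail   # fail the Oozie action as soon as any command fails
SQL_S3="$1"; DB="$2"; RAW_BUCKET="$3"; INTERMEDIATE_BUCKET="$4"
# Download the SQL template and fill in its {{...}} placeholders
aws s3 cp "$SQL_S3" /tmp/flink_sql.sql
sed -i "s|{{DB}}|$DB|g; ..." /tmp/flink_sql.sql
/usr/lib/flink/bin/sql-client.sh -f /tmp/flink_sql.sql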

This script is uploaded to S3 alongside the SQL files. Oozie distributes it to the container before execution.
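sed leaves any placeholder it is not told about in the SQL, where it only fails later inside the SQL Client. A stricter substitution is sketched below in Python as a hypothetical alternative (the lab itself uses sed); the placeholder names are the ones used in this README.

```python
import re

# Matches template placeholders such as {{DB}} or {{RAW_BUCKET}}
PLACEHOLDER = re.compile(r"\{\{([A-Z_]+)\}\}")


def render_sql(template, values):
    """Substitute {{NAME}} placeholders, refusing to leave any unresolved."""
    missing = sorted({name for name in PLACEHOLDER.findall(template) if name not in values})
    if missing:
        raise KeyError(f"no value for placeholder(s): {', '.join(missing)}")
    # A function replacement avoids backslash-escape processing in the values
    return PLACEHOLDER.sub(lambda m: values[m.group(1)], template)


if __name__ == "__main__":
    sql = "USE `{{DB}}`;\n-- source: s3://{{RAW_BUCKET}}/covid/nytimes/"
    print(render_sql(sql, {"DB": "emr_labs_db", "RAW_BUCKET": "emr-labs-raw"}))
```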


12. Key Concepts

Batch vs Streaming Mode

-- Batch: reads all existing data, terminates
SET 'execution.runtime-mode' = 'batch';

-- Streaming: reads indefinitely, requires WATERMARK for windowing
SET 'execution.runtime-mode' = 'streaming';

All labs run in batch mode because COVID data is historical and finite. To adapt to streaming (e.g., receiving live updates from Kinesis), change to streaming mode and add a WATERMARK definition to source tables.

Flink SQL vs PyFlink DataStream

| | Flink SQL | PyFlink DataStream |
|---|---|---|
| Abstraction level | High (declarative) | Low (imperative) |
| Complex logic | Limited to SQL expressiveness (plus UDFs) | Arbitrary Python |
| Performance | Optimized and code-generated by the Flink table planner | Python functions add JVM-to-Python data exchange overhead |
| Best for | JOINs, GROUP BY, windowed aggregations | Custom stateful logic, per-record ML inference |
| Learning curve | SQL knowledge is sufficient | Requires Flink/streaming knowledge |

RocksDB State Backend

state.backend = rocksdb keeps operator state in an embedded RocksDB instance on each TaskManager's local disk rather than as objects on the JVM heap. Benefits:

  • Large state: state can exceed available memory because RocksDB spills to disk
  • Incremental checkpoints: with state.backend.incremental = true, each checkpoint uploads only the state files that changed since the previous one
  • Recovery: as with any state backend, state is restored from the last completed checkpoint on failure

Batch-mode jobs, including all the COVID labs, do not use the configured state backend or checkpoints. RocksDB is configured for completeness and becomes essential when extending these labs to streaming.

Exactly-Once Semantics

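Flink achieves exactly-once end-to-end via:

  1. Checkpointing: Flink periodically injects checkpoint barriers into the data flow; as a barrier passes, each operator snapshots its state to the checkpoint directory (S3 here), producing a consistent snapshot
  2. Replayable sources: on failure, Flink restores operator state from the last completed checkpoint and rewinds sources to the positions recorded in it
  3. Two-phase-commit sinks: the filesystem/Parquet sink writes in-progress part files and finalises (commits) them only after the checkpoint covering them completes, so data from a failed attempt is never published

Set in flink-config.json: execution.checkpointing.mode = EXACTLY_ONCE (also Flink's default mode). Checkpoints are only taken when execution.checkpointing.interval is set as well.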
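The pre-commit/commit cycle can be modelled with local files. The class below is a toy with hypothetical names that only loosely mirrors Flink's FileSink (which moves part files through in-progress, pending, and finished states); it is not Flink's implementation and ignores concurrency and S3 semantics.

```python
import os
import tempfile


class ToyTwoPhaseFileSink:
    """Toy model of a checkpoint-aware file sink; not Flink's FileSink."""

    def __init__(self, directory):
        self.directory = directory
        self.buffer = []      # rows written since the last checkpoint
        self.pending = {}     # checkpoint_id -> (staged_path, final_path)
        self.next_part = 0

    def write(self, row):
        self.buffer.append(row)

    def pre_commit(self, checkpoint_id):
        """Phase 1, on checkpoint: stage buffered rows in a hidden in-progress file."""
        staged = os.path.join(self.directory, f".part-{self.next_part}.inprogress")
        final = os.path.join(self.directory, f"part-{self.next_part}")
        with open(staged, "w") as f:
            f.writelines(f"{row}\n" for row in self.buffer)
        self.pending[checkpoint_id] = (staged, final)
        self.buffer = []
        self.next_part += 1

    def commit(self, completed_checkpoint_id):
        """Phase 2, once a checkpoint completes: publish the files it covers."""
        for cid in sorted(c for c in self.pending if c <= completed_checkpoint_id):
            staged, final = self.pending.pop(cid)
            os.replace(staged, final)  # atomic rename on a local filesystem

    def recover(self, last_completed_checkpoint_id):
        """After a failure: commit what the last completed checkpoint covers and
        discard everything newer; the source then replays from that checkpoint."""
        self.commit(last_completed_checkpoint_id)
        for staged, _final in self.pending.values():
            os.remove(staged)
        self.pending.clear()
        self.buffer = []

    def committed_files(self):
        return sorted(n for n in os.listdir(self.directory) if n.startswith("part-"))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as out_dir:
        sink = ToyTwoPhaseFileSink(out_dir)
        sink.write("row-a")
        sink.pre_commit(1)
        sink.write("row-b")
        sink.pre_commit(2)        # checkpoint 2 never completes
        sink.commit(1)
        sink.recover(1)           # job fails and restarts from checkpoint 1
        print(sink.committed_files())
```

Readers only ever see committed part files, and the staged data from the unfinished checkpoint is discarded and later regenerated by the replayed source, which is what keeps the output free of duplicates.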

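Flink Web Dashboard

When a YARN session is running, the Flink Dashboard is accessible at:

http://<MASTER_PUBLIC_DNS>:8081

It shows running jobs, parallelism, operator backpressure, checkpoint history, JobManager/TaskManager memory, and task slots. YARN decides which node hosts the JobManager, so if nothing answers on port 8081, open the Flink session's application in the YARN ResourceManager UI and follow its tracking URL.

To reach the dashboard from your machine, allow port 8081 in the cluster's security group or use an SSH tunnel:

ssh -L 8081:localhost:8081 hadoop@<MASTER_PUBLIC_DNS>
# then open http://localhost:8081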

13. Troubleshooting

sql-client.sh exits with ClassNotFoundException

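The Hive connector JARs must be on the Flink classpath. On EMR 7.2.0 they are at /usr/lib/flink/lib/. If you run a custom Flink build, copy the Hive connector and hive-exec JARs into $FLINK_HOME/lib/, or pass each one to the SQL Client with -j (the paths below are placeholders):

$FLINK_HOME/bin/sql-client.sh -j /path/to/flink-connector-hive.jar -j /path/to/hive-exec.jar -f your_script.sql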

CREATE CATALOG fails: Unable to instantiate HiveMetaStoreClient

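The Hive configuration must point to Glue. Check that hive.metastore.client.factory.class is set to com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory in /etc/hive/conf/hive-site.xml on the EMR cluster. EMR sets this when the cluster is created with the Glue Data Catalog enabled for Hive (the hive-site classification); installing Hive alone does not.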

Process substitution <(...) fails in EMR step

Process substitution is a bash feature, not POSIX sh. The command-runner.jar step uses bash -c "..." which supports it. If you see “syntax error near unexpected token”, verify the step Args starts with bash, -c.

Table not found after USE

After USE CATALOG glue_hive; USE `emr_labs_db`, the table should be visible. If it is not:

  1. Check that Lab 01 completed (tables were created)
  2. Run SHOW TABLES in sql-client to list available tables
  3. Verify the Glue DB name matches $GLUE_DB (exported by get_tf_outputs.sh)

OOM in TaskManager

Increase taskmanager.memory.process.size in flink-config.json (e.g., 6144m) and re-launch the cluster. Alternatively, reduce parallelism: SET 'parallelism.default' = '2' in the SQL file header.

Session window produces no results

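SESSION windows should be partitioned by the key column in the TVF call: SESSION(TABLE t PARTITION BY county, ...). Without PARTITION BY, events from all counties are sessionized together, and with daily reports the gap is never reached, so everything collapses into a single session. Also check that the gap interval suits your data density, that the HAVING threshold is not filtering out every row, and that the Flink version on your EMR release supports the SESSION window TVF in the runtime mode you are using.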

PyFlink not found

PyFlink requires Python 3.7+ and the apache-flink package. On EMR 7.2.0, PyFlink is pre-installed. Verify:

python3 -c "import pyflink; print(pyflink.__version__)"

If missing, the EMR bootstrap action or release label may not include PyFlink. Ensure Flink is in the --applications list when creating the cluster.