Data Transform Refresher
dbt, Spark, Flink, and cuDF — the complete transform layer from warehouse SQL to GPU-accelerated dataflows.
Why Data Transform?
Every data pipeline has three phases: get the data, move it somewhere, and make it useful. The third step — transformation — is where the business value lives. For decades this was done as ETL (Extract, Transform, Load): clean and shape data before writing it to the warehouse. Modern practice inverts this: ELT (Extract, Load, Transform) loads raw data first, then transforms it inside the warehouse or compute layer where resources are elastic and SQL is native.
# Modern data stack, simplified:
#
# Sources Extract/Load Transform Serve
# ------- ------------ --------- -----
# Postgres --> Fivetran / Airbyte dbt (SQL) --> BI tools
# S3 events --> Spark batch Spark SQL --> ML features
# Kafka --> Flink connector Flink ops --> real-time APIs
# pandas jobs --> direct write cuDF (GPU) --> notebooks
#
# ETL: transform before load (data center era)
# ELT: transform after load (cloud warehouse era)
| Dimension | ETL | ELT |
|---|---|---|
| Where transform runs | Dedicated ETL server | Inside warehouse / compute cluster |
| Raw data preserved? | Often not | Yes — raw layer in warehouse |
| Iteration speed | Slow (pipeline redeploy) | Fast (SQL rerun) |
| Tooling | Informatica, SSIS | dbt, Spark, Flink, cuDF |
| Best for | Legacy systems, compliance | Cloud-native, agile analytics |
Where each tool fits
| Tool | Primary Use Case | Latency | Scale |
|---|---|---|---|
| dbt | SQL transforms inside the warehouse (Snowflake, BigQuery, Redshift, DuckDB) | Minutes | Warehouse-bound |
| Spark | Large-scale batch / micro-batch outside the warehouse | Seconds–minutes | Petabytes |
| Flink | True stream processing, stateful event-by-event computation | Milliseconds | Petabytes/day |
| cuDF / RAPIDS | GPU-accelerated DataFrame processing, drop-in pandas replacement | Milliseconds | Single-node GPU |
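cuDF never gets a dedicated section below, so here is a quick taste of the table's "drop-in pandas replacement" claim: the code below is ordinary pandas, and on a GPU machine swapping the import (`import cudf as pd`, or the `cudf.pandas` accelerator) runs it unchanged. The column names and values are illustrative, not from any real schema:

```python
import pandas as pd  # on a GPU box: `import cudf as pd` runs the same code

# Illustrative orders data (columns are assumptions for this sketch)
orders = pd.DataFrame({
    "order_id": [1, 2, 3, 4],
    "country_code": ["US", "US", "DE", "DE"],
    "total_cents": [25_000, 9_000, 60_000, 4_000],
})

orders["total_usd"] = orders["total_cents"] / 100.0
revenue = (
    orders.groupby("country_code", as_index=False)
          .agg(order_count=("order_id", "count"),
               revenue_usd=("total_usd", "sum"))
)
print(revenue)
```

The pandas API parity is the whole point: teams prototype on CPU and move the identical transform to GPU when single-node pandas becomes the bottleneck.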
dbt Fundamentals
dbt (data build tool) is a SQL-first transformation framework. The core insight: every dbt model is a SELECT statement. dbt handles the DDL (CREATE TABLE, CREATE VIEW) so you only write the logic. dbt compiles your SQL, resolves dependencies via a DAG, and runs models in the right order.
How dbt works
# You write this:
# models/marts/orders.sql → SELECT ... FROM {{ ref('stg_orders') }}
#
# dbt compiles to:
# CREATE TABLE analytics.orders AS SELECT ... FROM analytics.stg_orders
#
# dbt handles:
# - Dependency resolution (DAG)
# - Materialization (view vs table vs incremental)
# - Schema management
# - Documentation generation
# - Testing
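The dependency-resolution step is, at its core, a topological sort of the graph that ref() calls declare. A toy sketch using Python's stdlib (model names match the examples in this guide; real dbt also handles node selection and parallel execution):

```python
from graphlib import TopologicalSorter

# Each model maps to the models/sources it ref()s (its upstream dependencies)
deps = {
    "stg_orders": {"raw.orders"},
    "stg_customers": {"raw.customers"},
    "int_orders_with_customers": {"stg_orders", "stg_customers"},
    "fct_orders": {"int_orders_with_customers"},
}

# static_order() yields nodes with all dependencies first: sources,
# then staging, then intermediate, then marts
build_order = list(TopologicalSorter(deps).static_order())
print(build_order)
```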
The ref() function — dbt's killer feature
The ref() function is how models reference each other. It resolves to the correct schema at compile time and declares the dependency so dbt builds the DAG automatically.
-- models/staging/stg_orders.sql
-- Raw → cleaned, renamed, typed
SELECT
id AS order_id,
customer_id,
CAST(created_at AS TIMESTAMP) AS ordered_at,
status,
total_amount_cents / 100.0 AS total_usd,
-- normalize status to lowercase
LOWER(status) AS order_status
FROM {{ source('raw', 'orders') }}
WHERE id IS NOT NULL
-- models/intermediate/int_orders_with_customers.sql
-- Join cleaned orders with cleaned customers
SELECT
o.order_id,
o.ordered_at,
o.order_status,
o.total_usd,
c.customer_name,
c.customer_email,
c.country_code
FROM {{ ref('stg_orders') }} AS o
JOIN {{ ref('stg_customers') }} AS c ON o.customer_id = c.customer_id
-- models/marts/fct_orders.sql
-- Final fact table consumed by BI tools
SELECT
order_id,
ordered_at,
DATE_TRUNC('month', ordered_at) AS order_month,
customer_name,
country_code,
order_status,
total_usd,
-- derived columns
CASE
WHEN total_usd >= 500 THEN 'high'
WHEN total_usd >= 100 THEN 'medium'
ELSE 'low'
END AS order_tier
FROM {{ ref('int_orders_with_customers') }}
WHERE order_status NOT IN ('cancelled', 'test')
Sources — declaring external data
# models/staging/sources.yml
version: 2
sources:
- name: raw
schema: raw_data
description: Raw tables loaded by Fivetran
tables:
- name: orders
description: Order events from the production database
loaded_at_field: _fivetran_synced
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
columns:
- name: id
description: Primary key
tests:
- unique
- not_null
- name: customers
description: Customer records
dbt Project Structure
my_dbt_project/
├── dbt_project.yml # Project config: name, version, model paths, vars
├── profiles.yml # Connection credentials (usually in ~/.dbt/)
├── packages.yml # Third-party packages (dbt_utils, dbt_expectations)
├── models/
│ ├── staging/ # 1-to-1 with raw sources, minimal transforms
│ │ ├── sources.yml # Source declarations + freshness tests
│ │ ├── stg_orders.sql
│ │ └── stg_customers.sql
│ ├── intermediate/ # Business logic, joins — not exposed to BI
│ │ └── int_orders_with_customers.sql
│ └── marts/ # Wide, denormalized, BI-ready
│ ├── core/
│ │ ├── fct_orders.sql
│ │ └── dim_customers.sql
│ └── marketing/
│ └── mrt_customer_ltv.sql
├── seeds/ # Static CSV files loaded as tables
│ └── country_codes.csv
├── snapshots/ # SCD Type 2 history tracking
│ └── orders_snapshot.sql
├── macros/ # Reusable Jinja macros
│ ├── cents_to_dollars.sql
│ └── date_spine.sql
├── tests/ # Custom singular tests
│ └── assert_positive_revenue.sql
└── analyses/ # Ad-hoc SQL (not run in production)
└── cohort_exploration.sql
# dbt_project.yml
name: my_company
version: '1.0.0'
config-version: 2
profile: my_company # matches profiles.yml
model-paths: ["models"]
seed-paths: ["seeds"]
snapshot-paths: ["snapshots"]
macro-paths: ["macros"]
# Default materialization per layer
models:
my_company:
staging:
+materialized: view
+schema: staging
intermediate:
+materialized: ephemeral # no physical table; inlined as CTE
marts:
+materialized: table
+schema: marts
core:
fct_orders:
+materialized: incremental
+unique_key: order_id
dbt Models & Materializations
dbt supports four materializations. Each represents a different trade-off between query cost, build time, and freshness.
| Materialization | SQL Created | Rebuild Cost | Query Cost | Use When |
|---|---|---|---|---|
| view | CREATE VIEW | Near zero | High (runs every query) | Staging, light transforms |
| table | CREATE TABLE AS SELECT | Full rebuild every run | Low (pre-materialized) | Final marts, medium data |
| incremental | INSERT / MERGE into existing table | Only new rows | Low | Large tables, event data |
| ephemeral | None (inlined as CTE) | Zero | Runs in context of referencing model | Intermediate logic, DRY helpers |
Incremental models — the production workhorse
-- models/marts/fct_page_views.sql
-- Incremental model: only processes new events each run
{{
config(
materialized='incremental',
unique_key='page_view_id',
incremental_strategy='merge', -- or 'append', 'delete+insert'
on_schema_change='sync_all_columns'
)
}}
SELECT
page_view_id,
session_id,
user_id,
page_url,
referrer_url,
viewed_at,
time_on_page_seconds,
-- only compute expensive UDFs on new rows
{{ classify_page_type('page_url') }} AS page_category
FROM {{ ref('stg_page_views') }}
{% if is_incremental() %}
-- Only process rows newer than the latest we've already loaded.
-- Use a small lookback buffer (1 hour) for late-arriving events.
-- Snowflake/Redshift syntax (BigQuery: DATETIME_SUB, DuckDB: interval arithmetic):
-- BigQuery: DATETIME_SUB(MAX(viewed_at), INTERVAL 1 HOUR)
-- DuckDB: MAX(viewed_at) - INTERVAL '1 hour'
WHERE viewed_at >= (
SELECT DATEADD(hour, -1, MAX(viewed_at)) FROM {{ this }}
)
{% endif %}
- Late-arriving data: Events arrive hours after the fact (mobile apps going offline). Always include a lookback buffer (1–24 hours) in your is_incremental() filter.
- Schema changes: Adding a column to an incremental model without on_schema_change='sync_all_columns' causes silent data gaps — old rows lack the new column.
- Unique key violations: Without incremental_strategy='merge' and a unique_key, re-processing overlapping windows creates duplicates.
- Full refresh needed: Run dbt run --full-refresh after backfilling source data or changing business logic that affects historical rows.
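The interaction of the lookback buffer and the merge strategy is easy to simulate in plain Python. This sketch models only the semantics, not dbt internals: merge upserts on the unique key, so re-reading the overlapping window stays idempotent even when a late event lands inside it:

```python
target = {}  # page_view_id -> row; merge semantics = upsert on the unique key

def run_incremental(source_rows, lookback_hours=1):
    # Watermark = MAX(viewed_at) already loaded, minus the lookback buffer
    watermark = max((r["viewed_at"] for r in target.values()), default=0) - lookback_hours
    new_rows = [r for r in source_rows if r["viewed_at"] >= watermark]
    for r in new_rows:
        target[r["page_view_id"]] = r   # MERGE: overwrite on key, never duplicate
    return len(new_rows)

# viewed_at expressed in "hours" for simplicity
source = [{"page_view_id": i, "viewed_at": t} for i, t in enumerate([1, 2, 3, 4])]
run_incremental(source)                              # first run loads everything
source.append({"page_view_id": 4, "viewed_at": 4})   # late-arriving event
run_incremental(source)                              # re-reads the overlap window
print(len(target))  # 5 rows, no duplicates
```

With an append strategy instead of the upsert, the second run would have re-inserted the rows inside the lookback window, which is exactly the duplicate failure mode described above.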
Config blocks — SQL vs YAML
-- Option A: config block in SQL (per-model override)
{{ config(materialized='table', tags=['daily', 'finance']) }}
SELECT * FROM {{ ref('int_revenue') }}
# Option B: schema.yml (preferred for shared config)
models:
- name: fct_orders
config:
materialized: table
tags: [daily, finance]
description: "Core orders fact table"
dbt Testing & Documentation
Schema tests (generic tests)
# models/marts/schema.yml
version: 2
models:
- name: fct_orders
description: "Order events with customer attributes, one row per order"
columns:
- name: order_id
description: "Primary key — unique across all time"
tests:
- unique
- not_null
- name: order_status
tests:
- accepted_values:
values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']
- name: customer_id
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
- name: total_usd
tests:
- not_null
      # Range check (dbt_utils.accepted_range is an equivalent alternative)
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
max_value: 100000
Singular tests — custom SQL assertions
-- tests/assert_no_negative_revenue.sql
-- This test FAILS if any rows are returned.
-- dbt treats returned rows as test failures.
SELECT
order_id,
total_usd,
ordered_at
FROM {{ ref('fct_orders') }}
WHERE total_usd < 0
AND order_status = 'delivered'
-- tests/assert_orders_before_customers_deleted.sql
-- Referential integrity: no orders for deleted customers
SELECT
o.order_id,
o.customer_id
FROM {{ ref('fct_orders') }} AS o
LEFT JOIN {{ ref('dim_customers') }} AS c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL
Documentation and data lineage
# Generate docs and serve locally
dbt docs generate # writes catalog.json + manifest.json to target/
dbt docs serve # launches http://localhost:8080
# The docs site shows:
# - Full DAG with source → staging → intermediate → marts
# - Column-level descriptions
# - Test status
# - Row counts and last-built timestamps
Data contracts (dbt 1.5+)
# Enforce schema contracts on models consumed by other teams
models:
- name: fct_orders
config:
contract:
enforced: true # dbt will fail if actual schema differs
columns:
- name: order_id
data_type: varchar
constraints:
- type: not_null
- type: primary_key
- name: total_usd
data_type: numeric
constraints:
- type: not_null
dbt Advanced
Jinja macros
Macros are reusable Jinja2 functions. They reduce repetition across models and are stored in the macros/ directory.
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
ROUND({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
-- macros/safe_divide.sql
{% macro safe_divide(numerator, denominator) %}
CASE
WHEN {{ denominator }} = 0 OR {{ denominator }} IS NULL THEN NULL
ELSE {{ numerator }}::float / {{ denominator }}
END
{% endmacro %}
-- macros/date_trunc_to_week.sql (cross-database compatible)
{% macro week_start(date_col) %}
{% if target.type == 'snowflake' %}
DATE_TRUNC('week', {{ date_col }})
{% elif target.type == 'bigquery' %}
DATE_TRUNC({{ date_col }}, WEEK(MONDAY))
{% elif target.type == 'redshift' %}
DATE_TRUNC('week', {{ date_col }})
{% else %}
DATE_TRUNC('week', {{ date_col }})
{% endif %}
{% endmacro %}
-- Using macros in a model
SELECT
order_id,
{{ cents_to_dollars('total_amount_cents') }} AS total_usd,
{{ safe_divide('revenue_cents', 'item_count') }} AS avg_item_revenue,
{{ week_start('ordered_at') }} AS order_week
FROM {{ source('raw', 'orders') }}
Run dbt compile to see the generated SQL. Rule of thumb: if a macro is used in only one model, inline the logic instead. Reserve macros for cross-database compatibility shims and patterns repeated in 5+ models.
Snapshots — SCD Type 2
-- snapshots/customers_snapshot.sql
-- Tracks changes to the customers table over time.
-- Creates dbt_scd_id, dbt_updated_at, dbt_valid_from, dbt_valid_to columns.
{% snapshot customers_snapshot %}
{{
config(
target_schema='snapshots',
unique_key='customer_id',
strategy='timestamp', -- or 'check' for tables without updated_at
updated_at='updated_at',
invalidate_hard_deletes=True
)
}}
SELECT
customer_id,
customer_name,
email,
plan_tier,
country_code,
updated_at
FROM {{ source('raw', 'customers') }}
{% endsnapshot %}
-- Query snapshot: what was each customer's plan on 2025-01-01?
SELECT
customer_id,
customer_name,
plan_tier
FROM {{ ref('customers_snapshot') }}
WHERE dbt_valid_from <= '2025-01-01'
AND (dbt_valid_to > '2025-01-01' OR dbt_valid_to IS NULL)
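The dbt_valid_from / dbt_valid_to filter above is the standard point-in-time predicate for SCD Type 2 data. The same logic in plain Python, with illustrative rows:

```python
from datetime import date

# One row per (customer, version); valid_to is None for the current version
history = [
    {"customer_id": 1, "plan_tier": "free",
     "valid_from": date(2024, 6, 1),   "valid_to": date(2024, 12, 15)},
    {"customer_id": 1, "plan_tier": "pro",
     "valid_from": date(2024, 12, 15), "valid_to": None},
]

def plan_as_of(rows, customer_id, as_of):
    # Same predicate as the snapshot query: valid_from <= as_of < valid_to,
    # treating a NULL valid_to as open-ended
    for r in rows:
        if (r["customer_id"] == customer_id
                and r["valid_from"] <= as_of
                and (r["valid_to"] is None or r["valid_to"] > as_of)):
            return r["plan_tier"]
    return None

print(plan_as_of(history, 1, date(2025, 1, 1)))  # pro
```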
Useful packages
# packages.yml
packages:
- package: dbt-labs/dbt_utils
version: [">=1.0.0", "<2.0.0"]
- package: calogica/dbt_expectations
version: [">=0.9.0", "<1.0.0"]
- package: dbt-labs/audit_helper
version: [">=0.9.0"]
-- dbt_utils examples
{{ dbt_utils.generate_surrogate_key(['order_id', 'line_item_id']) }} AS sk
-- date_spine: generate one row per day between start_date and end_date
{{ dbt_utils.date_spine(datepart='day', start_date="'2024-01-01'", end_date="current_date") }}
-- dbt_utils.pivot must appear inside a SELECT (it expands to a list of expressions)
SELECT
{{ dbt_utils.pivot('device_type', ['mobile','desktop','tablet']) }}
FROM {{ ref('sessions') }}
# dbt_expectations examples (in schema.yml)
tests:
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
max_value: 1000000
- dbt_expectations.expect_table_row_count_to_be_between:
min_value: 1000
dbt in Production
CI/CD with dbt build
# dbt build = compile + run + test + snapshot, in dependency order
dbt build
# Slim CI: only run models changed in this PR + their downstream dependents
# Requires a production manifest (manifest.json) from the last successful run
#   --select state:modified+   changed models and everything downstream
#   --defer                    use production results for unmodified upstream models
dbt build \
  --select state:modified+ \
  --defer \
  --state path/to/prod-manifest/
# Target schema for CI to avoid polluting production
dbt build --target ci --vars '{"is_ci": true}'
# .github/workflows/dbt-ci.yml
name: dbt CI
on:
pull_request:
paths:
- 'models/**'
- 'tests/**'
- 'macros/**'
- 'dbt_project.yml'
jobs:
dbt-slim-ci:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Download production manifest
run: aws s3 cp s3://my-bucket/dbt-artifacts/manifest.json ./prod-manifest/manifest.json
- name: Install dbt
run: pip install dbt-snowflake
- name: dbt build (slim CI)
env:
SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
run: |
dbt deps
dbt build \
--select state:modified+ \
--defer \
--state ./prod-manifest/ \
--target ci
dbt Cloud vs dbt Core
| Feature | dbt Core (open source) | dbt Cloud |
|---|---|---|
| Scheduled runs | External scheduler (Airflow, Prefect) | Built-in job scheduler |
| Slim CI | Manual setup (S3 artifact, GitHub Actions) | Native CI/CD integration |
| IDE | Your editor + CLI | Web-based SQL IDE |
| Docs hosting | Self-host | Hosted (shareable link) |
| Semantic Layer | CLI + MetricFlow | Fully integrated |
| Orchestration | Airflow, Prefect, Dagster | Built-in |
| Cost | Free | $100–$200+/dev/month |
Freshness tests
# Check that sources aren't stale before running models
dbt source freshness
# In sources.yml (already shown above):
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
# Freshness is its own command; dbt build does not run it.
# Chain it in your scheduler: dbt source freshness && dbt build
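Under the hood a freshness check reduces to one comparison: the age of max(loaded_at_field) against the warn/error thresholds. A sketch of that logic, using the thresholds from the sources.yml above:

```python
from datetime import datetime, timedelta

def freshness_status(max_loaded_at, now,
                     warn_after=timedelta(hours=12),
                     error_after=timedelta(hours=24)):
    # Age of the newest loaded row vs. the configured thresholds
    age = now - max_loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"

now = datetime(2025, 1, 1, 12, 0)
print(freshness_status(datetime(2025, 1, 1, 2, 0), now))    # pass (10h old)
print(freshness_status(datetime(2024, 12, 31, 20, 0), now)) # warn (16h old)
```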
Spark for Data Transform
Spark's DataFrame API lets you express transforms as a directed acyclic graph of operations that Spark's Catalyst optimizer compiles into efficient physical plans. The same logic can be expressed as DataFrame API calls or SQL — choose based on readability.
DataFrame API vs Spark SQL — side by side
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("transforms").getOrCreate()
orders = spark.table("raw.orders")
customers = spark.table("raw.customers")
# ── DataFrame API ──────────────────────────────────────────────────────────
result_df = (
orders
.filter(F.col("status") != "test")
.join(customers, on="customer_id", how="inner")
.withColumn("total_usd", F.col("total_cents") / 100.0)
.withColumn("order_month", F.date_trunc("month", F.col("ordered_at")))
.withColumn(
"order_tier",
F.when(F.col("total_usd") >= 500, "high")
.when(F.col("total_usd") >= 100, "medium")
.otherwise("low")
)
.groupBy("order_month", "country_code", "order_tier")
.agg(
F.count("order_id").alias("order_count"),
F.sum("total_usd").alias("revenue_usd"),
F.countDistinct("customer_id").alias("unique_customers")
)
)
# ── Spark SQL (identical output) ───────────────────────────────────────────
orders.createOrReplaceTempView("orders")
customers.createOrReplaceTempView("customers")
result_sql = spark.sql("""
SELECT
DATE_TRUNC('month', o.ordered_at) AS order_month,
c.country_code,
CASE
WHEN o.total_cents / 100.0 >= 500 THEN 'high'
WHEN o.total_cents / 100.0 >= 100 THEN 'medium'
ELSE 'low'
END AS order_tier,
COUNT(o.order_id) AS order_count,
SUM(o.total_cents / 100.0) AS revenue_usd,
COUNT(DISTINCT o.customer_id) AS unique_customers
FROM orders o
JOIN customers c USING (customer_id)
WHERE o.status != 'test'
GROUP BY 1, 2, 3
""")
Common transform patterns
Deduplication
from pyspark.sql.window import Window
# Keep the most recent record per entity
w = Window.partitionBy("customer_id").orderBy(F.desc("updated_at"))
deduped = (
customers
.withColumn("row_num", F.row_number().over(w))
.filter(F.col("row_num") == 1)
.drop("row_num")
)
SCD Type 2 merge pattern
from delta.tables import DeltaTable
# Merge incoming changes into a Delta Lake table (SCD Type 2 simplification)
delta_table = DeltaTable.forName(spark, "gold.customers")
delta_table.alias("target").merge(
source=incoming_df.alias("source"),
condition="target.customer_id = source.customer_id"
).whenMatchedUpdate(set={
"customer_name": "source.customer_name",
"plan_tier": "source.plan_tier",
"updated_at": "source.updated_at"
}).whenNotMatchedInsert(values={
"customer_id": "source.customer_id",
"customer_name": "source.customer_name",
"plan_tier": "source.plan_tier",
"created_at": "source.created_at",
"updated_at": "source.updated_at"
}).execute()
Sessionization
# Group events into sessions (30-minute gap = new session)
SESSION_TIMEOUT_SECS = 1800
w_user = Window.partitionBy("user_id").orderBy("event_time")
sessionized = (
events
.withColumn("prev_event_time", F.lag("event_time").over(w_user))
.withColumn(
"is_new_session",
F.when(
F.col("prev_event_time").isNull() |
(F.unix_timestamp("event_time") - F.unix_timestamp("prev_event_time") > SESSION_TIMEOUT_SECS),
1
).otherwise(0)
)
.withColumn("session_id_raw", F.sum("is_new_session").over(w_user))
.withColumn(
"session_id",
F.concat_ws("_", F.col("user_id"), F.col("session_id_raw").cast("string"))
)
)
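The lag/cumulative-sum trick above is easier to sanity-check in plain Python: walk a user's events in time order and bump a session counter whenever the gap exceeds the timeout. This mirrors the Spark logic, not Spark internals:

```python
SESSION_TIMEOUT_SECS = 1800

def sessionize(user_id, event_times):
    """Return (session_id, event_time) pairs; gap > 30 min starts a new session."""
    sessions, session_num, prev = [], 0, None
    for t in sorted(event_times):
        if prev is None or t - prev > SESSION_TIMEOUT_SECS:
            session_num += 1  # equivalent to summing the is_new_session flags
        sessions.append((f"{user_id}_{session_num}", t))
        prev = t
    return sessions

# Three events within 30 min of each other, then a >30-min gap
out = sessionize("u1", [0, 600, 1200, 5000, 5400])
print(out)
```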
Incremental processing with foreachBatch
# Read from Kafka, transform, write to Delta Lake — micro-batch streaming
from pyspark.sql.types import StructType, StringType, LongType
schema = StructType().add("order_id", StringType()).add("total_cents", LongType())
stream = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "orders")
.load()
)
def process_batch(batch_df, batch_id):
parsed = batch_df.selectExpr("CAST(value AS STRING) as json") \
.select(F.from_json("json", schema).alias("data")) \
.select("data.*") \
.withColumn("total_usd", F.col("total_cents") / 100.0)
parsed.write.format("delta").mode("append").saveAsTable("silver.orders")
query = (
stream.writeStream
.foreachBatch(process_batch)
.option("checkpointLocation", "/checkpoints/orders")
.trigger(processingTime="30 seconds")
.start()
)
query.awaitTermination()
Apache Flink Fundamentals
Flink is a distributed dataflow graph executor. Everything reduces to:
Stream → Operator → Stream → Operator → Stream
# Like Unix pipes, but distributed and stateful.
The key distinction from Spark: Flink is stream-first. Batch is a special case of a bounded stream. Flink processes one event at a time with true per-event latency, not micro-batches.
Key abstractions — bottom up
| Abstraction | Description | Analogy |
|---|---|---|
| Record | A single event (key + value + timestamp) | A single log line |
| Stream | Unbounded sequence of records | A Unix pipe |
| Operator | Transform applied to a stream | A grep or awk command |
| KeyedStream | Stream partitioned by key (like GROUP BY) | Multiple parallel pipes, one per key |
| State | Memory local to an operator, persisted and fault-tolerant | A variable in an awk script that persists across lines |
Core operators
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.util.Collector;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Modern KafkaSource requires fromSource() — not the legacy addSource() API
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("orders")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> rawStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
// map: 1-to-1 transform
DataStream<OrderEvent> orders = rawStream
    .map(json -> parseOrder(json))   // deserialize JSON → OrderEvent
    .map(order -> {
        order.totalUsd = order.totalCents / 100.0;
        return order;
    })
    // filter: 1-to-0 or 1-to-1 (keep or drop)
    .filter(order -> order.status != OrderStatus.TEST);
// flatMap: 1-to-N (one input produces zero or more outputs).
// The element type changes here, so it forks off as a separate stream:
DataStream<LineItem> lineItems = orders
    .flatMap((OrderEvent order, Collector<LineItem> out) -> {
        for (LineItem item : order.lineItems) {
            out.collect(item);
        }
    })
    .returns(LineItem.class);   // lambdas erase generics; give Flink a type hint
orders
    // keyBy: partition by key — subsequent operators see only their key's events
    .keyBy(order -> order.customerId)
    // window: group events in time
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
    // aggregate within window
    .aggregate(new OrderCountAggregator())
    // process: full control, access to state + timers
    .process(new FraudDetectionProcessor());
// Execute — Flink does nothing until execute() is called
env.execute("Order Processing Job");
Bounded vs unbounded streams
// Unbounded (streaming) — runs forever
// FlinkKafkaConsumer was the legacy API (removed in Flink 1.17).
// Use KafkaSource + fromSource() for all new code:
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("orders")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// Bounded (batch mode) — finite input, runs to completion
// In Flink 1.12+, DataStream API works for both
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// env.readFile() was deprecated and removed. Use FileSource + fromSource():
FileSource<String> fileSource = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/orders/"))
.build();
DataStream<String> bounded = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "File Source");
// STREAMING mode: event-by-event, true streaming semantics
// BATCH mode: optimized for throughput, blocking operators allowed
Flink Windows
Windows are how Flink groups an infinite stream into finite buckets for aggregation. Choosing the right window type determines the latency, memory cost, and business semantics of your computation.
Tumbling windows — fixed, non-overlapping
# Events split into non-overlapping fixed intervals
# Each event belongs to exactly one window
|--- hour 1 ---|--- hour 2 ---|--- hour 3 ---|
e1 e2 e3 e4 e5 e6 e7
→ window 1 → window 2 → window 3
// Count orders per 1-hour tumbling window, per customer
orders
.keyBy(order -> order.customerId)
.window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
.aggregate(new OrderSumAggregator(), new WindowMetadataFunction())
.print();
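Tumbling assignment is pure arithmetic: each event lands in exactly one window, whose start is the timestamp rounded down to the window size. A sketch of that rule (ignoring Flink's optional window offset):

```python
def tumbling_window_start(ts_ms, size_ms):
    # Window start = timestamp rounded down to a size boundary
    return ts_ms - ts_ms % size_ms

HOUR_MS = 3_600_000
events = [100, 3_599_999, 3_600_000, 7_300_000]
print([tumbling_window_start(t, HOUR_MS) for t in events])
# → [0, 0, 3600000, 7200000]: the first two share a window, each event in exactly one
```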
Sliding windows — fixed size, overlapping
# Window size = 1 hour, slide = 15 minutes
# Each event appears in multiple windows (4 windows for 1h/15m ratio)
time: --0:00--0:15--0:30--0:45--1:00--1:15--1:30--
[window 1: 0:00 – 1:00]
[window 2: 0:15 – 1:15]
[window 3: 0:30 – 1:30]
// Rolling 1-hour revenue, updated every 15 minutes
orders
.keyBy(order -> order.country)
.window(SlidingEventTimeWindows.of(
Duration.ofHours(1), // window size
Duration.ofMinutes(15) // slide interval
))
.aggregate(new RevenueAggregator())
.print();
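With sliding windows each event belongs to size/slide windows, and the assigner enumerates every window whose span covers the timestamp. A sketch of that enumeration:

```python
def sliding_windows(ts_ms, size_ms, slide_ms):
    """All [start, start + size) windows containing ts_ms (starts aligned to slide)."""
    start = ts_ms - ts_ms % slide_ms      # latest window containing the event
    starts = []
    while start > ts_ms - size_ms:        # walk back one slide at a time
        starts.append(start)
        start -= slide_ms
    return sorted(starts)

MIN_MS = 60_000
# 1-hour windows sliding every 15 minutes → each event is in 60/15 = 4 windows
print(sliding_windows(20 * MIN_MS, size_ms=60 * MIN_MS, slide_ms=15 * MIN_MS))
```

Note that early events fall into windows whose start precedes the epoch, which is also Flink's behavior: window starts are aligned to the slide, not to the first event.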
Session windows — gap-based, dynamic size
# Window ends when there's a gap > 30 minutes with no events
# Window size varies per user session
user A: [click]---[click]--[click] [click][click]
<30min gap <30min gap
[--- session 1 ----------] >30min gap [-- session 2 --]
// User sessions with 30-minute inactivity gap
pageViews
.keyBy(view -> view.userId)
.window(EventTimeSessionWindows.withGap(Duration.ofMinutes(30)))
.aggregate(new SessionAggregator())
.print();
Global windows — custom trigger
// Global window: all events for a key in one window
// Requires a custom trigger to fire
orders
.keyBy(order -> order.customerId)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(100)) // fire every 100 events
.aggregate(new OrderBatchAggregator())
.print();
Window summary
| Type | Overlap? | Size | Use Case |
|---|---|---|---|
| Tumbling | No | Fixed | Hourly/daily reporting, billing periods |
| Sliding | Yes | Fixed | Rolling averages, moving metrics |
| Session | No | Dynamic (gap-based) | User sessions, burst detection |
| Global | No | Unbounded | Custom triggers, count-based batching |
Flink State & Checkpoints
Keyed state — per-key storage
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
// A stateful function that detects when a customer's 5-order total exceeds $1000
public class HighValueDetector extends KeyedProcessFunction<String, OrderEvent, Alert> {
// State is automatically scoped to each key (customerId)
private ValueState<Double> totalSpentState;
private ValueState<Integer> orderCountState;
private ListState<Long> recentOrderTimestamps;
private MapState<String, Double> ordersByProduct;
@Override
public void open(Configuration parameters) throws Exception {
totalSpentState = getRuntimeContext().getState(
new ValueStateDescriptor<>("totalSpent", Double.class)
);
orderCountState = getRuntimeContext().getState(
new ValueStateDescriptor<>("orderCount", Integer.class)
);
recentOrderTimestamps = getRuntimeContext().getListState(
new ListStateDescriptor<>("timestamps", Long.class)
);
ordersByProduct = getRuntimeContext().getMapState(
new MapStateDescriptor<>("byProduct", String.class, Double.class)
);
}
@Override
public void processElement(OrderEvent order, Context ctx, Collector<Alert> out) throws Exception {
// Read current state (null if first event for this key)
Double currentTotal = totalSpentState.value();
if (currentTotal == null) currentTotal = 0.0;
Integer currentCount = orderCountState.value();
if (currentCount == null) currentCount = 0;
// Update state
double newTotal = currentTotal + order.totalUsd;
int newCount = currentCount + 1;
totalSpentState.update(newTotal);
orderCountState.update(newCount);
recentOrderTimestamps.add(order.timestampMs);
ordersByProduct.put(order.productCategory, order.totalUsd);
// Register a timer to clear state after 24 hours of inactivity
ctx.timerService().registerEventTimeTimer(order.timestampMs + 86_400_000L);
// Emit alert if threshold crossed
if (newTotal >= 1000.0 && newCount >= 5) {
out.collect(new Alert(order.customerId, newTotal, newCount));
totalSpentState.clear();
orderCountState.clear();
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
// Clear state for idle customers
totalSpentState.clear();
orderCountState.clear();
recentOrderTimestamps.clear();
ordersByProduct.clear();
}
}
State backends
| Backend | Storage | State Size | Read Latency | Use When |
|---|---|---|---|---|
| HashMapStateBackend | JVM heap | < 1 GB per TM | Microseconds | Small state, low-latency jobs |
| EmbeddedRocksDBStateBackend | RocksDB on local disk | Hundreds of GB | Milliseconds (SSD) | Large state (sessions, joins) |
// Configure RocksDB state backend with incremental checkpoints
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental
// enableCheckpointing(interval, mode) sets both the interval and the
// consistency mode in one call; the remaining knobs live on CheckpointConfig.
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE); // every 60s
CheckpointConfig cpConfig = env.getCheckpointConfig();
cpConfig.setCheckpointTimeout(300_000); // fail if > 5min
cpConfig.setMinPauseBetweenCheckpoints(5000);
cpConfig.setMaxConcurrentCheckpoints(1);
cpConfig.setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
How checkpointing works — Chandy-Lamport algorithm
# Checkpoint barriers flow through the stream like special records.
# When an operator receives a barrier, it:
# 1. Pauses processing
# 2. Snapshots its state to durable storage (S3, HDFS)
# 3. Forwards the barrier downstream
# 4. Resumes processing
#
# When all operators have snapshotted, the checkpoint is complete.
# On failure, Flink resets to the last complete checkpoint.
#
# Source → map → keyBy → window → sink
# | | | | |
# barrier → barrier → barrier → barrier → ACK to JobManager
#
# Exactly-once = checkpoint + transactional sink (2PC)
# At-least-once = checkpoint without aligned barriers (faster, may replay)
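The recovery guarantee is easiest to see in a toy model: snapshot the input offset and the operator state together, and on failure restore both and replay. Because offset and state are saved atomically, counts come out exactly-once even though events are reprocessed. This models the idea, not Flink's actual protocol:

```python
events = list(range(10))  # a bounded "stream" of 10 events

def run_with_crash(crash_at):
    checkpoint = (0, 0)               # (input offset, operator state) together
    offset, count = checkpoint
    crashed = False
    while offset < len(events):
        if not crashed and offset == crash_at:
            crashed = True
            offset, count = checkpoint    # restore: replay from last checkpoint
            continue
        count += 1                        # "process" events[offset]
        offset += 1
        if offset % 3 == 0:
            checkpoint = (offset, count)  # periodic checkpoint every 3 events
    return count

print(run_with_crash(crash_at=7))  # 10: events 6..7 replay, but state rolled back too
```

If only the offset were checkpointed (or only the state), the replayed events would be double-counted, which is the at-least-once failure mode the aligned snapshot avoids.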
Savepoints vs checkpoints
| | Checkpoint | Savepoint |
|---|---|---|
| Purpose | Automatic fault recovery | Manual migration, upgrade, A/B test |
| Trigger | Automatic (periodic) | Manual (flink savepoint <jobId>) |
| Lifecycle | Deleted on new checkpoint or job cancel | Persists until manually deleted |
| Operator state alignment | Not guaranteed across versions | Stable, supports code changes |
Flink Cluster Architecture
+------------------+
| JobManager |
| (the brain) |
+--------+---------+
| assigns tasks
+----------------+----------------+
v v v
+---------------+ +---------------+ +---------------+
| TaskManager 1 | | TaskManager 2 | | TaskManager 3 |
| (worker) | | (worker) | | (worker) |
| Slot 1: map | | Slot 1: map | | Slot 1: window|
| Slot 2: filter| | Slot 2: sink | | Slot 2: agg |
+---------------+ +---------------+ +---------------+
JobManager responsibilities
- Scheduler: Decides which TaskManager slot gets which operator task
- Checkpoint Coordinator: Triggers checkpoint barriers, tracks completion, manages savepoints
- Resource Manager: Requests and releases TaskManager slots (integrates with YARN, K8s)
- Dispatcher: Receives job submissions, spawns a JobMaster per job
TaskManager responsibilities
- Task Slots: Unit of parallelism. One slot = one thread. A TM with 4 slots can run 4 parallel tasks.
- Network buffers: In-memory buffers for data exchange between operators (credit-based flow control)
- State backend: Manages local state (heap or RocksDB), writes checkpoints to remote storage
Graph compilation
# User code → execution plan in 3 stages:
#
# 1. StreamGraph: logical operator DAG (from user code)
# Source → Map → Filter → KeyBy → Window → Aggregate → Sink
#
# 2. JobGraph: physical plan with operator chaining applied
# Operators that can run in the same thread are chained together.
# Source+Map+Filter are often chained into one "operator chain".
# Reduces serialization overhead between operators.
#
# 3. ExecutionGraph: parallel execution plan
# Each operator is expanded to parallelism P copies.
# ExecutionVertex = one parallel instance of one operator.
# Each ExecutionVertex is assigned to a TaskManager slot.
# Operator chaining rules:
# - Same parallelism
# - No shuffle between them (not keyBy, not rebalance)
# - Both are chainable (map, filter, flatMap — yes; window — no)
# Disable chaining for debugging:
env.disableOperatorChaining();
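The chaining rules above can be encoded as a small predicate. A sketch in Python — the `Operator` type and `CHAINABLE_KINDS` set are illustrative simplifications, not Flink's actual classes (the real logic lives in the JobGraph generator):

```python
from dataclasses import dataclass

# Illustrative encoding of the three chaining rules above (simplified).
CHAINABLE_KINDS = {"source", "map", "filter", "flatMap"}  # window/keyBy break chains

@dataclass
class Operator:
    kind: str             # e.g. "map", "window"
    parallelism: int
    shuffle_before: bool  # True if preceded by keyBy/rebalance

def can_chain(upstream: Operator, downstream: Operator) -> bool:
    """True if the two operators may run in one thread as an operator chain."""
    return (
        upstream.parallelism == downstream.parallelism  # same parallelism
        and not downstream.shuffle_before               # no shuffle between them
        and upstream.kind in CHAINABLE_KINDS            # both kinds chainable
        and downstream.kind in CHAINABLE_KINDS
    )

src = Operator("source", 4, False)
mp = Operator("map", 4, False)
win = Operator("window", 4, True)  # keyBy before a window implies a shuffle

print(can_chain(src, mp))   # True: Source+Map chain into one thread
print(can_chain(mp, win))   # False: window is not chainable
```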
Flink Backpressure
Backpressure occurs when a downstream operator is slower than upstream. Flink uses credit-based flow control: a downstream TaskManager grants credits to upstream, allowing upstream to send only as many network buffers as the downstream has capacity for. No data is dropped — the pipeline slows end-to-end.
Credit-based flow control
# Normal flow (downstream keeping up):
#
# Source ──[10 msg/s]──► Map ──[10 msg/s]──► Window ──[10 msg/s]──► Sink
# Credits: Map grants 10 credits to Source → Source sends freely
# Backpressure (sink is slow):
#
# Source ──[10 msg/s]──► Map ──[10 msg/s]──► Window ──[2 msg/s]──► Sink (slow!)
# ↑
# Window buffers fill up
# Map buffers fill up
# Source slows to 2 msg/s
#
# Result: no data loss, but end-to-end latency increases
Diagnosing backpressure via Flink UI
# Flink Web UI: localhost:8081
# Job → Subtasks → Backpressure tab
#
# Colors:
# OK (green) = <10% backpressure ratio
# LOW (yellow) = 10-50%
# HIGH (red) = >50% ← investigate immediately
#
# REST API alternative:
curl http://localhost:8081/jobs/{jobId}/vertices/{vertexId}/backpressure
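The UI thresholds can be applied to the REST response programmatically. A sketch, assuming the response carries per-subtask ratios — the `subtasks`/`ratio` field names vary by Flink version, so treat the parsing as an assumption:

```python
import json
from urllib.request import urlopen

def classify_backpressure(ratio: float) -> str:
    """Map a backpressure ratio to the Flink UI's OK/LOW/HIGH buckets."""
    if ratio < 0.10:
        return "OK"
    if ratio <= 0.50:
        return "LOW"
    return "HIGH"

def check_vertex(base_url: str, job_id: str, vertex_id: str) -> str:
    """Poll the backpressure endpoint and classify the worst subtask.
    Field names ('subtasks', 'ratio') are assumptions; check your version's REST docs."""
    url = f"{base_url}/jobs/{job_id}/vertices/{vertex_id}/backpressure"
    with urlopen(url) as resp:
        body = json.load(resp)
    worst = max((s.get("ratio", 0.0) for s in body.get("subtasks", [])), default=0.0)
    return classify_backpressure(worst)

print(classify_backpressure(0.05))  # OK
print(classify_backpressure(0.30))  # LOW
print(classify_backpressure(0.80))  # HIGH
```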
Common causes and fixes
| Cause | Symptom | Fix |
|---|---|---|
| Slow sink (DB write) | Sink vertex is HIGH | Batch writes, async sink, increase sink parallelism |
| Expensive computation | CPU-bound operator is HIGH | Increase parallelism, optimize UDF, offload to external service |
| Key skew | One subtask is HIGH, others are OK | Add salt to key, redistribute with rebalance(), custom partitioner |
| GC pressure | Intermittent spikes, HIGH during GC | Switch to RocksDB backend, tune GC, reduce object creation in hot path |
| Large state serialization | Checkpoint coordinator shows slow snapshots | Use incremental RocksDB checkpoints, reduce state TTL |
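The "add salt to key" fix for skew splits one hot key across N subkeys so the heavy first-stage aggregation parallelizes, then strips the salt and merges the partials. A language-agnostic sketch in Python:

```python
import random
from collections import defaultdict

# Key salting: spread one hot key over N_SALT subkeys so the partial
# aggregation parallelizes, then strip the salt and merge in a second stage.
N_SALT = 4

def salt_key(key: str) -> str:
    return f"{key}#{random.randrange(N_SALT)}"

events = [("hot_user", 1)] * 1000 + [("cold_user", 1)] * 10

# Stage 1: aggregate on the salted key (parallelizes across subtasks)
partial = defaultdict(int)
for key, value in events:
    partial[salt_key(key)] += value

# Stage 2: strip the salt and merge partials (at most N_SALT rows per key)
final = defaultdict(int)
for salted, value in partial.items():
    final[salted.rsplit("#", 1)[0]] += value

print(final["hot_user"])   # 1000
print(final["cold_user"])  # 10
```

The trade-off: salting adds a second aggregation stage, so it only pays off when the skewed first stage is the bottleneck.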
Flink SQL & Table API
Flink SQL lets you process streams declaratively. The key abstraction: a dynamic table — a table that changes over time as new events arrive. A SQL query over a dynamic table produces another dynamic table.
Continuous queries on streams
import org.apache.flink.table.api.*;
// Note: with the Table API, Kafka sources are declared via SQL DDL
// (CREATE TABLE ... WITH ('connector' = 'kafka', ...)), so no DataStream
// KafkaSource import is needed here.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
// Declare a Kafka source as a dynamic table
tEnv.executeSql("""
CREATE TABLE orders (
order_id STRING,
customer_id STRING,
product_id STRING,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
""");
// Tumbling window aggregation — SQL GROUP BY TUMBLE()
tEnv.executeSql("""
SELECT
customer_id,
TUMBLE_START(order_time, INTERVAL '1' HOUR) AS window_start,
COUNT(*) AS order_count,
SUM(amount) AS total_revenue
FROM orders
GROUP BY customer_id, TUMBLE(order_time, INTERVAL '1' HOUR)
""").print();
Temporal joins — join stream with versioned table
-- Join orders with the exchange rate that was valid at order time
-- exchange_rates is a versioned table (tracks historical rates)
SELECT
o.order_id,
o.amount,
o.currency,
o.amount * r.rate_to_usd AS amount_usd,
o.order_time
FROM orders AS o
JOIN exchange_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency_code
Pattern detection with MATCH_RECOGNIZE
-- Detect failed login followed by successful login within 5 minutes (brute-force pattern)
SELECT *
FROM login_events
MATCH_RECOGNIZE (
PARTITION BY user_id
ORDER BY event_time
MEASURES
FIRST(FAILED.event_time) AS first_failure_time,
LAST(FAILED.event_time) AS last_failure_time,
SUCCESS.event_time AS success_time,
COUNT(FAILED.event_id) AS failure_count
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
PATTERN (FAILED{3,} SUCCESS) -- 3+ failures followed by success
WITHIN INTERVAL '5' MINUTE
DEFINE
FAILED AS event_type = 'LOGIN_FAILED',
SUCCESS AS event_type = 'LOGIN_SUCCESS'
)
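The same brute-force pattern can be checked imperatively. A simplified single-user sketch — it only approximates MATCH_RECOGNIZE semantics (for instance, it resets state after each match rather than implementing SKIP TO NEXT ROW):

```python
from datetime import datetime, timedelta

# Simplified check for the FAILED{3,} SUCCESS pattern within 5 minutes,
# over time-ordered (timestamp, event_type) tuples for one user.
WINDOW = timedelta(minutes=5)

def find_bruteforce(events):
    """Return matches as (first_failure_time, success_time, failure_count)."""
    matches, run = [], []
    for ts, etype in events:
        if etype == "LOGIN_FAILED":
            run.append(ts)
        elif etype == "LOGIN_SUCCESS":
            # keep only failures inside the 5-minute window ending at success
            run = [t for t in run if ts - t <= WINDOW]
            if len(run) >= 3:
                matches.append((run[0], ts, len(run)))
            run = []
        else:
            run = []  # any other event breaks the failure run
    return matches

t0 = datetime(2024, 1, 1, 12, 0, 0)
evts = [(t0 + timedelta(seconds=10 * i), "LOGIN_FAILED") for i in range(4)]
evts.append((t0 + timedelta(seconds=60), "LOGIN_SUCCESS"))
print(find_bruteforce(evts))  # one match, failure_count 4
```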
Catalog integration
// Register Hive Metastore catalog — use existing Hive tables in Flink SQL
HiveCatalog hiveCatalog = new HiveCatalog(
"hive_catalog",
"default", // default database
"/etc/hive/conf" // hive-site.xml location
);
tEnv.registerCatalog("hive_catalog", hiveCatalog);
tEnv.useCatalog("hive_catalog");
// Now you can JOIN Kafka stream with Hive tables
tEnv.executeSql("""
SELECT o.order_id, p.product_name, o.amount
FROM kafka_catalog.default.orders AS o
JOIN hive_catalog.default.products AS p ON o.product_id = p.product_id
""");
GPU-Accelerated Transform: NVIDIA cuDF & RAPIDS
cuDF is NVIDIA's GPU-accelerated DataFrame library built on CUDA and Apache Arrow. It implements the pandas API, so existing pandas code can leverage 10–150x speedups with minimal or zero code changes. For a deeper dive into GPU hardware, the CUDA programming model, and NVIDIA's full stack, see the Accelerated Computing refresher.
cudf.pandas — the zero-code-change breakthrough
The most important feature: cudf.pandas acts as a transparent proxy. Import it once, and your existing pandas code runs on GPU automatically. Operations not supported by cuDF silently fall back to CPU pandas.
##############################################################
# Option A: Jupyter / IPython magic (zero code change)
##############################################################
%load_ext cudf.pandas
import pandas as pd # This is now GPU-accelerated!
df = pd.read_parquet("orders.parquet") # GPU read
result = (
df[df["status"] == "delivered"]
.groupby("customer_id")["total_usd"]
.agg(["sum", "count", "mean"])
.sort_values("sum", ascending=False)
.head(100)
)
# No other changes needed. cuDF handles it transparently.
##############################################################
# Option B: Script / production code (one import line)
##############################################################
import cudf.pandas
cudf.pandas.install() # patches pandas in-process — not a pip install
import pandas as pd # Now GPU-accelerated
# Same exact pandas code as above — runs on GPU
df = pd.read_parquet("orders_large.parquet")
result = df.groupby(["country", "month"])["revenue"].sum()
##############################################################
# Option C: Explicit cuDF API (maximum control)
##############################################################
import cudf
df = cudf.read_parquet("orders.parquet") # GPU memory
filtered = df[df["total_usd"] > 100]
joined = filtered.merge(customers, on="customer_id", how="left")
result = (
joined
.groupby(["country_code", "order_tier"])
.agg({"total_usd": ["sum", "mean", "count"]})
)
# Convert back to pandas for non-GPU-aware downstream
pandas_result = result.to_pandas()
Performance benchmarks
| Workload | pandas (CPU) | cuDF (GPU) | Speedup |
|---|---|---|---|
| GroupBy + agg on 1GB CSV | 45s | 0.8s | ~56x |
| Merge/join on 2GB parquet | 120s | 2.1s | ~57x |
| Rolling window (1M rows) | 8s | 0.12s | ~67x |
| String operations (regex) | 30s | 1.8s | ~17x |
| Spark RAPIDS vs Spark CPU | Spark CPU baseline | ~5x faster | ~5x (reported up to 10x TCO improvement) |
The RAPIDS ecosystem
| Library | Replaces | Reported Speedup | Use Case |
|---|---|---|---|
| cuDF | pandas | 10–150x | DataFrames, ETL preprocessing |
| cuML | scikit-learn | 27x | ML training and inference |
| cuGraph | NetworkX | 487x | Graph analytics, fraud networks |
| cuVS | FAISS (CPU) | 10–100x | Vector search, ANN index |
| Spark RAPIDS plugin | Spark CPU | 5x | Accelerate existing Spark jobs |
Spark RAPIDS — accelerating existing Spark jobs
# Add the RAPIDS plugin to your Spark cluster — no code changes needed
spark-submit \
--jars rapids-4-spark_2.12-24.10.0.jar \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.rapids.sql.enabled=true \
--conf spark.rapids.memory.pinnedPool.size=4g \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=0.5 \
my_existing_spark_job.py
# Catalyst optimizer routes GPU-supported operations to cuDF
# CPU fallback for unsupported ops (no code change needed)
Same transform: pandas → cudf.pandas → Spark RAPIDS
##############################################################
# pandas (CPU baseline)
##############################################################
import pandas as pd
df = pd.read_parquet("events.parquet") # 5 GB file
result = (
df[df["event_type"] == "purchase"]
.assign(revenue=df["price"] * df["quantity"])
.groupby(["user_id", "product_category"])
.agg(total_revenue=("revenue", "sum"), order_count=("order_id", "count"))
.reset_index()
.sort_values("total_revenue", ascending=False)
)
result.to_parquet("output.parquet")
# Runtime (CPU baseline): ~180 seconds
##############################################################
# cudf.pandas (GPU, zero code change)
##############################################################
import cudf.pandas; cudf.pandas.install()
import pandas as pd # same import!
df = pd.read_parquet("events.parquet") # GPU read
result = (
df[df["event_type"] == "purchase"]
.assign(revenue=df["price"] * df["quantity"])
.groupby(["user_id", "product_category"])
.agg(total_revenue=("revenue", "sum"), order_count=("order_id", "count"))
.reset_index()
.sort_values("total_revenue", ascending=False)
)
result.to_parquet("output.parquet")
# Runtime on A100: ~2.1 seconds (~86x speedup)
##############################################################
# Spark RAPIDS (distributed GPU, same Spark code)
##############################################################
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder \
.config("spark.plugins", "com.nvidia.spark.SQLPlugin") \
.config("spark.rapids.sql.enabled", "true") \
.getOrCreate()
df = spark.read.parquet("s3://bucket/events/") # multi-node GPU read
result = (
df.filter(F.col("event_type") == "purchase")
.withColumn("revenue", F.col("price") * F.col("quantity"))
.groupBy("user_id", "product_category")
.agg(
F.sum("revenue").alias("total_revenue"),
F.count("order_id").alias("order_count")
)
.orderBy(F.desc("total_revenue"))
)
result.write.parquet("s3://bucket/output/")
# Runtime: ~8 seconds on 4 A100 nodes (22x vs CPU Spark)
Polars GPU engine
# Polars 1.0+ supports GPU execution via cuDF engine.
# cuDF must be pre-installed from NVIDIA's package index BEFORE polars[gpu] will
# work at runtime — pip install polars[gpu] alone is not sufficient:
# pip install --extra-index-url https://pypi.nvidia.com cudf-cu12
# pip install polars[gpu]
import polars as pl
df = pl.scan_parquet("events.parquet") # lazy scan
result = (
df.filter(pl.col("event_type") == "purchase")
.with_columns((pl.col("price") * pl.col("quantity")).alias("revenue"))
.group_by(["user_id", "product_category"])
.agg([
pl.sum("revenue").alias("total_revenue"),
pl.col("order_id").count().alias("order_count")
])
.sort("total_revenue", descending=True)
.collect(engine="gpu") # <-- one flag to enable GPU
)
When NOT to use cuDF
- Data smaller than ~100 MB: GPU transfer overhead dominates. pandas on CPU is faster.
- Streaming / event-by-event: cuDF is batch-oriented. Use Flink for true streaming.
- Wide DataFrames with few rows: GPU parallelism needs depth (many rows), not width.
- Niche pandas features: cuDF covers ~93% of the pandas API. Operations like df.pivot_table(observed=False) or certain MultiIndex operations fall back to CPU; check compatibility before committing to GPU.
- GPU memory: An A100 has 40–80 GB. A dataset larger than GPU VRAM requires chunking or Dask-cuDF (distributed GPU).
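These rules of thumb condense into a tiny engine chooser. The thresholds below are the heuristics above, not hard limits:

```python
def choose_engine(data_bytes: int, gpu_vram_bytes: int, streaming: bool) -> str:
    """Heuristic only: thresholds follow the rules of thumb above."""
    if streaming:
        return "flink"       # cuDF is batch-oriented; use a stream processor
    if data_bytes < 100 * 1024**2:
        return "pandas"      # GPU transfer overhead dominates below ~100 MB
    if data_bytes <= gpu_vram_bytes:
        return "cudf"        # fits in VRAM on one GPU
    return "dask-cudf"       # larger than VRAM: chunk or distribute

A100_VRAM = 80 * 1024**3
print(choose_engine(10 * 1024**2, A100_VRAM, streaming=False))   # pandas
print(choose_engine(20 * 1024**3, A100_VRAM, streaming=False))   # cudf
print(choose_engine(200 * 1024**3, A100_VRAM, streaming=False))  # dask-cudf
```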
Real-world production wins
- Nestlé: 5x faster ETL pipelines by swapping pandas for cuDF in their ML feature store
- Snap: 76% cost reduction on their Spark analytics clusters using the RAPIDS plugin
- PayPal: Fraud detection feature computation dropped from 4 hours to 20 minutes with Spark RAPIDS
- Walmart: Inventory optimization models: cuML replaced scikit-learn, training time 27x faster
Stream Processing Patterns
CDC with Debezium
Change Data Capture (CDC) captures row-level changes from a database transaction log and publishes them as events to Kafka. Flink or Spark can consume these events for real-time materialized views.
// Debezium event format (Kafka message value)
{
"before": { "order_id": "abc123", "status": "pending", "total_cents": 5000 },
"after": { "order_id": "abc123", "status": "shipped", "total_cents": 5000 },
"op": "u", // c=create, u=update, d=delete, r=read(snapshot)
"ts_ms": 1710000000000,
"source": { "db": "orders_db", "table": "orders" }
}
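Applying such a stream to an in-memory view is mechanical: upserts for c/u/r, deletes for d. A minimal sketch keyed on order_id (field names follow the event format above):

```python
# Apply Debezium change events to an in-memory view keyed by order_id.
# op codes: c=create, u=update, d=delete, r=snapshot read.

def apply_cdc(view: dict, event: dict) -> None:
    op = event["op"]
    if op in ("c", "u", "r"):
        row = event["after"]
        view[row["order_id"]] = row        # upsert the new row image
    elif op == "d":
        view.pop(event["before"]["order_id"], None)  # remove on delete

view = {}
apply_cdc(view, {"op": "c", "after": {"order_id": "abc123", "status": "pending"}})
apply_cdc(view, {"op": "u",
                 "before": {"order_id": "abc123", "status": "pending"},
                 "after": {"order_id": "abc123", "status": "shipped"}})
print(view["abc123"]["status"])  # shipped
apply_cdc(view, {"op": "d", "before": {"order_id": "abc123", "status": "shipped"}})
print(len(view))  # 0
```

This is exactly what Flink's `debezium-json` format does for you: it turns the before/after images into retractions and upserts on a dynamic table.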
// Flink: consume Debezium CDC stream from Kafka
tEnv.executeSql("""
CREATE TABLE orders_cdc (
order_id STRING,
customer_id STRING,
status STRING,
total_cents BIGINT,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'debezium.orders_db.orders',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json'
)
""");
// Flink treats the CDC stream as a dynamic table with upserts
// This query maintains a real-time materialized view of active orders
tEnv.executeSql("""
CREATE TABLE active_orders_view WITH ('connector' = 'jdbc', ...)
AS SELECT
customer_id,
COUNT(*) AS active_order_count,
SUM(total_cents) / 100.0 AS total_active_usd
FROM orders_cdc
WHERE status NOT IN ('delivered', 'cancelled')
GROUP BY customer_id
""");
Exactly-once end-to-end
// Flink + Kafka exactly-once:
// - Source: Kafka consumer with committed offsets
// - Processing: Flink checkpointing (Chandy-Lamport)
// - Sink: Kafka transactional producer (2-phase commit)
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("output-topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 2PC
.setTransactionalIdPrefix("flink-exactly-once")
.build();
stream.sinkTo(sink);
Late data handling and watermarks
// Watermarks tell Flink "events older than this timestamp are now late"
// Flink waits for the watermark before closing a window
// Strategy 1: Bounded out-of-orderness (most common)
// Allow events up to 10 seconds late
WatermarkStrategy<OrderEvent> strategy = WatermarkStrategy
.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, ts) -> event.timestampMs);
// Strategy 2: Custom (inspect each event to decide watermark)
WatermarkStrategy<OrderEvent> custom = WatermarkStrategy
.forGenerator(ctx -> new WatermarkGenerator<OrderEvent>() {
private long maxTimestamp = Long.MIN_VALUE;
@Override
public void onEvent(OrderEvent event, long ts, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, event.timestampMs);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - 10_000)); // 10s lag
}
});
// Handle truly late events after window closes
stream
.keyBy(e -> e.customerId)
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(1)))
.allowedLateness(Duration.ofSeconds(30)) // refire window for late arrivals
.sideOutputLateData(lateTag) // capture stragglers
.aggregate(new SumAggregator());
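Strategy 1's semantics can be simulated in a few lines: the watermark trails the maximum timestamp seen by the out-of-orderness bound, and an event arriving at or behind the current watermark is late. A sketch, not Flink's actual classes:

```python
# Simulate bounded out-of-orderness watermarks (10s bound, as in the Java example).
# Watermark = max event timestamp seen so far - bound; an arriving event is
# "late" if its timestamp <= current watermark.

BOUND_MS = 10_000

def find_late(events_ms):
    max_ts, late = float("-inf"), []
    for ts in events_ms:
        watermark = max_ts - BOUND_MS
        if ts <= watermark:
            late.append(ts)            # would go to the side output
        max_ts = max(max_ts, ts)
    return late

# 5000 arrives after 20000: watermark is 10000 by then, so 5000 is late
print(find_late([1_000, 20_000, 5_000, 15_000]))  # [5000]
```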
The Big Comparison — dbt vs Spark vs Flink vs cuDF
| Dimension | dbt | Spark | Flink | cuDF / RAPIDS |
|---|---|---|---|---|
| Latency | Minutes (scheduled SQL) | Seconds–minutes (micro-batch) | Milliseconds (true streaming) | Milliseconds (single-node GPU) |
| Primary language | SQL + Jinja | Python, Scala, Java, R | Java, Scala, Python, SQL | Python (pandas API) |
| Scale | Warehouse-bound | Petabytes (distributed) | Petabytes/day (distributed) | Single-node GPU (Dask-cuDF for multi-node) |
| State management | None (stateless SQL) | Limited (mapGroupsWithState, foreachBatch) | First-class (ValueState, MapState, timers) | None (batch) |
| SQL support | Native (the whole interface) | SparkSQL, Catalyst optimizer | Flink SQL, Table API | None native (use pandas API) |
| GPU support | No | Via RAPIDS plugin | No (in development) | Native — built for GPU |
| Exactly-once | Not applicable (idempotent SQL) | Structured Streaming + Delta | Native (Chandy-Lamport + 2PC) | Not applicable (batch) |
| Learning curve | Low (SQL + YAML) | Medium | High (stateful streaming, watermarks) | Very low (it's pandas) |
| Ecosystem | Snowflake, BigQuery, Redshift, DuckDB | Hadoop, Delta, Iceberg, S3 | Kafka, Iceberg, JDBC, Elasticsearch | RAPIDS (cuML, cuGraph), Spark RAPIDS |
| Best for | Warehouse modeling, analytics engineering | Large-scale batch ETL, ML feature engineering | Real-time event processing, fraud detection, CDC | GPU-accelerated pandas workloads, ML preprocessing |
Decision flowchart
What are you building?
│
├── Transforming data inside a warehouse (Snowflake, BigQuery, Redshift)?
│ └── YES → dbt
│ ├── Simple SQL models → view materialization
│ ├── Large history tables → incremental materialization
│ └── SCD Type 2 → dbt snapshots
│
├── Processing data outside the warehouse?
│ │
│ ├── Data size < 10 GB and GPU available?
│ │ └── YES → cuDF / cudf.pandas (zero code change from pandas)
│ │
│ ├── Batch processing (> 10 GB, no strict latency SLA)?
│ │ └── YES → Spark
│ │ └── GPU available? → Add Spark RAPIDS for 5x speedup
│ │
│ ├── Real-time / streaming (per-event latency matters)?
│ │ └── YES → Flink
│ │ ├── Simple aggregations → Flink SQL
│ │ ├── Complex patterns / ML → DataStream API
│ │ └── CDC source → Flink + Debezium
│ │
│ └── Both batch AND stream over same data?
│ └── Flink (unified batch/stream) OR Spark Structured Streaming
│ → Use Delta Lake / Iceberg as the shared table format
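The flowchart reads directly as a function. A sketch with the same branches, inputs reduced to flags:

```python
# The decision flowchart above as a function (simplified to boolean inputs).

def choose_transform_tool(in_warehouse: bool, streaming: bool,
                          data_gb: float, gpu_available: bool) -> str:
    if in_warehouse:
        return "dbt"                       # transform inside the warehouse
    if streaming:
        return "flink"                     # per-event latency matters
    if data_gb < 10 and gpu_available:
        return "cudf"                      # single-node GPU, pandas API
    return "spark + RAPIDS" if gpu_available else "spark"

print(choose_transform_tool(True, False, 1, False))     # dbt
print(choose_transform_tool(False, True, 5, False))     # flink
print(choose_transform_tool(False, False, 5, True))     # cudf
print(choose_transform_tool(False, False, 500, True))   # spark + RAPIDS
print(choose_transform_tool(False, False, 500, False))  # spark
```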
Hybrid architectures in the wild
# Production architecture at a mid-size fintech:
#
# Raw Sources → Fivetran → Snowflake raw schema
# │
# ├── dbt → Snowflake marts → Tableau
# │
# Kafka (transactions) → Flink → Redis (fraud scores, <50ms)
# └── Iceberg (audit trail)
#
# Iceberg tables → Spark batch → ML feature store (Delta Lake)
# └── cuDF (GPU) on notebooks for exploration
#
# Rule: use the cheapest tool that satisfies latency + scale constraints
Convergence trends
- SQLMesh: Next-gen dbt alternative with native multi-dialect SQL, plan/apply workflow (like Terraform), and built-in Spark/Flink execution engines — the lines between tools are blurring.
- Materialized views: Snowflake Dynamic Tables and BigQuery Materialized Views bring near-real-time refresh to the warehouse, partially replacing Flink for simple aggregations.
- Flink SQL: Making stream processing accessible to SQL-first teams — the Table API is now as capable as the DataStream API for most use cases.
- dbt-flink-adapter: Run dbt models as Flink SQL jobs — early stage but directionally significant.
The Future of Data Transform
GPU-first processing — NVIDIA's ecosystem strategy
At GTC 2026, NVIDIA announced aggressive expansion of RAPIDS across the data ecosystem. The strategy: make GPU acceleration the default at every layer of the modern data stack.
- cuDF 25.x: Near-complete pandas compatibility (target 99%), automatic spill to CPU when GPU VRAM full
- Polars GPU engine: Ships as stable in Polars 2.0, handles multi-GPU via Dask-cuDF
- Snowflake Snowpark for Python: Evaluating GPU execution for UDFs on Snowpark clusters
- DuckDB GPU: Experimental CUDA backend in development — vectorized SQL on GPU
- Spark RAPIDS plugin: v25.x covers 95%+ of Spark SQL operations, near-zero configuration
SQL everywhere — the lingua franca
SQL is winning. Every tool has invested heavily in SQL interfaces:
- dbt — always SQL-first
- Spark SQL — Catalyst optimizer, ANSI SQL compliance in Spark 3.x
- Flink SQL — Table API as capable as DataStream API for most workloads
- DuckDB — in-process SQL, replaces pandas for many data science workflows
- MotherDuck — serverless DuckDB, "Snowflake for individuals"
The implication: SQL skills transfer across tools. The differentiation is in execution engine (GPU vs CPU vs distributed), latency semantics (batch vs streaming), and cost model.
Batch-stream unification
The historical gap between batch and streaming systems is closing:
- Flink 1.12+: DataStream API for bounded streams — same code, different runtime mode
- Spark Structured Streaming: Trigger.Once() for batch mode, Trigger.AvailableNow() for incremental
- Materialized views: Automatic incremental refresh from append-only source tables
- Apache Paimon: Streaming lakehouse table format designed for high-frequency upserts
Lakehouse effect — one table format for batch and stream
Apache Iceberg and Delta Lake are enabling the same physical data to serve both batch analytics and streaming queries:
# Write from Flink (streaming) and read from Spark (batch) — same Iceberg table
# Flink write:
tEnv.executeSql("""
CREATE TABLE orders_iceberg (order_id STRING, total DOUBLE, ts TIMESTAMP)
WITH ('connector'='iceberg', 'catalog-name'='hive', 'database-name'='gold')
""")
# Spark read (batch):
spark.read.format("iceberg").load("hive.gold.orders_iceberg")
Cost/performance frontier
| Approach | Hardware Cost | Throughput | Latency | Sweet Spot |
|---|---|---|---|---|
| DuckDB / Polars (CPU in-process) | Low (single machine) | High for single-node | Seconds | < 50 GB, ad-hoc analytics |
| cuDF (GPU single-node) | Medium (GPU server) | Very high | Milliseconds | 10 GB – 1 TB, ML pipelines |
| Spark (distributed CPU) | High (cluster) | Highest (scale-out) | Seconds–minutes | > 1 TB batch |
| Spark RAPIDS (distributed GPU) | Very high (GPU cluster) | Highest + GPU acceleration | Seconds | > 1 TB, time-sensitive batch |
| Flink (distributed streaming) | High (always-on cluster) | High (continuous) | Milliseconds | Real-time requirements |
Production Patterns
Monitoring by tool
dbt monitoring
# dbt freshness — are sources up to date?
dbt source freshness -o target/sources.json
jq '.results[] | select(.status != "pass")' target/sources.json
# dbt test on a schedule (Airflow example)
dbt_test = BashOperator(
task_id='dbt_test',
bash_command='dbt test --models marts.fct_orders --store-failures',
)
# Elementary — open-source dbt monitoring package
# Generates anomaly detection tests (row count, null rate, distribution)
# pip install elementary-data
edr send-report --slack-token $SLACK_TOKEN --slack-channel-name data-alerts
Flink monitoring
# Flink REST API metrics
curl http://localhost:8081/jobs/{jobId}/metrics?get=numRecordsInPerSecond,numRecordsOutPerSecond
# Key metrics to alert on:
# - numRecordsInPerSecond: source throughput
# - isBackPressured: backpressure indicator per vertex
# - lastCheckpointDuration: checkpoint time (should be << checkpoint interval)
# - numberOfFailedCheckpoints: should be 0
# - taskSlotsAvailable: cluster headroom
# Prometheus + Grafana (recommended setup)
# Add to flink-conf.yaml:
# metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
# metrics.reporter.prom.port: 9249
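The alert list above can be evaluated against a metrics snapshot. A sketch where "checkpoint duration should be much less than the interval" is encoded as under 50% of the interval (that fraction is an assumption, not a Flink default):

```python
# Evaluate a Flink metrics snapshot against the alert rules listed above.
# "Duration << interval" is encoded as < 50% of the interval (assumption).

def flink_alerts(m: dict, checkpoint_interval_ms: int) -> list:
    alerts = []
    if m.get("isBackPressured"):
        alerts.append("backpressure detected")
    if m.get("numberOfFailedCheckpoints", 0) > 0:
        alerts.append("failed checkpoints")
    if m.get("lastCheckpointDuration", 0) > 0.5 * checkpoint_interval_ms:
        alerts.append("slow checkpoints")
    if m.get("taskSlotsAvailable", 1) == 0:
        alerts.append("no slot headroom")
    return alerts

snapshot = {"isBackPressured": False, "numberOfFailedCheckpoints": 0,
            "lastCheckpointDuration": 45_000, "taskSlotsAvailable": 2}
print(flink_alerts(snapshot, checkpoint_interval_ms=60_000))  # ['slow checkpoints']
```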
Spark monitoring
# Spark History Server: http://spark-master:18080
# Key metrics per job:
# - Stage duration vs shuffle read/write
# - Task skew: compare task durations in a stage
# - GC time: should be <10% of task time
# - Spill to disk: indicates insufficient memory
# Programmatic (for alerting)
from pyspark import SparkContext # SparkContext lives in pyspark, not pyspark.sql
sc = SparkContext.getOrCreate()
# Access StatusTracker for stage/job completion rates
Scaling patterns
| Tool | Scale Up | Scale Out | Key Bottlenecks |
|---|---|---|---|
| dbt | Warehouse compute credits (Snowflake XL warehouse) | Parallelism in dbt Cloud, slim CI to reduce scope | Shuffle-heavy SQL, very wide joins |
| Spark | Larger executor memory/cores | Add executors, optimize partitioning | Shuffle (reduce with broadcast joins, bucketing) |
| Flink | Increase task slot parallelism | Add TaskManagers, rescale checkpointed job | State backend I/O, key skew, slow sinks |
| cuDF | Larger GPU (A100 80GB) | Dask-cuDF (multi-GPU), multi-node | GPU VRAM — chunk data or use Dask |
Failure handling and recovery
# dbt: retry failed models
# dbt_project.yml (on-run-end is a top-level hook, not a model config):
on-run-end:
  - "{{ store_results() }}"
# In Airflow DAG: retry failed dbt models only.
# The result:error selector needs the previous run's artifacts via --state.
dbt_run_failed = BashOperator(
    task_id='dbt_run_retry',
    bash_command='dbt run --select result:error+ --state path/to/prev/target',  # errored + downstream
    retries=2,
    retry_delay=timedelta(minutes=5),
)
// Flink: configure restart strategy
env.setRestartStrategy(
RestartStrategies.fixedDelayRestart(
3, // max 3 restarts
Time.of(10, TimeUnit.SECONDS) // wait 10s between attempts
)
);
// Or exponential backoff
env.setRestartStrategy(
RestartStrategies.exponentialDelayRestart(
Time.seconds(1), // initial delay
Time.minutes(2), // max delay
2.0, // multiplier
Time.minutes(10), // reset threshold (if job stable for 10 min)
0.1 // jitter factor
)
);
Data quality at scale
# Great Expectations — works with pandas, Spark, and SQL
import great_expectations as gx
context = gx.get_context()
ds = context.sources.add_spark("spark_source", spark=spark)
da = ds.add_spark_df_asset("orders_asset")
batch_request = da.build_batch_request(dataframe=orders_df)
validator = context.get_validator(batch_request=batch_request)
# Define expectations
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between("total_usd", min_value=0, max_value=1e6)
validator.expect_column_values_to_be_in_set(
"status", ["pending", "confirmed", "shipped", "delivered", "cancelled"]
)
# Run and capture results
results = validator.validate()
if not results.success:
raise ValueError(f"Data quality check failed: {results.statistics}")
Cross-links to related refreshers
- Spark refresher — RDD internals, Catalyst optimizer, shuffle tuning, partitioning strategies, Structured Streaming deep dive
- Kafka refresher — Topics, partitions, consumer groups, Kafka Streams, exactly-once producer/consumer configuration
- Lakehouse refresher — Delta Lake and Apache Iceberg internals, ACID transactions, time travel, schema evolution
- Airflow refresher — DAG authoring, scheduling dbt and Spark jobs, sensor patterns, SLA monitoring
- Data Modeling refresher — Star schema, Kimball methodology, dimensional modeling, snowflake vs star schema for warehouse performance