Why Data Transform?

Every data pipeline has three phases: get the data, move it somewhere, and make it useful. The third step — transformation — is where the business value lives. For decades this was done as ETL (Extract, Transform, Load): clean and shape data before writing it to the warehouse. Modern practice inverts this: ELT (Extract, Load, Transform) loads raw data first, then transforms it inside the warehouse or compute layer where resources are elastic and SQL is native.

# Modern data stack, simplified:
#
#  Sources          Extract/Load         Transform          Serve
#  -------          ------------         ---------          -----
#  Postgres    -->  Fivetran / Airbyte   dbt (SQL)     -->  BI tools
#  S3 events   -->  Spark batch          Spark SQL     -->  ML features
#  Kafka       -->  Flink connector      Flink ops     -->  real-time APIs
#  pandas jobs -->  direct write         cuDF (GPU)    -->  notebooks
#
#  ETL: transform before load  (data center era)
#  ELT: transform after load   (cloud warehouse era)
| Dimension | ETL | ELT |
|---|---|---|
| Where transform runs | Dedicated ETL server | Inside warehouse / compute cluster |
| Raw data preserved? | Often not | Yes — raw layer in warehouse |
| Iteration speed | Slow (pipeline redeploy) | Fast (SQL rerun) |
| Tooling | Informatica, SSIS | dbt, Spark, Flink, cuDF |
| Best for | Legacy systems, compliance | Cloud-native, agile analytics |

Where each tool fits

| Tool | Primary Use Case | Latency | Scale |
|---|---|---|---|
| dbt | SQL transforms inside the warehouse (Snowflake, BigQuery, Redshift, DuckDB) | Minutes | Warehouse-bound |
| Spark | Large-scale batch / micro-batch outside the warehouse | Seconds–minutes | Petabytes |
| Flink | True stream processing, stateful event-by-event computation | Milliseconds | Petabytes/day |
| cuDF / RAPIDS | GPU-accelerated DataFrame processing, drop-in pandas replacement | Milliseconds | Single-node GPU |
The modern data stack insight
These tools are not competing — they are complementary. A production architecture commonly uses all four: dbt for warehouse models, Spark for large historical backfills, Flink for real-time feature computation, and cuDF to speed up ML preprocessing on GPUs.

dbt Fundamentals

dbt (data build tool) is a SQL-first transformation framework. The core insight: every dbt model is a SELECT statement. dbt handles the DDL (CREATE TABLE, CREATE VIEW) so you only write the logic. dbt compiles your SQL, resolves dependencies via a DAG, and runs models in the right order.
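The dependency resolution is, at its core, a topological sort over the ref() graph. A toy sketch with Python's stdlib (model names are hypothetical):

```python
from graphlib import TopologicalSorter

# Each model maps to the set of models it ref()'s — a made-up mini-project.
deps = {
    "stg_orders": set(),                      # reads from a source, no refs
    "stg_customers": set(),
    "int_orders_with_customers": {"stg_orders", "stg_customers"},
    "fct_orders": {"int_orders_with_customers"},
}

# static_order() yields each model only after all its dependencies.
run_order = list(TopologicalSorter(deps).static_order())
assert run_order.index("stg_orders") < run_order.index("int_orders_with_customers")
assert run_order.index("int_orders_with_customers") < run_order.index("fct_orders")
```

dbt does the same thing at scale, plus parallelism: models with no path between them can run concurrently.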

How dbt works

# You write this:
#   models/marts/orders.sql → SELECT ... FROM {{ ref('stg_orders') }}
#
# dbt compiles to:
#   CREATE TABLE analytics.orders AS SELECT ... FROM analytics.stg_orders
#
# dbt handles:
#   - Dependency resolution (DAG)
#   - Materialization (view vs table vs incremental)
#   - Schema management
#   - Documentation generation
#   - Testing

The ref() function — dbt's killer feature

The ref() function is how models reference each other. It resolves to the correct schema at compile time and declares the dependency so dbt builds the DAG automatically.
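Conceptually, compiling ref() is string substitution plus recording a DAG edge. A minimal sketch (the target schema name is an assumption):

```python
import re

SCHEMA = "analytics"   # assumed compile-time target schema

def compile_model(sql: str, edges: list) -> str:
    """Replace {{ ref('x') }} with a schema-qualified name, recording DAG edges."""
    def sub(match):
        model = match.group(1)
        edges.append(model)                  # dependency edge for the DAG
        return f"{SCHEMA}.{model}"
    return re.sub(r"\{\{\s*ref\('(\w+)'\)\s*\}\}", sub, sql)

edges = []
compiled = compile_model("SELECT * FROM {{ ref('stg_orders') }}", edges)
assert compiled == "SELECT * FROM analytics.stg_orders"
assert edges == ["stg_orders"]
```

The real compiler also resolves environments (dev vs prod schemas), which is why you never hardcode schema names in model SQL.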

-- models/staging/stg_orders.sql
-- Raw → cleaned, renamed, typed

SELECT
    id                          AS order_id,
    customer_id,
    CAST(created_at AS TIMESTAMP) AS ordered_at,
    status,
    total_amount_cents / 100.0  AS total_usd,
    -- normalize status to lowercase
    LOWER(status)               AS order_status
FROM {{ source('raw', 'orders') }}
WHERE id IS NOT NULL
-- models/intermediate/int_orders_with_customers.sql
-- Join cleaned orders with cleaned customers

SELECT
    o.order_id,
    o.ordered_at,
    o.order_status,
    o.total_usd,
    c.customer_name,
    c.customer_email,
    c.country_code
FROM {{ ref('stg_orders') }} AS o
JOIN {{ ref('stg_customers') }} AS c ON o.customer_id = c.customer_id
-- models/marts/fct_orders.sql
-- Final fact table consumed by BI tools

SELECT
    order_id,
    ordered_at,
    DATE_TRUNC('month', ordered_at) AS order_month,
    customer_name,
    country_code,
    order_status,
    total_usd,
    -- derived columns
    CASE
        WHEN total_usd >= 500 THEN 'high'
        WHEN total_usd >= 100 THEN 'medium'
        ELSE 'low'
    END AS order_tier
FROM {{ ref('int_orders_with_customers') }}
WHERE order_status NOT IN ('cancelled', 'test')

Sources — declaring external data

# models/staging/sources.yml
version: 2

sources:
  - name: raw
    schema: raw_data
    description: Raw tables loaded by Fivetran
    tables:
      - name: orders
        description: Order events from the production database
        loaded_at_field: _fivetran_synced
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}
        columns:
          - name: id
            description: Primary key
            tests:
              - unique
              - not_null
      - name: customers
        description: Customer records

dbt Project Structure

my_dbt_project/
├── dbt_project.yml           # Project config: name, version, model paths, vars
├── profiles.yml              # Connection credentials (usually in ~/.dbt/)
├── packages.yml              # Third-party packages (dbt_utils, dbt_expectations)
├── models/
│   ├── staging/              # 1-to-1 with raw sources, minimal transforms
│   │   ├── sources.yml       # Source declarations + freshness tests
│   │   ├── stg_orders.sql
│   │   └── stg_customers.sql
│   ├── intermediate/         # Business logic, joins — not exposed to BI
│   │   └── int_orders_with_customers.sql
│   └── marts/                # Wide, denormalized, BI-ready
│       ├── core/
│       │   ├── fct_orders.sql
│       │   └── dim_customers.sql
│       └── marketing/
│           └── mrt_customer_ltv.sql
├── seeds/                    # Static CSV files loaded as tables
│   └── country_codes.csv
├── snapshots/                # SCD Type 2 history tracking
│   └── orders_snapshot.sql
├── macros/                   # Reusable Jinja macros
│   ├── cents_to_dollars.sql
│   └── date_spine.sql
├── tests/                    # Custom singular tests
│   └── assert_positive_revenue.sql
└── analyses/                 # Ad-hoc SQL (not run in production)
    └── cohort_exploration.sql
# dbt_project.yml
name: my_company
version: '1.0.0'
config-version: 2

profile: my_company          # matches profiles.yml

model-paths: ["models"]
seed-paths: ["seeds"]
snapshot-paths: ["snapshots"]
macro-paths: ["macros"]

# Default materialization per layer
models:
  my_company:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral   # no physical table; inlined as CTE
    marts:
      +materialized: table
      +schema: marts
      core:
        fct_orders:
          +materialized: incremental
          +unique_key: order_id
Naming convention matters
The staging → intermediate → marts layering is a convention, not a requirement, but it creates a clear contract: staging models are owned by the data engineering team (data contracts, freshness SLAs); marts are owned by the analytics team (never break downstream BI); intermediate is the negotiation layer between the two.

dbt Models & Materializations

dbt supports four materializations. Each represents a different trade-off between query cost, build time, and freshness.

| Materialization | SQL Created | Rebuild Cost | Query Cost | Use When |
|---|---|---|---|---|
| view | CREATE VIEW | Near zero | High (runs every query) | Staging, light transforms |
| table | CREATE TABLE AS SELECT | Full rebuild every run | Low (pre-materialized) | Final marts, medium data |
| incremental | INSERT / MERGE into existing table | Only new rows | Low | Large tables, event data |
| ephemeral | None (inlined as CTE) | Zero | Runs in context of referencing model | Intermediate logic, DRY helpers |

Incremental models — the production workhorse

-- models/marts/fct_page_views.sql
-- Incremental model: only processes new events each run
{{
  config(
    materialized='incremental',
    unique_key='page_view_id',
    incremental_strategy='merge',   -- or 'append', 'delete+insert'
    on_schema_change='sync_all_columns'
  )
}}

SELECT
    page_view_id,
    session_id,
    user_id,
    page_url,
    referrer_url,
    viewed_at,
    time_on_page_seconds,
    -- only compute expensive UDFs on new rows
    {{ classify_page_type('page_url') }} AS page_category
FROM {{ ref('stg_page_views') }}

{% if is_incremental() %}
  -- Only process rows newer than the latest we've already loaded.
  -- Use a small lookback buffer (1 hour) for late-arriving events.
  -- Snowflake/Redshift syntax (BigQuery: DATETIME_SUB, DuckDB: interval arithmetic):
  --   BigQuery:  DATETIME_SUB(MAX(viewed_at), INTERVAL 1 HOUR)
  --   DuckDB:    MAX(viewed_at) - INTERVAL '1 hour'
  WHERE viewed_at >= (
    SELECT DATEADD(hour, -1, MAX(viewed_at)) FROM {{ this }}
  )
{% endif %}
Common incremental pitfalls
  • Late-arriving data: Events arrive hours after the fact (mobile apps going offline). Always include a lookback buffer (1–24 hours) in your is_incremental() filter.
  • Schema changes: Adding a column to an incremental model without on_schema_change='sync_all_columns' causes silent data gaps — old rows lack the new column.
  • Unique key violations: Without incremental_strategy='merge' and a unique_key, re-processing overlapping windows creates duplicates.
  • Full refresh needed: Run dbt run --full-refresh after backfilling source data or changing business logic that affects historical rows.
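The merge-with-lookback semantics can be modeled in a few lines of plain Python. This is a toy, not dbt's implementation: rows are (unique_key, event_hour, payload) tuples and timestamps are epoch hours:

```python
def incremental_merge(existing, new_batch, lookback_hours=1):
    """Upsert rows newer than (high-water mark − lookback) by unique key."""
    high_water = max(r[1] for r in existing) if existing else float("-inf")
    cutoff = high_water - lookback_hours
    table = {r[0]: r for r in existing}          # unique_key -> row
    for row in new_batch:
        if row[1] >= cutoff:                     # inside the incremental window
            table[row[0]] = row                  # MERGE: upsert, no duplicates
    return sorted(table.values())

existing = [("a", 10, "x"), ("b", 11, "y")]
late  = ("c", 10, "late-arriver")   # inside the 1h buffer: still captured
rerun = ("b", 11, "y2")             # overlapping window: updated, not duplicated
merged = incremental_merge(existing, [late, rerun])
assert merged == [("a", 10, "x"), ("b", 11, "y2"), ("c", 10, "late-arriver")]
```

Without the unique-key upsert (i.e. with an append strategy), the rerun row would land twice — exactly the duplicate pitfall above.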

Config blocks — SQL vs YAML

-- Option A: config block in SQL (per-model override)
{{ config(materialized='table', tags=['daily', 'finance']) }}

SELECT * FROM {{ ref('int_revenue') }}
# Option B: schema.yml (preferred for shared config)
models:
  - name: fct_orders
    config:
      materialized: table
      tags: [daily, finance]
    description: "Core orders fact table"

dbt Testing & Documentation

Schema tests (generic tests)

# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: "Order events with customer attributes, one row per order"
    columns:
      - name: order_id
        description: "Primary key — unique across all time"
        tests:
          - unique
          - not_null
      - name: order_status
        tests:
          - accepted_values:
              values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: total_usd
        tests:
          - not_null
          # range check: dbt_utils.accepted_range also works; dbt_expectations shown here
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 100000

Singular tests — custom SQL assertions

-- tests/assert_no_negative_revenue.sql
-- This test FAILS if any rows are returned.
-- dbt treats returned rows as test failures.

SELECT
    order_id,
    total_usd,
    ordered_at
FROM {{ ref('fct_orders') }}
WHERE total_usd < 0
  AND order_status = 'delivered'
-- tests/assert_orders_before_customers_deleted.sql
-- Referential integrity: no orders for deleted customers

SELECT
    o.order_id,
    o.customer_id
FROM {{ ref('fct_orders') }} AS o
LEFT JOIN {{ ref('dim_customers') }} AS c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL

Documentation and data lineage

# Generate docs and serve locally
dbt docs generate  # writes catalog.json + manifest.json to target/
dbt docs serve     # launches http://localhost:8080

# The docs site shows:
# - Full DAG with source → staging → intermediate → marts
# - Column-level descriptions
# - Test status
# - Row counts and last-built timestamps

Data contracts (dbt 1.5+)

# Enforce schema contracts on models consumed by other teams
models:
  - name: fct_orders
    config:
      contract:
        enforced: true    # dbt will fail if actual schema differs
    columns:
      - name: order_id
        data_type: varchar
        constraints:
          - type: not_null
          - type: primary_key
      - name: total_usd
        data_type: numeric
        constraints:
          - type: not_null

dbt Advanced

Jinja macros

Macros are reusable Jinja2 functions. They reduce repetition across models and are stored in the macros/ directory.

-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
    ROUND({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}

-- macros/safe_divide.sql
{% macro safe_divide(numerator, denominator) %}
    CASE
        WHEN {{ denominator }} = 0 OR {{ denominator }} IS NULL THEN NULL
        ELSE {{ numerator }}::float / {{ denominator }}
    END
{% endmacro %}

-- macros/date_trunc_to_week.sql  (cross-database compatible)
{% macro week_start(date_col) %}
    {% if target.type == 'snowflake' %}
        DATE_TRUNC('week', {{ date_col }})
    {% elif target.type == 'bigquery' %}
        DATE_TRUNC({{ date_col }}, WEEK(MONDAY))
    {% elif target.type == 'redshift' %}
        DATE_TRUNC('week', {{ date_col }})
    {% else %}
        DATE_TRUNC('week', {{ date_col }})
    {% endif %}
{% endmacro %}
-- Using macros in a model
SELECT
    order_id,
    {{ cents_to_dollars('total_amount_cents') }} AS total_usd,
    {{ safe_divide('revenue_cents', 'item_count') }} AS avg_item_revenue,
    {{ week_start('ordered_at') }} AS order_week
FROM {{ source('raw', 'orders') }}
When macros go too far
Macros are powerful but can obscure logic. A model full of macro calls is hard to debug — you need to dbt compile to see the generated SQL. Rule of thumb: if a macro is used in only one model, inline the logic instead. Reserve macros for cross-database compatibility shims and patterns repeated in 5+ models.

Snapshots — SCD Type 2

-- snapshots/customers_snapshot.sql
-- Tracks changes to the customers table over time.
-- Creates dbt_scd_id, dbt_updated_at, dbt_valid_from, dbt_valid_to columns.

{% snapshot customers_snapshot %}
{{
  config(
    target_schema='snapshots',
    unique_key='customer_id',
    strategy='timestamp',         -- or 'check' for tables without updated_at
    updated_at='updated_at',
    invalidate_hard_deletes=True
  )
}}

SELECT
    customer_id,
    customer_name,
    email,
    plan_tier,
    country_code,
    updated_at
FROM {{ source('raw', 'customers') }}

{% endsnapshot %}
-- Query snapshot: what was each customer's plan on 2025-01-01?
SELECT
    customer_id,
    customer_name,
    plan_tier
FROM {{ ref('customers_snapshot') }}
WHERE dbt_valid_from <= '2025-01-01'
  AND (dbt_valid_to > '2025-01-01' OR dbt_valid_to IS NULL)
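The validity-interval predicate above is worth internalizing; here it is in plain Python over hypothetical SCD Type 2 rows (customer_id, plan_tier, valid_from, valid_to — None means "current"):

```python
history = [
    ("c1", "free", "2024-06-01", "2024-12-15"),
    ("c1", "pro",  "2024-12-15", None),        # open-ended current row
]

def as_of(rows, customer_id, date):
    """Return the attribute value that was valid on the given date."""
    for cid, tier, frm, to in rows:
        # same predicate as the SQL: valid_from <= date AND (valid_to > date OR NULL)
        if cid == customer_id and frm <= date and (to is None or to > date):
            return tier
    return None

assert as_of(history, "c1", "2024-07-01") == "free"
assert as_of(history, "c1", "2025-01-01") == "pro"
```

Note the half-open interval: on the changeover day (2024-12-15) the new row wins, so every date maps to exactly one version.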

Useful packages

# packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.0.0", "<2.0.0"]
  - package: calogica/dbt_expectations
    version: [">=0.9.0", "<1.0.0"]
  - package: dbt-labs/audit_helper
    version: [">=0.9.0"]
-- dbt_utils examples
{{ dbt_utils.generate_surrogate_key(['order_id', 'line_item_id']) }} AS sk

-- Modern dbt (1.2+): use {{ dbt.date_spine(...) }} instead of the dbt_utils version
{{ dbt_utils.date_spine(datepart='day', start_date="'2024-01-01'", end_date="current_date") }}

-- dbt_utils.pivot must appear inside a SELECT (it expands to a list of expressions)
SELECT
  {{ dbt_utils.pivot('device_type', ['mobile','desktop','tablet']) }}
FROM {{ ref('sessions') }}
# dbt_expectations examples (in schema.yml)
tests:
  - dbt_expectations.expect_column_values_to_be_between:
      min_value: 0
      max_value: 1000000
  - dbt_expectations.expect_table_row_count_to_be_between:
      min_value: 1000
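generate_surrogate_key is, roughly, an md5 over the stringified, null-safe, '-'-joined columns. A rough Python model (the exact null placeholder varies by dbt_utils version):

```python
import hashlib

def generate_surrogate_key(*values):
    """Approximation of dbt_utils.generate_surrogate_key: deterministic hash of columns."""
    parts = ["" if v is None else str(v) for v in values]   # NULL -> empty string here
    return hashlib.md5("-".join(parts).encode()).hexdigest()

sk = generate_surrogate_key("ord_42", 7)
assert sk == generate_surrogate_key("ord_42", 7)   # deterministic across runs
assert sk != generate_surrogate_key("ord_42", 8)   # any column change -> new key
assert len(sk) == 32                               # md5 hex digest
```

Determinism is the property that matters: the same natural-key columns must always hash to the same surrogate key, or incremental merges silently create duplicates.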

dbt in Production

CI/CD with dbt build

# dbt build = run + test + snapshot + seed, in dependency order
dbt build

# Slim CI: only run models changed in this PR + their downstream dependents
# Requires a production manifest (manifest.json) from the last successful run
# state:modified+ selects changed models and everything downstream;
# --defer reads unmodified upstream models from production
dbt build \
  --select state:modified+ \
  --defer \
  --state path/to/prod-manifest/

# Target schema for CI to avoid polluting production
dbt build --target ci --vars '{"is_ci": true}'
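The `state:modified+` selector is a graph traversal: modified nodes plus all their descendants. A toy sketch over a hypothetical DAG:

```python
children = {   # hypothetical DAG, parent -> direct children
    "stg_orders": ["int_orders_with_customers"],
    "stg_customers": ["int_orders_with_customers"],
    "int_orders_with_customers": ["fct_orders"],
    "fct_orders": [],
}

def modified_plus(modified, children):
    """Select the modified nodes and everything reachable downstream from them."""
    selected, stack = set(), list(modified)
    while stack:
        node = stack.pop()
        if node not in selected:
            selected.add(node)
            stack.extend(children.get(node, []))
    return selected

# Touching one staging model forces a rebuild of its whole downstream subtree:
assert modified_plus({"stg_orders"}, children) == {
    "stg_orders", "int_orders_with_customers", "fct_orders"
}
```

dbt computes "modified" by diffing the current project against the production manifest.json — which is why slim CI needs that artifact.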
# .github/workflows/dbt-ci.yml
name: dbt CI
on:
  pull_request:
    paths:
      - 'models/**'
      - 'tests/**'
      - 'macros/**'
      - 'dbt_project.yml'

jobs:
  dbt-slim-ci:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Download production manifest
        run: aws s3 cp s3://my-bucket/dbt-artifacts/manifest.json ./prod-manifest/manifest.json

      - name: Install dbt
        run: pip install dbt-snowflake

      - name: dbt build (slim CI)
        env:
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
        run: |
          dbt deps
          dbt build \
            --select state:modified+ \
            --defer \
            --state ./prod-manifest/ \
            --target ci

dbt Cloud vs dbt Core

| Feature | dbt Core (open source) | dbt Cloud |
|---|---|---|
| Scheduled runs | External scheduler (Airflow, Prefect) | Built-in job scheduler |
| Slim CI | Manual setup (S3 artifact, GitHub Actions) | Native CI/CD integration |
| IDE | Your editor + CLI | Web-based SQL IDE |
| Docs hosting | Self-host | Hosted (shareable link) |
| Semantic Layer | CLI + MetricFlow | Fully integrated |
| Orchestration | Airflow, Prefect, Dagster | Built-in |
| Cost | Free | $100–$200+/dev/month |

Freshness tests

# Check that sources aren't stale before running models
dbt source freshness

# In sources.yml (already shown above):
freshness:
  warn_after: {count: 12, period: hour}
  error_after: {count: 24, period: hour}

# In dbt_project.yml, run freshness as part of dbt build:
# dbt build --select source:raw+  # includes freshness checks
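The freshness check itself is simple arithmetic on the loaded_at column; a toy version of the warn/error logic with the same thresholds as the sources.yml above:

```python
from datetime import datetime, timedelta, timezone

def freshness_status(loaded_at, now, warn_after=timedelta(hours=12),
                     error_after=timedelta(hours=24)):
    """Classify source staleness: pass / warn / error."""
    age = now - loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"

now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
assert freshness_status(now - timedelta(hours=2), now) == "pass"
assert freshness_status(now - timedelta(hours=13), now) == "warn"
assert freshness_status(now - timedelta(hours=30), now) == "error"
```

In production dbt runs MAX(loaded_at_field) against the warehouse and compares it to the run's start time; a "warn" lets the build continue, an "error" stops it.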

Spark for Data Transform

Cross-reference
This section focuses on transform patterns in Spark. For Spark internals (RDDs, shuffle, partitioning, execution model, performance tuning), see the Spark refresher.

Spark's DataFrame API lets you express transforms as a directed acyclic graph of operations that Spark's Catalyst optimizer compiles into efficient physical plans. The same logic can be expressed as DataFrame API calls or SQL — choose based on readability.

DataFrame API vs Spark SQL — side by side

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("transforms").getOrCreate()

orders = spark.table("raw.orders")
customers = spark.table("raw.customers")

# ── DataFrame API ──────────────────────────────────────────────────────────
result_df = (
    orders
    .filter(F.col("status") != "test")
    .join(customers, on="customer_id", how="inner")
    .withColumn("total_usd", F.col("total_cents") / 100.0)
    .withColumn("order_month", F.date_trunc("month", F.col("ordered_at")))
    .withColumn(
        "order_tier",
        F.when(F.col("total_usd") >= 500, "high")
         .when(F.col("total_usd") >= 100, "medium")
         .otherwise("low")
    )
    .groupBy("order_month", "country_code", "order_tier")
    .agg(
        F.count("order_id").alias("order_count"),
        F.sum("total_usd").alias("revenue_usd"),
        F.countDistinct("customer_id").alias("unique_customers")
    )
)

# ── Spark SQL (identical output) ───────────────────────────────────────────
orders.createOrReplaceTempView("orders")
customers.createOrReplaceTempView("customers")

result_sql = spark.sql("""
    SELECT
        DATE_TRUNC('month', o.ordered_at)   AS order_month,
        c.country_code,
        CASE
            WHEN o.total_cents / 100.0 >= 500 THEN 'high'
            WHEN o.total_cents / 100.0 >= 100 THEN 'medium'
            ELSE 'low'
        END                                  AS order_tier,
        COUNT(o.order_id)                    AS order_count,
        SUM(o.total_cents / 100.0)           AS revenue_usd,
        COUNT(DISTINCT o.customer_id)        AS unique_customers
    FROM orders o
    JOIN customers c USING (customer_id)
    WHERE o.status != 'test'
    GROUP BY 1, 2, 3
""")

Common transform patterns

Deduplication

from pyspark.sql.window import Window

# Keep the most recent record per entity
w = Window.partitionBy("customer_id").orderBy(F.desc("updated_at"))

deduped = (
    customers
    .withColumn("row_num", F.row_number().over(w))
    .filter(F.col("row_num") == 1)
    .drop("row_num")
)

SCD Type 2 merge pattern

from delta.tables import DeltaTable

# Merge incoming changes into a Delta Lake table (SCD Type 2 simplification)
delta_table = DeltaTable.forName(spark, "gold.customers")

delta_table.alias("target").merge(
    source=incoming_df.alias("source"),
    condition="target.customer_id = source.customer_id"
).whenMatchedUpdate(set={
    "customer_name": "source.customer_name",
    "plan_tier": "source.plan_tier",
    "updated_at": "source.updated_at"
}).whenNotMatchedInsert(values={
    "customer_id": "source.customer_id",
    "customer_name": "source.customer_name",
    "plan_tier": "source.plan_tier",
    "created_at": "source.created_at",
    "updated_at": "source.updated_at"
}).execute()

Sessionization

# Group events into sessions (30-minute gap = new session)
SESSION_TIMEOUT_SECS = 1800

w_user = Window.partitionBy("user_id").orderBy("event_time")

sessionized = (
    events
    .withColumn("prev_event_time", F.lag("event_time").over(w_user))
    .withColumn(
        "is_new_session",
        F.when(
            F.col("prev_event_time").isNull() |
            (F.unix_timestamp("event_time") - F.unix_timestamp("prev_event_time") > SESSION_TIMEOUT_SECS),
            1
        ).otherwise(0)
    )
    .withColumn("session_id_raw", F.sum("is_new_session").over(w_user))
    .withColumn(
        "session_id",
        F.concat_ws("_", F.col("user_id"), F.col("session_id_raw").cast("string"))
    )
)
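The same 30-minute-gap rule in plain Python, for one user's time-sorted events (epoch seconds) — useful for sanity-checking the Spark logic on small samples:

```python
SESSION_TIMEOUT_SECS = 1800

def sessionize(event_times):
    """Assign a session index to each event; a gap > timeout starts a new session."""
    sessions, current, prev = [], 0, None
    for t in event_times:
        if prev is not None and t - prev > SESSION_TIMEOUT_SECS:
            current += 1            # gap exceeded: new session
        sessions.append(current)
        prev = t
    return sessions

#                  0s   10m    45m  (35-minute gap -> new session)
assert sessionize([0, 600, 2700]) == [0, 0, 1]
```

This is the same running-sum-of-boundaries trick as the window function above: `is_new_session` flags become session ids via a cumulative sum.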

Incremental processing with foreachBatch

# Read from Kafka, transform, write to Delta Lake — micro-batch streaming
from pyspark.sql.types import StructType, StringType, LongType

schema = StructType().add("order_id", StringType()).add("total_cents", LongType())

stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "orders")
    .load()
)

def process_batch(batch_df, batch_id):
    parsed = batch_df.selectExpr("CAST(value AS STRING) as json") \
        .select(F.from_json("json", schema).alias("data")) \
        .select("data.*") \
        .withColumn("total_usd", F.col("total_cents") / 100.0)

    parsed.write.format("delta").mode("append").saveAsTable("silver.orders")

query = (
    stream.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "/checkpoints/orders")
    .trigger(processingTime="30 seconds")
    .start()
)
query.awaitTermination()

Flink for Stream Processing

Flink is a distributed dataflow graph executor. Everything reduces to:

Stream → Operator → Stream → Operator → Stream

# Like Unix pipes, but distributed and stateful.

The key distinction from Spark: Flink is stream-first. Batch is a special case of a bounded stream. Flink processes one event at a time with true per-event latency, not micro-batches.
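The pipe analogy is literal enough to sketch with chained Python generators — each event flows through the whole pipeline before the next is read, which is the per-event model (unlike a micro-batch). Event fields are made up:

```python
def src():                                # source: yields one event at a time
    yield {"order_id": "o1", "total_cents": 250, "status": "ok"}
    yield {"order_id": "o2", "total_cents": 100, "status": "test"}

def to_usd(stream):                       # map: 1-to-1 transform
    for e in stream:
        yield {**e, "total_usd": e["total_cents"] / 100.0}

def drop_test(stream):                    # filter: keep or drop
    for e in stream:
        if e["status"] != "test":
            yield e

# Stream -> operator -> stream -> operator -> stream
result = list(drop_test(to_usd(src())))
assert [e["order_id"] for e in result] == ["o1"]
assert result[0]["total_usd"] == 2.5
```

What Flink adds on top of this picture: the operators run distributed across machines, keep fault-tolerant state, and never terminate on unbounded input.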

Key abstractions — bottom up

| Abstraction | Description | Analogy |
|---|---|---|
| Record | A single event (key + value + timestamp) | A single log line |
| Stream | Unbounded sequence of records | A Unix pipe |
| Operator | Transform applied to a stream | A grep or awk command |
| KeyedStream | Stream partitioned by key (like GROUP BY) | Multiple parallel pipes, one per key |
| State | Memory local to an operator, persisted and fault-tolerant | A variable in an awk script that persists across lines |

Core operators

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.util.Collector;
import java.time.Duration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Modern KafkaSource requires fromSource() — not the legacy addSource() API
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("orders")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
DataStream<String> rawStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

DataStream<OrderEvent> orders = rawStream
    .map(json -> parseOrder(json))  // deserialize JSON → OrderEvent

    // map: 1-to-1 transform
    .map(order -> {
        order.totalUsd = order.totalCents / 100.0;
        return order;
    })

    // filter: 1-to-0 or 1-to-1 (keep or drop)
    .filter(order -> order.status != OrderStatus.TEST)

    // flatMap: 1-to-N (one input produces zero or more outputs)
    // (illustrative — in real code this changes the element type to LineItem,
    //  and the lambda needs a .returns(...) hint due to Java type erasure)
    .flatMap((OrderEvent order, Collector<LineItem> out) -> {
        for (LineItem item : order.lineItems) {
            out.collect(item);
        }
    })

    // keyBy: partition by key — subsequent operators see only their key's events
    .keyBy(order -> order.customerId)

    // window: group events in time
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))

    // aggregate within window
    .aggregate(new OrderCountAggregator())

    // process: full control, access to state + timers
    .process(new FraudDetectionProcessor());

// Execute — Flink does nothing until execute() is called
env.execute("Order Processing Job");

Bounded vs unbounded streams

// Unbounded (streaming) — runs forever
// FlinkKafkaConsumer was the legacy API (removed in Flink 1.17).
// Use KafkaSource + fromSource() for all new code:
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("orders")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

// Bounded (batch mode) — finite input, runs to completion
// In Flink 1.12+, DataStream API works for both
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// env.readFile() was deprecated and removed. Use FileSource + fromSource():
FileSource<String> fileSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/orders/"))
    .build();
DataStream<String> bounded = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "File Source");

// STREAMING mode: event-by-event, true streaming semantics
// BATCH mode:     optimized for throughput, blocking operators allowed

Windowing

Windows are how Flink groups an infinite stream into finite buckets for aggregation. Choosing the right window type determines the latency, memory cost, and business semantics of your computation.

Tumbling windows — fixed, non-overlapping

# Events split into non-overlapping fixed intervals
# Each event belongs to exactly one window

|--- hour 1 ---|--- hour 2 ---|--- hour 3 ---|
  e1 e2 e3        e4              e5 e6 e7
  → window 1      → window 2      → window 3
// Count orders per 1-hour tumbling window, per customer
orders
    .keyBy(order -> order.customerId)
    .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
    .aggregate(new OrderSumAggregator(), new WindowMetadataFunction())
    .print();
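Tumbling window assignment is just integer division of the event timestamp — each event lands in exactly one bucket. A quick Python check of the semantics:

```python
WINDOW_MS = 3_600_000   # 1 hour

def tumbling_window(ts_ms, size_ms=WINDOW_MS):
    """Return the [start, end) window containing the event timestamp."""
    start = (ts_ms // size_ms) * size_ms
    return (start, start + size_ms)

assert tumbling_window(0) == (0, 3_600_000)
assert tumbling_window(3_599_999) == (0, 3_600_000)          # last ms of hour 1
assert tumbling_window(3_600_000) == (3_600_000, 7_200_000)  # first ms of hour 2
```

The half-open `[start, end)` convention matters: an event exactly on the boundary belongs to the next window, so no event is counted twice.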

Sliding windows — fixed size, overlapping

# Window size = 1 hour, slide = 15 minutes
# Each event appears in multiple windows (4 windows for 1h/15m ratio)

time: --0:00--0:15--0:30--0:45--1:00--1:15--1:30--
[window 1: 0:00 – 1:00]
      [window 2: 0:15 – 1:15]
            [window 3: 0:30 – 1:30]
// Rolling 1-hour revenue, updated every 15 minutes
orders
    .keyBy(order -> order.country)
    .window(SlidingEventTimeWindows.of(
        Duration.ofHours(1),    // window size
        Duration.ofMinutes(15)  // slide interval
    ))
    .aggregate(new RevenueAggregator())
    .print();
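With sliding windows each event belongs to size/slide windows at once — 4 for the 1h/15m configuration. Enumerating them in Python makes the fan-out concrete:

```python
def sliding_windows(ts_ms, size_ms=3_600_000, slide_ms=900_000):
    """All [start, start+size) windows containing the event: start in (ts-size, ts]."""
    first = ((ts_ms - size_ms) // slide_ms + 1) * slide_ms
    return [(s, s + size_ms)
            for s in range(max(first, 0), ts_ms + 1, slide_ms)]

wins = sliding_windows(7_200_000)            # event at 2:00
assert len(wins) == 4                        # size/slide = 4 overlapping windows
assert all(s <= 7_200_000 < s + 3_600_000 for s, _ in wins)
```

That 4x fan-out is also the cost model: sliding windows hold each event in multiple in-flight aggregates, so state grows with size/slide.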

Session windows — gap-based, dynamic size

# Window ends when there's a gap > 30 minutes with no events
# Window size varies per user session

user A: [click]---[click]--[click]              [click][click]
                    <30min gap                   <30min gap
        [--- session 1 ----------]   >30min gap  [-- session 2 --]
// User sessions with 30-minute inactivity gap
pageViews
    .keyBy(view -> view.userId)
    .window(EventTimeSessionWindows.withGap(Duration.ofMinutes(30)))
    .aggregate(new SessionAggregator())
    .print();

Global windows — custom trigger

// Global window: all events for a key in one window
// Requires a custom trigger to fire
orders
    .keyBy(order -> order.customerId)
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(100))  // fire every 100 events
    .aggregate(new OrderBatchAggregator())
    .print();

Window summary

| Type | Overlap? | Size | Use Case |
|---|---|---|---|
| Tumbling | No | Fixed | Hourly/daily reporting, billing periods |
| Sliding | Yes | Fixed | Rolling averages, moving metrics |
| Session | No | Dynamic (gap-based) | User sessions, burst detection |
| Global | No | Unbounded | Custom triggers, count-based batching |

Keyed state — per-key storage

import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// A stateful function that detects when a customer's 5-order total exceeds $1000
public class HighValueDetector extends KeyedProcessFunction<String, OrderEvent, Alert> {

    // State is automatically scoped to each key (customerId)
    private ValueState<Double> totalSpentState;
    private ValueState<Integer> orderCountState;
    private ListState<Long> recentOrderTimestamps;
    private MapState<String, Double> ordersByProduct;

    @Override
    public void open(Configuration parameters) throws Exception {
        totalSpentState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("totalSpent", Double.class)
        );
        orderCountState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("orderCount", Integer.class)
        );
        recentOrderTimestamps = getRuntimeContext().getListState(
            new ListStateDescriptor<>("timestamps", Long.class)
        );
        ordersByProduct = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("byProduct", String.class, Double.class)
        );
    }

    @Override
    public void processElement(OrderEvent order, Context ctx, Collector<Alert> out) throws Exception {
        // Read current state (null if first event for this key)
        Double currentTotal = totalSpentState.value();
        if (currentTotal == null) currentTotal = 0.0;

        Integer currentCount = orderCountState.value();
        if (currentCount == null) currentCount = 0;

        // Update state
        double newTotal = currentTotal + order.totalUsd;
        int newCount = currentCount + 1;
        totalSpentState.update(newTotal);
        orderCountState.update(newCount);
        recentOrderTimestamps.add(order.timestampMs);
        ordersByProduct.put(order.productCategory, order.totalUsd);

        // Register a timer to clear state after 24 hours of inactivity
        ctx.timerService().registerEventTimeTimer(order.timestampMs + 86_400_000L);

        // Emit alert if threshold crossed
        if (newTotal >= 1000.0 && newCount >= 5) {
            out.collect(new Alert(order.customerId, newTotal, newCount));
            totalSpentState.clear();
            orderCountState.clear();
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
        // Clear state for idle customers
        totalSpentState.clear();
        orderCountState.clear();
        recentOrderTimestamps.clear();
        ordersByProduct.clear();
    }
}
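Stripped of the Flink machinery, the keyed-state pattern is "one bucket of state per key". A plain-Python model of the same detector (thresholds copied from the Java above; checkpointing and timers omitted):

```python
from collections import defaultdict

class HighValueDetector:
    THRESHOLD_USD, MIN_ORDERS = 1000.0, 5

    def __init__(self):
        self.total = defaultdict(float)    # ~ ValueState<Double>, scoped by key
        self.count = defaultdict(int)      # ~ ValueState<Integer>

    def process(self, customer_id, total_usd):
        self.total[customer_id] += total_usd
        self.count[customer_id] += 1
        if (self.total[customer_id] >= self.THRESHOLD_USD
                and self.count[customer_id] >= self.MIN_ORDERS):
            self.total.pop(customer_id)    # clear() after alerting
            self.count.pop(customer_id)
            return f"alert:{customer_id}"
        return None

det = HighValueDetector()
alerts = [det.process("c1", 250.0) for _ in range(5)]
assert alerts[:4] == [None] * 4 and alerts[4] == "alert:c1"
```

What Flink adds over this dict: the state is partitioned across the cluster by key, spilled to RocksDB when large, and snapshotted into checkpoints so a crash doesn't lose the running totals.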

State backends

| Backend | Storage | State Size | Read Latency | Use When |
|---|---|---|---|---|
| HashMapStateBackend | JVM heap | < 1 GB per TaskManager | Microseconds | Small state, low-latency jobs |
| EmbeddedRocksDBStateBackend | RocksDB on local disk | Hundreds of GB | Milliseconds (SSD) | Large state (sessions, joins) |
// Configure RocksDB state backend with incremental checkpoints
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental

// enableCheckpointing(interval, mode) sets the checkpoint interval and the
// consistency mode in one call; finer-grained knobs live on CheckpointConfig.
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);  // every 60s
CheckpointConfig cpConfig = env.getCheckpointConfig();
cpConfig.setCheckpointTimeout(300_000);                // fail if > 5min
cpConfig.setMinPauseBetweenCheckpoints(5000);
cpConfig.setMaxConcurrentCheckpoints(1);
cpConfig.setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

How checkpointing works — Chandy-Lamport algorithm

# Checkpoint barriers flow through the stream like special records.
# When an operator receives a barrier, it:
# 1. Pauses processing
# 2. Snapshots its state to durable storage (S3, HDFS)
# 3. Forwards the barrier downstream
# 4. Resumes processing
#
# When all operators have snapshotted, the checkpoint is complete.
# On failure, Flink resets to the last complete checkpoint.
#
# Source  → map → keyBy → window → sink
#   |          |       |        |      |
# barrier → barrier → barrier → barrier → ACK to JobManager
#
# Exactly-once = checkpoint + transactional sink (2PC)
# At-least-once = barriers processed without alignment (faster, but records
#                 in flight between barriers may be replayed after recovery)
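The snapshot/restore cycle above can be sketched in a few lines of Python (a toy linear pipeline with periodic "barriers"; real Flink checkpoints a DAG asynchronously to durable storage):

```python
# Toy linear pipeline with periodic checkpoint "barriers": snapshot the
# operator state every `checkpoint_every` records, recover by replaying.
def run_with_checkpoints(events, checkpoint_every):
    state = {"count": 0}               # operator state
    snapshots = []                     # durable storage (S3/HDFS in real Flink)
    for i, ev in enumerate(events, start=1):
        state["count"] += ev           # normal processing
        if i % checkpoint_every == 0:  # barrier arrives: snapshot, then resume
            snapshots.append((i, dict(state)))
    return state, snapshots

def recover(snapshots, events):
    # On failure: reset to the last complete checkpoint, replay the tail
    pos, snap = snapshots[-1]
    state = dict(snap)
    for ev in events[pos:]:
        state["count"] += ev
    return state

events = [1, 2, 3, 4, 5, 6, 7]
final, snaps = run_with_checkpoints(events, checkpoint_every=3)
assert recover(snaps, events) == final   # replay reproduces the exact result
```

The key property the assertion checks is determinism: state at the last barrier plus a replay of everything after it yields the same result as an uninterrupted run.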

Savepoints vs checkpoints

                         | Checkpoint                              | Savepoint
Purpose                  | Automatic fault recovery                | Manual migration, upgrade, A/B test
Trigger                  | Automatic (periodic)                    | Manual (flink savepoint <jobId>)
Lifecycle                | Deleted on new checkpoint or job cancel | Persists until manually deleted
Operator state alignment | Not guaranteed across versions          | Stable, supports code changes
                +------------------+
                |   JobManager     |
                |  (the brain)     |
                +--------+---------+
                         | assigns tasks
        +----------------+----------------+
        v                v                v
+---------------+ +---------------+ +---------------+
| TaskManager 1 | | TaskManager 2 | | TaskManager 3 |
| (worker)      | | (worker)      | | (worker)      |
| Slot 1: map   | | Slot 1: map   | | Slot 1: window|
| Slot 2: filter| | Slot 2: sink  | | Slot 2: agg   |
+---------------+ +---------------+ +---------------+

JobManager responsibilities

  • Accepts submitted jobs and compiles them into an execution graph
  • Schedules tasks onto available TaskManager slots
  • Coordinates checkpoints and drives recovery after failures

TaskManager responsibilities

  • Executes assigned tasks in its slots and manages their local state
  • Manages memory and network buffers; exchanges data with peer TaskManagers
  • Sends heartbeats and metrics back to the JobManager

Graph compilation

# User code → execution plan in 3 stages:
#
# 1. StreamGraph: logical operator DAG (from user code)
#    Source → Map → Filter → KeyBy → Window → Aggregate → Sink
#
# 2. JobGraph: physical plan with operator chaining applied
#    Operators that can run in the same thread are chained together.
#    Source+Map+Filter are often chained into one "operator chain".
#    Reduces serialization overhead between operators.
#
# 3. ExecutionGraph: parallel execution plan
#    Each operator is expanded to parallelism P copies.
#    ExecutionVertex = one parallel instance of one operator.
#    Each ExecutionVertex is assigned to a TaskManager slot.

# Operator chaining rules:
# - Same parallelism
# - No shuffle between them (not keyBy, not rebalance)
# - Both are chainable (map, filter, flatMap — yes; window — no)

# Disable chaining for debugging:
env.disableOperatorChaining();
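The chaining rules above can be expressed as a small predicate (operator names and fields here are illustrative, not Flink's internal API):

```python
# Sketch of the operator-chaining decision: adjacent operators share a thread
# when parallelism matches, no shuffle sits between them, and both are chainable.
CHAINABLE = {"source", "map", "filter", "flatMap"}   # window, keyBy break chains

def can_chain(upstream, downstream):
    """True if two adjacent operators may run in the same operator chain."""
    return (
        upstream["parallelism"] == downstream["parallelism"]  # same parallelism
        and not downstream.get("shuffle", False)              # no keyBy/rebalance edge
        and upstream["op"] in CHAINABLE
        and downstream["op"] in CHAINABLE
    )

pipeline = [
    {"op": "source", "parallelism": 4},
    {"op": "map",    "parallelism": 4},
    {"op": "filter", "parallelism": 4},
    {"op": "window", "parallelism": 4, "shuffle": True},  # keyBy edge in front
]
chains = [[pipeline[0]["op"]]]
for up, down in zip(pipeline, pipeline[1:]):
    if can_chain(up, down):
        chains[-1].append(down["op"])   # extend the current chain
    else:
        chains.append([down["op"]])     # shuffle or unchainable op: new chain
assert chains == [["source", "map", "filter"], ["window"]]
```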

Backpressure occurs when a downstream operator is slower than upstream. Flink uses credit-based flow control: a downstream TaskManager grants credits to upstream, allowing upstream to send only as many network buffers as the downstream has capacity for. No data is dropped — the pipeline slows end-to-end.

Credit-based flow control

# Normal flow (downstream keeping up):
#
#  Source ──[10 msg/s]──► Map ──[10 msg/s]──► Window ──[10 msg/s]──► Sink
#  Credits: Map grants 10 credits to Source → Source sends freely

# Backpressure (sink is slow):
#
#  Source ──[10 msg/s]──► Map ──[10 msg/s]──► Window ──[2 msg/s]──► Sink (slow!)
#                                                        ↑
#                                              Window buffers fill up
#                                              Map buffers fill up
#                                              Source slows to 2 msg/s
#
# Result: no data loss, but end-to-end latency increases
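The throttling behavior can be reproduced with a toy simulation (rates and buffer sizes are hypothetical; real Flink exchanges credits per network buffer between TaskManagers):

```python
# Toy credit-based flow control: upstream may send only while it holds credits,
# so the slow consumer's drain rate becomes the whole pipeline's rate.
from collections import deque

def simulate(produce, buffer_size, drain_per_tick, ticks):
    credits = buffer_size          # downstream grants credits up to its capacity
    buf = deque()
    sent = drained = 0
    for _ in range(ticks):
        n = min(produce, credits)  # upstream sends, capped by granted credits
        credits -= n
        buf.extend([1] * n)
        sent += n
        d = min(drain_per_tick, len(buf))  # slow downstream drains a few records
        for _ in range(d):
            buf.popleft()
        credits += d               # each freed buffer returns one credit
        drained += d
    return sent, drained

sent, drained = simulate(produce=10, buffer_size=20, drain_per_tick=2, ticks=100)
# Once the buffer fills, the source is throttled to ~2 records/tick: no loss,
# and at most buffer_size records are ever in flight.
assert sent - drained <= 20
```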

Diagnosing backpressure via Flink UI

# Flink Web UI: localhost:8081
# Job → Subtasks → Backpressure tab
#
# Colors:
#   OK (green)     = <10% backpressure ratio
#   LOW (yellow)   = 10-50%
#   HIGH (red)     = >50%  ← investigate immediately
#
# REST API alternative:
curl http://localhost:8081/jobs/{jobId}/vertices/{vertexId}/backpressure

Common causes and fixes

Cause                     | Symptom                                      | Fix
Slow sink (DB write)      | Sink vertex is HIGH                          | Batch writes, async sink, increase sink parallelism
Expensive computation     | CPU-bound operator is HIGH                   | Increase parallelism, optimize UDF, offload to external service
Key skew                  | One subtask is HIGH, others are OK           | Add salt to key, redistribute with rebalance(), custom partitioner
GC pressure               | Intermittent spikes, HIGH during GC          | Switch to RocksDB backend, tune GC, reduce object creation in hot path
Large state serialization | Checkpoint coordinator shows slow snapshots  | Use incremental RocksDB checkpoints, reduce state TTL
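The "add salt to key" fix from the table amounts to a two-stage aggregation, sketched here in plain Python (the helper names are hypothetical, not a Flink API):

```python
# Key salting for skew: spread a hot key across N partitions (stage 1),
# then strip the salt and merge the partial results (stage 2).
import random
from collections import defaultdict

N_SALTS = 4

def salted_key(key):
    # Stage-1 key: one hot key fans out over N_SALTS parallel subtasks
    return f"{key}#{random.randrange(N_SALTS)}"

def two_stage_sum(events):
    partial = defaultdict(float)
    for key, value in events:             # stage 1: partial sums per salted key
        partial[salted_key(key)] += value
    final = defaultdict(float)
    for skey, value in partial.items():   # stage 2: merge per original key
        final[skey.rsplit("#", 1)[0]] += value
    return dict(final)

events = [("hot_customer", 1.0)] * 1000 + [("normal", 2.0)] * 10
totals = two_stage_sum(events)
assert totals["hot_customer"] == 1000.0 and totals["normal"] == 20.0
```

The result is identical to a plain group-by; only the intermediate load distribution changes, which is why salting is safe for commutative, associative aggregates like sum and count.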

Flink SQL lets you process streams declaratively. The key abstraction: a dynamic table — a table that changes over time as new events arrive. A SQL query over a dynamic table produces another dynamic table.

Continuous queries on streams

import org.apache.flink.table.api.*;
// Kafka tables in the Table API are declared via SQL DDL
// (CREATE TABLE ... WITH ('connector'='kafka', ...)), not via DataStream
// connector objects, so no KafkaSource import is needed here.

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Declare a Kafka source as a dynamic table
tEnv.executeSql("""
    CREATE TABLE orders (
        order_id    STRING,
        customer_id STRING,
        product_id  STRING,
        amount      DECIMAL(10, 2),
        order_time  TIMESTAMP(3),
        WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""");

// Tumbling window aggregation — SQL GROUP BY TUMBLE()
tEnv.executeSql("""
    SELECT
        customer_id,
        TUMBLE_START(order_time, INTERVAL '1' HOUR) AS window_start,
        COUNT(*)                                     AS order_count,
        SUM(amount)                                  AS total_revenue
    FROM orders
    GROUP BY customer_id, TUMBLE(order_time, INTERVAL '1' HOUR)
""").print();
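The dynamic-table idea can be sketched in plain Python: each arriving row updates the aggregate and emits a retraction of the old result row plus the new one (an illustration of Flink's retract-stream semantics, not its implementation):

```python
# A continuous COUNT(*) GROUP BY query as a changelog: every input row
# retracts the previous aggregate row for its key and emits the new one.
def continuous_count(events):
    counts = {}
    changelog = []
    for key in events:
        old = counts.get(key)
        counts[key] = (old or 0) + 1
        if old is not None:
            changelog.append(("-D", key, old))       # retract stale result row
        changelog.append(("+I", key, counts[key]))   # emit updated result row
    return counts, changelog

counts, log = continuous_count(["alice", "bob", "alice"])
assert counts == {"alice": 2, "bob": 1}
assert log == [("+I", "alice", 1), ("+I", "bob", 1),
               ("-D", "alice", 1), ("+I", "alice", 2)]
```

A downstream sink that understands this changelog (an upsert-capable store, for instance) always holds the current query result.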

Temporal joins — join stream with versioned table

-- Join orders with the exchange rate that was valid at order time
-- exchange_rates is a versioned table (tracks historical rates)
SELECT
    o.order_id,
    o.amount,
    o.currency,
    o.amount * r.rate_to_usd AS amount_usd,
    o.order_time
FROM orders AS o
JOIN exchange_rates FOR SYSTEM_TIME AS OF o.order_time AS r
    ON o.currency = r.currency_code

Pattern detection with MATCH_RECOGNIZE

-- Detect failed login followed by successful login within 5 minutes (brute-force pattern)
SELECT *
FROM login_events
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_time
    MEASURES
        FIRST(FAILED.event_time)    AS first_failure_time,
        LAST(FAILED.event_time)     AS last_failure_time,
        SUCCESS.event_time          AS success_time,
        COUNT(FAILED.event_id)      AS failure_count
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (FAILED{3,} SUCCESS)   -- 3+ failures followed by success
    WITHIN INTERVAL '5' MINUTE
    DEFINE
        FAILED  AS event_type = 'LOGIN_FAILED',
        SUCCESS AS event_type = 'LOGIN_SUCCESS'
)

Catalog integration

// Register Hive Metastore catalog — use existing Hive tables in Flink SQL
HiveCatalog hiveCatalog = new HiveCatalog(
    "hive_catalog",
    "default",          // default database
    "/etc/hive/conf"    // hive-site.xml location
);
tEnv.registerCatalog("hive_catalog", hiveCatalog);
tEnv.useCatalog("hive_catalog");

// Now you can JOIN a Kafka stream with Hive tables (this assumes the Kafka-backed
// orders table was registered under a catalog named kafka_catalog)
tEnv.executeSql("""
    SELECT o.order_id, p.product_name, o.amount
    FROM kafka_catalog.default.orders AS o
    JOIN hive_catalog.default.products AS p ON o.product_id = p.product_id
""");

GPU-Accelerated Transform: NVIDIA cuDF & RAPIDS

cuDF is NVIDIA's GPU-accelerated DataFrame library built on CUDA and Apache Arrow. It implements the pandas API, so existing pandas code can leverage 10–150x speedups with minimal or zero code changes. For a deeper dive into GPU hardware, the CUDA programming model, and NVIDIA's full stack, see the Accelerated Computing refresher.

cudf.pandas — the zero-code-change breakthrough

The most important feature: cudf.pandas acts as a transparent proxy. Import it once, and your existing pandas code runs on GPU automatically. Operations not supported by cuDF silently fall back to CPU pandas.

##############################################################
# Option A: Jupyter / IPython magic (zero code change)
##############################################################
%load_ext cudf.pandas
import pandas as pd           # This is now GPU-accelerated!

df = pd.read_parquet("orders.parquet")   # GPU read
result = (
    df[df["status"] == "delivered"]
    .groupby("customer_id")["total_usd"]
    .agg(["sum", "count", "mean"])
    .sort_values("sum", ascending=False)
    .head(100)
)
# No other changes needed. cuDF handles it transparently.


##############################################################
# Option B: Script / production code (one import line)
##############################################################
import cudf.pandas
cudf.pandas.install()  # patches pandas in-process — not a pip install
import pandas as pd           # Now GPU-accelerated

# Same exact pandas code as above — runs on GPU
df = pd.read_parquet("orders_large.parquet")
result = df.groupby(["country", "month"])["revenue"].sum()


##############################################################
# Option C: Explicit cuDF API (maximum control)
##############################################################
import cudf

df = cudf.read_parquet("orders.parquet")         # GPU memory
filtered = df[df["total_usd"] > 100]
joined = filtered.merge(customers, on="customer_id", how="left")  # customers: another cuDF DataFrame loaded earlier
result = (
    joined
    .groupby(["country_code", "order_tier"])
    .agg({"total_usd": ["sum", "mean", "count"]})
)
# Convert back to pandas for non-GPU-aware downstream
pandas_result = result.to_pandas()

Performance benchmarks

Workload                   | pandas (CPU)   | cuDF (GPU) | Speedup
GroupBy + agg on 1 GB CSV  | 45 s           | 0.8 s      | ~56x
Merge/join on 2 GB Parquet | 120 s          | 2.1 s      | ~57x
Rolling window (1M rows)   | 8 s            | 0.12 s     | ~67x
String operations (regex)  | 30 s           | 1.8 s      | ~17x
Spark RAPIDS vs Spark CPU  | Spark baseline | 5x faster  | ~5x, up to 10x better TCO

The RAPIDS ecosystem

Library             | Replaces     | Reported Speedup | Use Case
cuDF                | pandas       | 10–150x          | DataFrames, ETL preprocessing
cuML                | scikit-learn | 27x              | ML training and inference
cuGraph             | NetworkX     | 487x             | Graph analytics, fraud networks
cuVS                | FAISS (CPU)  | 10–100x          | Vector search, ANN index
Spark RAPIDS plugin | Spark CPU    | 5x               | Accelerate existing Spark jobs

Spark RAPIDS — accelerating existing Spark jobs

# Add the RAPIDS plugin to your Spark cluster — no code changes needed
spark-submit \
  --jars rapids-4-spark_2.12-24.10.0.jar \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.rapids.sql.enabled=true \
  --conf spark.rapids.memory.pinnedPool.size=4g \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.task.resource.gpu.amount=0.5 \
  my_existing_spark_job.py
# Catalyst optimizer routes GPU-supported operations to cuDF
# CPU fallback for unsupported ops (no code change needed)
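The routing decision can be sketched as a walk over the physical plan (the operator names and support set below are illustrative, not the plugin's real support matrix):

```python
# Sketch: tag each physical-plan operator GPU or CPU, then count the
# columnar<->row conversions that GPU/CPU boundaries introduce.
GPU_SUPPORTED = {"scan_parquet", "filter", "project", "hash_agg", "sort"}

def route_plan(plan):
    """Assign each operator to GPU if supported, else fall back to CPU."""
    return [(op, "GPU" if op in GPU_SUPPORTED else "CPU") for op in plan]

def transitions(routed):
    """Each GPU<->CPU boundary costs one columnar/row data conversion."""
    return sum(1 for a, b in zip(routed, routed[1:]) if a[1] != b[1])

plan = ["scan_parquet", "filter", "regexp_udf", "hash_agg", "sort"]
routed = route_plan(plan)
# The unsupported regexp UDF falls back to CPU, splitting the GPU pipeline in two
assert routed[2] == ("regexp_udf", "CPU")
assert transitions(routed) == 2
```

This is why a single unsupported operator in the middle of a hot query can erase much of the GPU win: the conversions at each boundary are not free.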

Same transform: pandas → cudf.pandas → Spark RAPIDS

##############################################################
# pandas (CPU baseline)
##############################################################
import pandas as pd

df = pd.read_parquet("events.parquet")      # 5 GB file
result = (
    df[df["event_type"] == "purchase"]
    .assign(revenue=df["price"] * df["quantity"])
    .groupby(["user_id", "product_category"])
    .agg(total_revenue=("revenue", "sum"), order_count=("order_id", "count"))
    .reset_index()
    .sort_values("total_revenue", ascending=False)
)
result.to_parquet("output.parquet")
# Runtime on CPU (same host as the A100 runs below): ~180 seconds


##############################################################
# cudf.pandas (GPU, zero code change)
##############################################################
import cudf.pandas; cudf.pandas.install()
import pandas as pd                         # same import!

df = pd.read_parquet("events.parquet")      # GPU read
result = (
    df[df["event_type"] == "purchase"]
    .assign(revenue=df["price"] * df["quantity"])
    .groupby(["user_id", "product_category"])
    .agg(total_revenue=("revenue", "sum"), order_count=("order_id", "count"))
    .reset_index()
    .sort_values("total_revenue", ascending=False)
)
result.to_parquet("output.parquet")
# Runtime on A100: ~2.1 seconds (~86x speedup)


##############################################################
# Spark RAPIDS (distributed GPU, same Spark code)
##############################################################
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin") \
    .config("spark.rapids.sql.enabled", "true") \
    .getOrCreate()

df = spark.read.parquet("s3://bucket/events/")   # multi-node GPU read
result = (
    df.filter(F.col("event_type") == "purchase")
    .withColumn("revenue", F.col("price") * F.col("quantity"))
    .groupBy("user_id", "product_category")
    .agg(
        F.sum("revenue").alias("total_revenue"),
        F.count("order_id").alias("order_count")
    )
    .orderBy(F.desc("total_revenue"))
)
result.write.parquet("s3://bucket/output/")
# Runtime: ~8 seconds on 4 A100 nodes (22x vs CPU Spark)

Polars GPU engine

# Polars 1.0+ supports GPU execution via cuDF engine.
# cuDF must be pre-installed from NVIDIA's package index BEFORE polars[gpu] will
# work at runtime — pip install polars[gpu] alone is not sufficient:
#   pip install --extra-index-url https://pypi.nvidia.com cudf-cu12
#   pip install polars[gpu]

import polars as pl

df = pl.scan_parquet("events.parquet")   # lazy scan
result = (
    df.filter(pl.col("event_type") == "purchase")
    .with_columns((pl.col("price") * pl.col("quantity")).alias("revenue"))
    .group_by(["user_id", "product_category"])
    .agg([
        pl.sum("revenue").alias("total_revenue"),
        pl.col("order_id").count().alias("order_count"),  # pl.count("col") is deprecated
    ])
    .sort("total_revenue", descending=True)
    .collect(engine="gpu")   # <-- one flag to enable GPU
)

When NOT to use cuDF

GPU acceleration has real limits
  • Data smaller than ~100 MB: GPU transfer overhead dominates. pandas on CPU is faster.
  • Streaming / event-by-event: cuDF is batch-oriented. Use Flink for true streaming.
  • Wide DataFrames with few rows: GPU parallelism needs depth (many rows), not width.
  • Niche pandas features: cuDF covers ~93% of the pandas API. Operations like df.pivot_table(observed=False) or certain MultiIndex operations fall back to CPU — check compatibility before committing to GPU.
  • GPU memory: An A100 has 40–80 GB. A dataset larger than GPU VRAM requires chunking or Dask-cuDF (distributed GPU).
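When a dataset exceeds VRAM, the chunking approach reduces to partial aggregation per chunk plus a merge, as this pure-Python sketch shows (Dask-cuDF applies the same pattern across GPUs):

```python
# Group-by sum over data too large to hold at once: aggregate each chunk
# that fits in (GPU) memory, then merge the partial results.
from collections import defaultdict

def chunked_group_sum(rows, chunk_size):
    merged = defaultdict(float)
    for start in range(0, len(rows), chunk_size):
        chunk = rows[start:start + chunk_size]   # this slice fits in memory
        partial = defaultdict(float)
        for key, value in chunk:                 # per-chunk partial aggregate
            partial[key] += value
        for key, value in partial.items():       # merge into the running result
            merged[key] += value
    return dict(merged)

rows = [("US", 1.0), ("DE", 2.0)] * 500
assert chunked_group_sum(rows, chunk_size=64) == {"US": 500.0, "DE": 1000.0}
```

Only aggregations that decompose into partials (sum, count, min, max) chunk this cleanly; medians and exact distinct counts need different strategies.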

Real-world production wins

Stream Processing Patterns

CDC with Debezium

Change Data Capture (CDC) captures row-level changes from a database transaction log and publishes them as events to Kafka. Flink or Spark can consume these events for real-time materialized views.

// Debezium event format (Kafka message value)
{
  "before": { "order_id": "abc123", "status": "pending", "total_cents": 5000 },
  "after":  { "order_id": "abc123", "status": "shipped", "total_cents": 5000 },
  "op": "u",   // c=create, u=update, d=delete, r=read(snapshot)
  "ts_ms": 1710000000000,
  "source": { "db": "orders_db", "table": "orders" }
}
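A current-state view can be maintained from events in this format with a small fold over the changelog (a plain-Python sketch, not Debezium or Flink API; only the fields shown above are used):

```python
# Maintain a current-state table from Debezium change events (c/u/d/r ops).
def apply_cdc(events):
    table = {}                                   # primary key -> latest row
    for ev in events:
        op = ev["op"]
        if op in ("c", "r", "u"):                # create / snapshot read / update
            row = ev["after"]
            table[row["order_id"]] = row
        elif op == "d":                          # delete: drop the row
            table.pop(ev["before"]["order_id"], None)
    return table

events = [
    {"op": "c", "after": {"order_id": "a1", "status": "pending"}},
    {"op": "u", "before": {"order_id": "a1", "status": "pending"},
                "after":  {"order_id": "a1", "status": "shipped"}},
    {"op": "d", "before": {"order_id": "a1", "status": "shipped"}, "after": None},
]
assert apply_cdc(events) == {}   # created, updated, then deleted
```

This is exactly what Flink's 'debezium-json' format does internally: it turns the before/after pairs into an upsert stream keyed by the primary key.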
// Flink: consume Debezium CDC stream from Kafka
tEnv.executeSql("""
    CREATE TABLE orders_cdc (
        order_id    STRING,
        customer_id STRING,
        status      STRING,
        total_cents BIGINT,
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'debezium.orders_db.orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'debezium-json'
    )
""");

// Flink treats the CDC stream as a dynamic table with upserts
// This query maintains a real-time materialized view of active orders
tEnv.executeSql("""
    CREATE TABLE active_orders_view WITH ('connector' = 'jdbc', ...)
    AS SELECT
        customer_id,
        COUNT(*) AS active_order_count,
        SUM(total_cents) / 100.0 AS total_active_usd
    FROM orders_cdc
    WHERE status NOT IN ('delivered', 'cancelled')
    GROUP BY customer_id
""");

Exactly-once end-to-end

// Flink + Kafka exactly-once:
// - Source: Kafka consumer with committed offsets
// - Processing: Flink checkpointing (Chandy-Lamport)
// - Sink: Kafka transactional producer (2-phase commit)

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("output-topic")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)  // 2PC
    .setTransactionalIdPrefix("flink-exactly-once")
    // Kafka's transaction timeout must exceed the checkpoint interval,
    // otherwise transactions can abort mid-checkpoint
    .setProperty("transaction.timeout.ms", "900000")
    .build();

stream.sinkTo(sink);

Late data handling and watermarks

// Watermarks tell Flink "events older than this timestamp are now late"
// Flink waits for the watermark before closing a window

// Strategy 1: Bounded out-of-orderness (most common)
// Allow events up to 10 seconds late
WatermarkStrategy<OrderEvent> strategy = WatermarkStrategy
    .<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((event, ts) -> event.timestampMs);

// Strategy 2: Custom (inspect each event to decide watermark)
WatermarkStrategy<OrderEvent> custom = WatermarkStrategy
    .forGenerator(ctx -> new WatermarkGenerator<OrderEvent>() {
        private long maxTimestamp = Long.MIN_VALUE;

        @Override
        public void onEvent(OrderEvent event, long ts, WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, event.timestampMs);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(maxTimestamp - 10_000)); // 10s lag
        }
    });

// Handle truly late events after the window closes
OutputTag<OrderEvent> lateTag = new OutputTag<OrderEvent>("late-orders") {};
stream
    .keyBy(e -> e.customerId)
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(1)))
    .allowedLateness(Duration.ofSeconds(30))    // refire window for late arrivals
    .sideOutputLateData(lateTag)                // capture stragglers
    .aggregate(new SumAggregator());
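The interplay of watermarks, allowedLateness, and the side output can be traced in miniature (a sketch of the semantics with the same 60 s window, 10 s watermark delay, and 30 s lateness as above; not Flink code):

```python
# Trace event-time tumbling windows with allowed lateness (times in seconds).
WINDOW, WM_DELAY, LATENESS = 60, 10, 30

def run(events):
    agg, fired_results, side_output, fired = {}, [], [], set()
    wm = float("-inf")
    for ts, v in events:
        wm = max(wm, ts - WM_DELAY)              # bounded out-of-orderness
        start = ts // WINDOW * WINDOW
        end = start + WINDOW
        if wm >= end + LATENESS:
            side_output.append((ts, v))          # beyond lateness: side output
        else:
            agg[start] = agg.get(start, 0) + v
            if start in fired:                   # late but allowed: re-fire window
                fired_results.append((start, agg[start]))
        for s in list(agg):                      # first firing when wm passes end
            if s + WINDOW <= wm and s not in fired:
                fired.add(s)
                fired_results.append((s, agg[s]))
    return fired_results, side_output

# ts=55 arrives after window [0,60) fired (re-fire); ts=20 arrives too late.
fired_results, side_output = run([(5, 1), (50, 1), (70, 1), (55, 1), (100, 1), (20, 1)])
assert fired_results == [(0, 2), (0, 3)]
assert side_output == [(20, 1)]
```

The window for [0, 60) fires once with sum 2, re-fires with sum 3 when the 55 s straggler arrives inside the lateness bound, and the 20 s event lands in the side output because the watermark has already passed 60 + 30.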
Cross-reference
For Kafka-specific stream processing patterns (consumer groups, partitions, Kafka Streams DSL, exactly-once producer config), see the Kafka refresher.

The Big Comparison — dbt vs Spark vs Flink vs cuDF

Dimension        | dbt                                       | Spark                                         | Flink                                            | cuDF / RAPIDS
Latency          | Minutes (scheduled SQL)                   | Seconds–minutes (micro-batch)                 | Milliseconds (true streaming)                    | Milliseconds (single-node GPU)
Primary language | SQL + Jinja                               | Python, Scala, Java, R                        | Java, Scala, Python, SQL                         | Python (pandas API)
Scale            | Warehouse-bound                           | Petabytes (distributed)                       | Petabytes/day (distributed)                      | Single-node GPU (Dask-cuDF for multi-node)
State management | None (stateless SQL)                      | Limited (foreachBatch)                        | First-class (ValueState, MapState, timers)       | None (batch)
SQL support      | Native (the whole interface)              | Spark SQL, Catalyst optimizer                 | Flink SQL, Table API                             | None native (use pandas API)
GPU support      | No                                        | Via RAPIDS plugin                             | No (in development)                              | Native — built for GPU
Exactly-once     | Not applicable (idempotent SQL)           | Structured Streaming + Delta                  | Native (Chandy-Lamport + 2PC)                    | Not applicable (batch)
Learning curve   | Low (SQL + YAML)                          | Medium                                        | High (stateful streaming, watermarks)            | Very low (it's pandas)
Ecosystem        | Snowflake, BigQuery, Redshift, DuckDB     | Hadoop, Delta, Iceberg, S3                    | Kafka, Iceberg, JDBC, Elasticsearch              | RAPIDS (cuML, cuGraph), Spark RAPIDS
Best for         | Warehouse modeling, analytics engineering | Large-scale batch ETL, ML feature engineering | Real-time event processing, fraud detection, CDC | GPU-accelerated pandas workloads, ML preprocessing

Decision flowchart

What are you building?
│
├── Transforming data inside a warehouse (Snowflake, BigQuery, Redshift)?
│   └── YES → dbt
│       ├── Simple SQL models → view materialization
│       ├── Large history tables → incremental materialization
│       └── SCD Type 2 → dbt snapshots
│
├── Processing data outside the warehouse?
│   │
│   ├── Data size < 10 GB and GPU available?
│   │   └── YES → cuDF / cudf.pandas (zero code change from pandas)
│   │
│   ├── Batch processing (> 10 GB, no strict latency SLA)?
│   │   └── YES → Spark
│   │       └── GPU available? → Add Spark RAPIDS for 5x speedup
│   │
│   ├── Real-time / streaming (per-event latency matters)?
│   │   └── YES → Flink
│   │       ├── Simple aggregations → Flink SQL
│   │       ├── Complex patterns / ML → DataStream API
│   │       └── CDC source → Flink + Debezium
│   │
│   └── Both batch AND stream over same data?
│       └── Flink (unified batch/stream) OR Spark Structured Streaming
│           → Use Delta Lake / Iceberg as the shared table format

Hybrid architectures in the wild

# Production architecture at a mid-size fintech:
#
# Raw Sources → Fivetran → Snowflake raw schema
#                     │
#                     ├── dbt → Snowflake marts → Tableau
#                     │
# Kafka (transactions) → Flink → Redis (fraud scores, <50ms)
#                             └── Iceberg (audit trail)
#
# Iceberg tables → Spark batch → ML feature store (Delta Lake)
#                            └── cuDF (GPU) on notebooks for exploration
#
# Rule: use the cheapest tool that satisfies latency + scale constraints


The Future of Data Transform

GPU-first processing — NVIDIA's ecosystem strategy

At GTC 2026, NVIDIA announced aggressive expansion of RAPIDS across the data ecosystem. The strategy: make GPU acceleration the default at every layer of the modern data stack.

SQL everywhere — the lingua franca

SQL is winning. Every tool has invested heavily in SQL interfaces:

  • dbt: SQL is the entire interface (templated with Jinja)
  • Spark: Spark SQL, with the Catalyst optimizer underneath
  • Flink: Flink SQL and the Table API over streams
  • Warehouses and in-process engines (Snowflake, BigQuery, DuckDB): SQL-native by design

The implication: SQL skills transfer across tools. The differentiation is in execution engine (GPU vs CPU vs distributed), latency semantics (batch vs streaming), and cost model.

Batch-stream unification

The historical gap between batch and streaming systems is closing:

  • Flink treats batch as a special case of streaming (a bounded stream)
  • Spark Structured Streaming treats a stream as an incrementally processed table
  • Lakehouse table formats (Iceberg, Delta Lake) let one table serve both modes

Lakehouse effect — one table format for batch and stream

Apache Iceberg and Delta Lake are enabling the same physical data to serve both batch analytics and streaming queries:

# Write from Flink (streaming) and read from Spark (batch) — same Iceberg table

// Flink write (Java):
tEnv.executeSql("""
    CREATE TABLE orders_iceberg (order_id STRING, total DOUBLE, ts TIMESTAMP)
    WITH ('connector'='iceberg', 'catalog-name'='hive', 'database-name'='gold')
""");

# Spark read (batch, PySpark):
spark.read.format("iceberg").load("hive.gold.orders_iceberg")
Cross-reference
For Iceberg and Delta Lake internals — file layout, snapshot isolation, schema evolution, time travel — see the Lakehouse refresher.

Cost/performance frontier

Approach                         | Hardware Cost            | Throughput                  | Latency         | Sweet Spot
DuckDB / Polars (CPU in-process) | Low (single machine)     | High for single-node        | Seconds         | < 50 GB, ad-hoc analytics
cuDF (GPU single-node)           | Medium (GPU server)      | Very high                   | Milliseconds    | 10 GB – 1 TB, ML pipelines
Spark (distributed CPU)          | High (cluster)           | Highest (scale-out)         | Seconds–minutes | > 1 TB batch
Spark RAPIDS (distributed GPU)   | Very high (GPU cluster)  | Highest + GPU acceleration  | Seconds         | > 1 TB, time-sensitive batch
Flink (distributed streaming)    | High (always-on cluster) | High (continuous)           | Milliseconds    | Real-time requirements

Production Patterns

Monitoring by tool

dbt monitoring

# dbt freshness — are sources up to date?
dbt source freshness --output json | jq '.sources[] | select(.status != "pass")'

# dbt test on a schedule (Airflow; assumes `from airflow.operators.bash import BashOperator`)
dbt_test = BashOperator(
    task_id='dbt_test',
    bash_command='dbt test --models marts.fct_orders --store-failures',
)

# Elementary — open-source dbt monitoring package
# Generates anomaly detection tests (row count, null rate, distribution)
# pip install elementary-data
elementary send-report --slack-token $SLACK_TOKEN --slack-channel "#data-alerts"

Flink monitoring

# Flink REST API metrics
curl http://localhost:8081/jobs/{jobId}/metrics?get=numRecordsInPerSecond,numRecordsOutPerSecond

# Key metrics to alert on:
# - numRecordsInPerSecond: source throughput
# - isBackPressured: backpressure indicator per vertex
# - lastCheckpointDuration: checkpoint time (should be << checkpoint interval)
# - numberOfFailedCheckpoints: should be 0
# - taskSlotsAvailable: cluster headroom

# Prometheus + Grafana (recommended setup)
# Add to flink-conf.yaml:
# metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
# metrics.reporter.prom.port: 9249

Spark monitoring

# Spark History Server: http://spark-master:18080
# Key metrics per job:
# - Stage duration vs shuffle read/write
# - Task skew: compare task durations in a stage
# - GC time: should be <10% of task time
# - Spill to disk: indicates insufficient memory

# Programmatic (for alerting)
from pyspark import SparkContext  # SparkContext lives in pyspark, not pyspark.sql
sc = SparkContext.getOrCreate()
# Access StatusTracker for stage/job completion rates

Scaling patterns

Tool  | Scale Up                                           | Scale Out                                         | Key Bottlenecks
dbt   | Warehouse compute credits (Snowflake XL warehouse) | Parallelism in dbt Cloud, slim CI to reduce scope | Shuffle-heavy SQL, very wide joins
Spark | Larger executor memory/cores                       | Add executors, optimize partitioning              | Shuffle (reduce with broadcast joins, bucketing)
Flink | Increase task slot parallelism                     | Add TaskManagers, rescale checkpointed job        | State backend I/O, key skew, slow sinks
cuDF  | Larger GPU (A100 80GB)                             | Dask-cuDF (multi-GPU), multi-node                 | GPU VRAM — chunk data or use Dask

Failure handling and recovery

# dbt: retry failed models
# dbt_project.yml (on-run-end is a project-level hook, not a model config;
# store_results() here is a custom macro, not a dbt built-in)
on-run-end:
  - "{{ store_results() }}"

# In Airflow DAG: retry failed dbt models only
dbt_run_failed = BashOperator(
    task_id='dbt_run_retry',
    bash_command='dbt run --models result:error+',  # re-run errored + downstream
    retries=2,
    retry_delay=timedelta(minutes=5),
)
// Flink: configure restart strategy
env.setRestartStrategy(
    RestartStrategies.fixedDelayRestart(
        3,                          // max 3 restarts
        Time.of(10, TimeUnit.SECONDS) // wait 10s between attempts
    )
);

// Or exponential backoff
env.setRestartStrategy(
    RestartStrategies.exponentialDelayRestart(
        Time.seconds(1),   // initial delay
        Time.minutes(2),   // max delay
        2.0,               // multiplier
        Time.minutes(10),  // reset threshold (if job stable for 10 min)
        0.1                // jitter factor
    )
);

Data quality at scale

# Great Expectations — works with pandas, Spark, and SQL
# (fluent-datasource API shown; exact method names vary across GX versions)
import great_expectations as gx

context = gx.get_context()
ds = context.sources.add_spark("spark_source", spark=spark)
da = ds.add_spark_df_asset("orders_asset")

batch_request = da.build_batch_request(dataframe=orders_df)
validator = context.get_validator(batch_request=batch_request)

# Define expectations
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between("total_usd", min_value=0, max_value=1e6)
validator.expect_column_values_to_be_in_set(
    "status", ["pending", "confirmed", "shipped", "delivered", "cancelled"]
)

# Run and capture results
results = validator.validate()
if not results.success:
    raise ValueError(f"Data quality check failed: {results.statistics}")

Cross-links to related refreshers