1. Lakehouse Architecture

The lakehouse combines the low-cost, scalable storage of a data lake with the ACID transactions, schema enforcement, and BI performance of a data warehouse. The key insight: separate compute from storage, then add a metadata/transaction layer on top of cheap object storage (S3, GCS, ADLS).

Evolution: Lake → Warehouse → Lakehouse

| Characteristic     | Data Lake                 | Data Warehouse              | Lakehouse                      |
|--------------------|---------------------------|-----------------------------|--------------------------------|
| Storage            | Object storage (S3/GCS)   | Proprietary (Redshift, BQ)  | Object storage (S3/GCS)        |
| Format             | Raw: CSV, JSON, Parquet   | Proprietary columnar        | Open: Parquet + table format   |
| ACID transactions  | No                        | Yes                         | Yes (via table format)         |
| Schema enforcement | Optional / schema-on-read | Yes                         | Yes                            |
| Time travel        | No (manual snapshots)     | Limited / vendor-specific   | Yes (first-class)              |
| Streaming + batch  | Separate systems          | Batch only (typically)      | Unified                        |
| Compute engines    | Spark, Hive, Presto       | Single vendor               | Spark, Trino, Flink, DuckDB    |
| Cost               | Low storage, complex ops  | High (compute + storage)    | Low storage, pay-per-query     |
| ML workloads       | Good (raw data access)    | Poor (no large file export) | Excellent (versioned datasets) |
| Vendor lock-in     | Low                       | High                        | Low (open formats)             |

Architecture Diagram

┌──────────────────────────────────────────────────────────────────────┐
│                         LAKEHOUSE ARCHITECTURE                        │
├──────────────────────────────────────────────────────────────────────┤
│  INGESTION LAYER                                                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌────────────┐ │
│  │  Kafka /    │  │  Batch ETL  │  │  CDC (Debez-│  │  API /     │ │
│  │  Kinesis    │  │  (Spark/    │  │  ium/Flink) │  │  Files     │ │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘  └─────┬──────┘ │
│         └────────────────┴─────────────────┴───────────────┘        │
│                                    │                                  │
├────────────────────────────────────┼──────────────────────────────────┤
│  STORAGE LAYER (Object Storage)    │                                  │
│  ┌─────────────────────────────────▼──────────────────────────────┐ │
│  │  S3 / GCS / ADLS                                                │ │
│  │  ├── bronze/   (raw, append-only Parquet)                       │ │
│  │  ├── silver/   (cleaned, deduplicated, enriched)                │ │
│  │  └── gold/     (aggregated, business-ready)                     │ │
│  └────────────────────────────────────────────────────────────────┘ │
├──────────────────────────────────────────────────────────────────────┤
│  TABLE FORMAT LAYER (ACID + Metadata)                                 │
│  ┌────────────────┐  ┌──────────────────┐  ┌───────────────────────┐│
│  │  Delta Lake    │  │  Apache Iceberg  │  │  Apache Hudi          ││
│  │  (_delta_log/) │  │  (metadata tree) │  │  (timeline/.hoodie/)  ││
│  └────────────────┘  └──────────────────┘  └───────────────────────┘│
├──────────────────────────────────────────────────────────────────────┤
│  CATALOG LAYER (Metadata Discovery)                                   │
│  Unity Catalog │ AWS Glue │ Hive Metastore │ Nessie │ REST Catalog   │
├──────────────────────────────────────────────────────────────────────┤
│  COMPUTE LAYER (Multi-Engine Access)                                  │
│  ┌─────────┐ ┌──────────┐ ┌───────────┐ ┌──────────┐ ┌──────────┐  │
│  │  Spark  │ │  Trino   │ │   Flink   │ │ DuckDB   │ │ Athena   │  │
│  └─────────┘ └──────────┘ └───────────┘ └──────────┘ └──────────┘  │
├──────────────────────────────────────────────────────────────────────┤
│  CONSUMPTION LAYER                                                    │
│  BI Tools │ ML Training │ Feature Stores │ Real-time APIs            │
└──────────────────────────────────────────────────────────────────────┘

Key Properties of a Lakehouse

Why not just use Snowflake or BigQuery?
Managed warehouses are excellent but expensive at scale, tie you to one vendor's compute pricing, and don't give you direct file-level access for ML training workloads. A lakehouse lets you run Spark for large-scale ML, Trino for ad-hoc SQL, and DuckDB for local exploration — all on the same Parquet files, paying only S3 storage costs when idle.

2. Open Table Formats Overview

Plain Parquet files on object storage have no concept of transactions, versioning, or schema enforcement. Open table formats solve this by adding a metadata layer that tracks which files belong to a table, what their statistics are, and the history of operations performed.

What They Solve

Delta Lake vs Iceberg vs Hudi — Feature Matrix

| Feature | Delta Lake | Apache Iceberg | Apache Hudi |
|---------|------------|----------------|-------------|
| Origin | Databricks (2019) | Netflix (2018, Apache 2020) | Uber (2017, Apache 2019) |
| Metadata format | JSON/Parquet transaction log | JSON + Avro manifest files | Avro timeline + index files |
| ACID transactions | Yes (OCC + write serialization) | Yes (snapshot isolation) | Yes (OCC) |
| Time travel | Yes (version + timestamp) | Yes (snapshot ID + timestamp) | Yes (commit timeline) |
| Schema evolution | Good (add/rename/drop) | Excellent (full evolution) | Good |
| Partition evolution | Limited (requires rewrite) | Excellent (hidden partitioning) | Limited |
| Upserts / MERGE | Yes (MERGE INTO) | Yes (MERGE INTO) | Excellent (native upsert engine) |
| Streaming writes | Yes (Structured Streaming) | Yes (Flink + Spark) | Yes (DeltaStreamer) |
| Multi-engine reads | Good (Spark, Trino, DuckDB) | Excellent (universal) | Good (Spark-primary) |
| Compaction | OPTIMIZE command | rewrite_data_files procedure | Built-in inline compaction |
| Z-ORDER / clustering | Yes (ZORDER BY) | Sort order + bin-packing | Clustering (Z-curve) |
| Row-level deletes | Yes (deletion vectors) | Yes (position/equality deletes) | Yes (MOR + index) |
| Primary ecosystem | Databricks, AWS EMR | AWS (Glue/Athena), Snowflake | Spark streaming, CDC pipelines |
| Catalog integration | Unity, Glue, Hive | REST, Nessie, Glue, Hive | Hive Metastore, Glue |
| Column stats / skipping | Yes (per-file min/max) | Yes (per-file + per-partition) | Yes (column stats index) |
| Bloom filters | Yes | Yes | Yes |
| DML support | UPDATE, DELETE, MERGE | UPDATE, DELETE, MERGE | INSERT, UPSERT, DELETE |
| Best for | Databricks shops, batch DWH | Multi-engine, long-term storage | Near-real-time CDC, upserts |

Quick decision heuristic
Delta Lake if you're already on Databricks or AWS EMR and need the simplest path. Iceberg if you need multi-engine access (Spark + Trino + Flink + Snowflake) or care deeply about partition evolution. Hudi if your primary use case is near-real-time CDC ingestion with sub-minute latency and you need built-in upsert indexing.

3. Delta Lake Deep Dive

Delta Lake stores data as Parquet files plus a transaction log (_delta_log/) in the same directory. Every write operation appends a JSON entry to the log. Every 10 commits, Delta checkpoints the log state into a Parquet file to bound replay time.

Transaction Log Structure

s3://my-bucket/tables/orders/
├── _delta_log/
│   ├── 00000000000000000000.json   # initial schema + first files
│   ├── 00000000000000000001.json   # add files from second write
│   ├── 00000000000000000002.json   # remove files (DELETE/UPDATE)
│   ├── ...
│   ├── 00000000000000000009.json
│   ├── 00000000000000000010.checkpoint.parquet  # checkpoint at v10
│   ├── 00000000000000000010.json
│   └── _last_checkpoint                         # points to latest checkpoint
├── part-00000-a1b2c3d4.snappy.parquet
├── part-00001-e5f6a7b8.snappy.parquet
└── ...

Log Entry Anatomy (JSON)

// A single commit JSON entry (00000000000000000001.json)
{
  "commitInfo": {
    "timestamp": 1707840000000,
    "operation": "WRITE",
    "operationParameters": { "mode": "Append", "partitionBy": "[\"date\"]" },
    "readVersion": 0,
    "isolationLevel": "Serializable"
  }
}
{
  "add": {
    "path": "date=2024-02-14/part-00000-abc123.snappy.parquet",
    "partitionValues": { "date": "2024-02-14" },
    "size": 1048576,
    "modificationTime": 1707840001000,
    "dataChange": true,
    "stats": "{\"numRecords\":50000,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":50000},\"nullCount\":{\"id\":0}}"
  }
}
{
  "remove": {
    "path": "date=2024-02-13/part-00000-old123.snappy.parquet",
    "deletionTimestamp": 1707840001000,
    "dataChange": true
  }
}
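
To make the replay mechanics concrete, here is a minimal pure-Python sketch (file names are illustrative, and real logs also carry protocol/metaData actions) that folds the add/remove actions above into the table's current live file set:

```python
import json

def replay(commit_jsons):
    """Replay ordered commit entries (one JSON action per line, as in
    _delta_log/*.json) to compute the table's current live file set.
    Sketch only: a real reader starts from the latest checkpoint."""
    live = set()
    for commit in commit_jsons:
        for line in commit.strip().splitlines():
            action = json.loads(line)
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live

v0 = '{"add": {"path": "part-00000.parquet", "dataChange": true}}'
v1 = ('{"add": {"path": "part-00001.parquet", "dataChange": true}}\n'
      '{"remove": {"path": "part-00000.parquet", "dataChange": true}}')

print(sorted(replay([v0, v1])))  # ['part-00001.parquet']
```

This is why checkpoints matter: without them, reading a table at version N means replaying N JSON files; the Parquet checkpoint collapses the prefix into one file.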

ACID Guarantees in Delta Lake

S3 concurrency gotcha
Two S3 limitations shaped Delta's design: object listings were only eventually consistent (fixed December 2020), and S3 long lacked an atomic put-if-absent operation. Delta Lake therefore coordinates concurrent writers on S3 through a DynamoDB-backed LogStore. Single-writer pipelines can run without it, but keep DynamoDB coordination enabled whenever multiple clusters may commit to the same table.

PySpark + Delta Lake Examples

Setup

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("delta-demo")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

Creating and Writing a Delta Table

from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp

# Write as Delta (overwrite mode creates table if not exists)
(
    df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("event_date")
    .save("s3://my-bucket/tables/events")
)

# Or using SQL DDL
spark.sql("""
    CREATE TABLE IF NOT EXISTS events (
        event_id    BIGINT NOT NULL,
        user_id     BIGINT NOT NULL,
        event_type  STRING NOT NULL,
        payload     STRING,
        event_date  DATE   NOT NULL
    )
    USING DELTA
    PARTITIONED BY (event_date)
    LOCATION 's3://my-bucket/tables/events'
    TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact'   = 'true'
    )
""")

Reading a Delta Table

# Current version (default)
df = spark.read.format("delta").load("s3://my-bucket/tables/events")

# Time travel by version
df_v5 = (
    spark.read
    .format("delta")
    .option("versionAsOf", 5)
    .load("s3://my-bucket/tables/events")
)

# Time travel by timestamp
df_yesterday = (
    spark.read
    .format("delta")
    .option("timestampAsOf", "2024-02-13 00:00:00")
    .load("s3://my-bucket/tables/events")
)

# Via catalog
df = spark.table("my_catalog.my_schema.events")

DML: UPDATE, DELETE, MERGE

from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "s3://my-bucket/tables/events")

# UPDATE
dt.update(
    condition="event_type = 'click'",
    set={"event_type": "'CLICK'"}
)

# DELETE
dt.delete(condition="event_date < '2023-01-01'")

# MERGE (upsert)
(
    dt.alias("target")
    .merge(
        source=updates_df.alias("source"),
        condition="target.event_id = source.event_id"
    )
    .whenMatchedUpdate(set={
        "event_type": "source.event_type",
        "payload":    "source.payload"
    })
    .whenNotMatchedInsertAll()
    .execute()
)

Delta SQL

-- Describe history
DESCRIBE HISTORY events;

-- Time travel query
SELECT * FROM events VERSION AS OF 5;
SELECT * FROM events TIMESTAMP AS OF '2024-02-13';

-- Optimize and Z-ORDER
OPTIMIZE events ZORDER BY (user_id, event_type);

-- Vacuum (remove files older than retention threshold)
VACUUM events RETAIN 168 HOURS;  -- 7 days

-- Table properties
ALTER TABLE events SET TBLPROPERTIES (
    'delta.logRetentionDuration' = 'interval 30 days',
    'delta.deletedFileRetentionDuration' = 'interval 7 days'
);

-- Restore to a previous version
RESTORE TABLE events TO VERSION AS OF 10;
RESTORE TABLE events TO TIMESTAMP AS OF '2024-02-01';

Delta Lake with Structured Streaming

# Write stream to Delta table
(
    streaming_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://my-bucket/checkpoints/events")
    .trigger(processingTime="1 minute")  # micro-batch every minute
    # .trigger(availableNow=True)        # one-shot batch mode
    .start("s3://my-bucket/tables/events")
)

# Read stream from Delta table (change data feed)
(
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .load("s3://my-bucket/tables/events")
    .writeStream
    .format("console")
    .start()
)

Enable Change Data Feed (CDF) for downstream propagation
ALTER TABLE events SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true'). CDF records row-level changes (insert/update_preimage/update_postimage/delete) that downstream tables can consume incrementally — ideal for building silver-layer CDC pipelines without full re-scans.

4. Apache Iceberg Deep Dive

Iceberg's distinguishing feature is its metadata tree: a catalog points to a metadata file, which points to a manifest list, which points to manifest files, which point to actual data files. This indirection enables atomic swaps, hidden partitioning, and full partition evolution without touching data files.

Metadata Tree Structure

CATALOG (e.g., Glue, Nessie, REST)
└── table pointer: s3://bucket/warehouse/db/orders/metadata/v3.metadata.json

METADATA FILE (v3.metadata.json)
├── table-uuid, format-version: 2
├── current-snapshot-id: 8342957621
├── partition-spec: [{ field: "order_date", transform: "day" }]
├── sort-order: [{ field: "customer_id", direction: "asc" }]
└── snapshots:
    ├── snapshot-id: 8342957620  (previous)
    └── snapshot-id: 8342957621  (current)
        └── manifest-list: s3://.../snap-8342957621-0.avro

MANIFEST LIST (snap-8342957621-0.avro)
├── manifest-file: s3://.../1234abcd-m0.avro  (added files)
└── manifest-file: s3://.../5678efgh-m1.avro  (existing files)

MANIFEST FILE (1234abcd-m0.avro)
├── data-file: s3://.../00000-5-abc.parquet
│   ├── partition: { order_date: "2024-02-14" }
│   ├── record_count: 75000
│   ├── file_size_in_bytes: 8388608
│   └── column_stats: [{ lower_bound, upper_bound, null_count }]
└── data-file: s3://.../00001-5-def.parquet
    └── ...
Why this tree structure matters
Atomic table updates are achieved by writing a new metadata file and atomically updating the catalog pointer. No locks on data files. Readers hold a reference to an immutable snapshot; writers create new snapshots. This is why Iceberg scales to tables with millions of files — the metadata tree means you never need to list all files to find what's current.
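
The pointer-swap commit protocol can be sketched in a few lines of illustrative Python (a toy stand-in for a real catalog, not Iceberg's API):

```python
class Catalog:
    """Minimal sketch of Iceberg-style commits: the only mutable state is
    the catalog's pointer to the current metadata file; everything the
    pointer reaches (metadata, manifests, data files) is immutable."""
    def __init__(self, pointer):
        self.pointer = pointer

    def commit(self, expected, new_metadata):
        # Atomic compare-and-swap: succeed only if no one committed in between.
        if self.pointer != expected:
            return False  # lost the race; writer must re-read and retry
        self.pointer = new_metadata
        return True

cat = Catalog("v3.metadata.json")
reader_snapshot = cat.pointer  # a reader pins an immutable snapshot

ok = cat.commit("v3.metadata.json", "v4.metadata.json")      # writer A wins
stale = cat.commit("v3.metadata.json", "v4b.metadata.json")  # writer B loses, retries

print(ok, stale, cat.pointer, reader_snapshot)
```

Note the reader is unaffected by the concurrent commit: its pinned snapshot (`v3.metadata.json`) still resolves to a complete, immutable file tree.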

Snapshot Isolation

Hidden Partitioning

Iceberg decouples the physical partition layout from the logical query interface. You define partition transforms on columns, and Iceberg automatically prunes partitions without requiring users to filter on partition columns explicitly.

-- Define table with hidden partitioning
CREATE TABLE orders (
    order_id    BIGINT,
    customer_id BIGINT,
    order_ts    TIMESTAMP,
    total       DECIMAL(10,2),
    status      STRING
)
USING iceberg
PARTITIONED BY (days(order_ts), truncate(10, customer_id))
-- days(order_ts): partition by day derived from timestamp
-- truncate(10, customer_id): partition by customer_id truncated to a multiple of 10
-- Users query on order_ts directly; Iceberg prunes partitions automatically
;

-- Supported transforms:
-- years(ts), months(ts), days(ts), hours(ts)
-- bucket(N, col)   -- hash bucket
-- truncate(N, col) -- first N chars / integer truncation
-- identity(col)    -- classic partition column (not recommended for high cardinality)
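
The transforms above are simple pure functions; here is a hedged Python sketch (stand-ins, not Iceberg's exact implementations; in particular, Iceberg's bucket transform uses a 32-bit Murmur3 hash, where plain `hash()` is used below):

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def days(ts):
    # days(ts): number of whole days since the Unix epoch
    return (ts - EPOCH).days

def bucket(n, value):
    # bucket(N, col): hash the value into N buckets
    # (Iceberg specifies Murmur3-32; hash() is an illustrative stand-in)
    return hash(value) % n

def truncate(width, value):
    # truncate(W, col): floor integers down to a multiple of W
    # (floors toward negative infinity, matching the Iceberg spec)
    return value - (value % width)
```

Because each transform is deterministic, the planner can map a predicate on `order_ts` to a range of `days(order_ts)` values and prune partitions without the user ever mentioning the partition column.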

PySpark + Iceberg Examples

Setup (Spark + Iceberg)

spark = (
    SparkSession.builder
    .appName("iceberg-demo")
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.my_catalog",
            "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.type", "rest")
    .config("spark.sql.catalog.my_catalog.uri", "http://nessie:19120/iceberg")  # Nessie's Iceberg REST endpoint
    # Or for Glue:
    # .config("spark.sql.catalog.my_catalog.type", "glue")
    # .config("spark.sql.catalog.my_catalog.warehouse", "s3://my-bucket/warehouse")
    .getOrCreate()
)

Creating and Writing

# Create table via SQL
spark.sql("""
    CREATE TABLE my_catalog.db.orders (
        order_id    BIGINT      NOT NULL,
        customer_id BIGINT      NOT NULL,
        order_ts    TIMESTAMP   NOT NULL,
        total       DECIMAL(12,2),
        status      STRING
    )
    USING iceberg
    PARTITIONED BY (days(order_ts))
    LOCATION 's3://my-bucket/warehouse/db/orders'
    TBLPROPERTIES (
        'write.parquet.compression-codec' = 'zstd',
        'write.metadata.compression-codec' = 'gzip',
        'write.target-file-size-bytes' = '134217728'  -- 128 MB
    )
""")

# Write via DataFrame API
(
    df.writeTo("my_catalog.db.orders")
    .append()
)

# Overwrite by partition (dynamic overwrite)
(
    df.writeTo("my_catalog.db.orders")
    .overwritePartitions()
)

Time Travel and Snapshot Operations

# Query by snapshot ID
spark.read.option("snapshot-id", "8342957621") \
    .table("my_catalog.db.orders")

# Query by timestamp
spark.read.option("as-of-timestamp", "1707840000000") \
    .table("my_catalog.db.orders")

# Via SQL
spark.sql("SELECT * FROM my_catalog.db.orders FOR SYSTEM_VERSION AS OF 8342957621")
spark.sql("SELECT * FROM my_catalog.db.orders FOR SYSTEM_TIME AS OF '2024-02-14 12:00:00'")

# List snapshots
spark.sql("SELECT * FROM my_catalog.db.orders.snapshots").show()

# List history
spark.sql("SELECT * FROM my_catalog.db.orders.history").show()

# List manifests
spark.sql("SELECT * FROM my_catalog.db.orders.manifests").show()

# List data files (with stats)
spark.sql("SELECT file_path, file_size_in_bytes, record_count FROM my_catalog.db.orders.files").show()

Iceberg Procedures

# Expire old snapshots
spark.sql("""
    CALL my_catalog.system.expire_snapshots(
        table => 'db.orders',
        older_than => TIMESTAMP '2024-01-01 00:00:00',
        retain_last => 5
    )
""")

# Compact small files (bin-pack)
spark.sql("""
    CALL my_catalog.system.rewrite_data_files(
        table => 'db.orders',
        strategy => 'binpack',
        options => map('target-file-size-bytes', '134217728',
                       'min-file-size-bytes',    '67108864',
                       'max-file-size-bytes',    '536870912')
    )
""")

# Rewrite with sort order (Z-order equivalent)
spark.sql("""
    CALL my_catalog.system.rewrite_data_files(
        table => 'db.orders',
        strategy => 'sort',
        sort_order => 'customer_id ASC NULLS LAST, order_ts ASC'
    )
""")

# Remove orphan files
spark.sql("""
    CALL my_catalog.system.remove_orphan_files(
        table => 'db.orders',
        older_than => TIMESTAMP '2024-02-01 00:00:00'
    )
""")

# Rollback to snapshot
spark.sql("""
    CALL my_catalog.system.rollback_to_snapshot(
        table => 'db.orders',
        snapshot_id => 8342957620
    )
""")

Iceberg MERGE INTO

-- Upsert with MERGE INTO (Iceberg)
MERGE INTO my_catalog.db.orders AS target
USING updates AS source
ON target.order_id = source.order_id
WHEN MATCHED AND source.status = 'CANCELLED' THEN DELETE
WHEN MATCHED THEN UPDATE SET
    target.status = source.status,
    target.total  = source.total
WHEN NOT MATCHED THEN INSERT *;

5. Apache Hudi Overview

Hudi (Hadoop Upserts Deletes and Incrementals) was built at Uber specifically for near-real-time upsert-heavy workloads: CDC ingestion from databases, event deduplication, and incremental processing pipelines. Its unique contributions are a built-in index for fast upsert lookups and two storage types that trade read vs write amplification.

Copy-on-Write vs Merge-on-Read

| Aspect | Copy-on-Write (CoW) | Merge-on-Read (MoR) |
|--------|---------------------|---------------------|
| Write behavior | Rewrite entire Parquet file on update | Append delta (log) files; base file untouched |
| Read behavior | Read base Parquet only (fast) | Merge base + log files on read (slower) |
| Write amplification | High (rewrites files) | Low (appends only) |
| Read amplification | None | Present until compaction |
| Compaction | Not needed (implicit) | Required (async or inline) |
| Storage overhead | None | Extra log files until compaction |
| Snapshot query | Latest base files | Latest base + log files |
| Read-optimized query | Same as snapshot | Latest compacted base only (stale by Δt) |
| Best for | Read-heavy, low-update-frequency tables | High-frequency upsert workloads, CDC |
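
The trade-off can be made concrete with a small simulation (record counts stand in for bytes written; purely illustrative, not Hudi code):

```python
def cow_update(base_file, updates):
    """Copy-on-Write: produce a brand-new base file with updates applied."""
    new_file = {**base_file, **updates}
    bytes_written = len(new_file)  # proxy cost: the whole file is rewritten
    return new_file, bytes_written

def mor_update(log_files, updates):
    """Merge-on-Read: append a delta log file; the base file is untouched."""
    log_files.append(dict(updates))
    return len(updates)            # proxy cost: only the delta is written

def mor_snapshot_read(base_file, log_files):
    """Snapshot query: merge base + logs (this is the read amplification)."""
    view = dict(base_file)
    for log in log_files:
        view.update(log)
    return view

base = {1: "a", 2: "b", 3: "c"}
upd = {2: "B"}

cow_view, cow_cost = cow_update(base, upd)
logs = []
mor_cost = mor_update(logs, upd)

assert cow_view == mor_snapshot_read(base, logs)  # same logical result
assert mor_cost < cow_cost                        # MoR writes less per update
```

Compaction is the step that folds `logs` back into a fresh base file, resetting MoR's read amplification to zero.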

Hudi Timeline

Hudi maintains a timeline of all table actions in a .hoodie/ directory. Each action has three states: requested (intent), inflight (in-progress), completed (committed).

s3://bucket/tables/orders/
├── .hoodie/
│   ├── 20240214120000000.commit           # completed commit (CoW and MoR)
│   ├── 20240214120000000.commit.requested # pre-commit request
│   ├── 20240214120500000.deltacommit      # MoR delta commit
│   ├── 20240214121000000.compaction.requested
│   ├── 20240214121000000.compaction.inflight
│   ├── 20240214121000000.commit           # compaction completed
│   └── hoodie.properties                 # table metadata
├── 20240214/                              # partition
│   ├── xxx-base.parquet                  # base file
│   └── .xxx.log.1_1-0-1                  # MoR delta log file
└── ...
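
A sketch of how a reader could derive the set of completed instants from timeline filenames (simplified; the real timeline has more action types plus archived instants):

```python
def completed_instants(timeline_files):
    """An action is completed only when its timeline file has no
    .requested / .inflight suffix. Illustrative, not Hudi's reader code."""
    done = set()
    for name in timeline_files:
        parts = name.split(".")
        if parts[-1] in ("commit", "deltacommit", "replacecommit"):
            done.add(parts[0])
    return sorted(done)

timeline = [
    "20240214120000000.commit",
    "20240214120000000.commit.requested",
    "20240214120500000.deltacommit",
    "20240214121000000.compaction.requested",
    "20240214121000000.compaction.inflight",
    "20240214121000000.commit",          # compaction completes as a .commit
]

print(completed_instants(timeline))
# ['20240214120000000', '20240214120500000', '20240214121000000']
```

The three-state lifecycle means a crashed writer leaves only `.requested`/`.inflight` markers behind, which readers simply ignore.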

Hudi PySpark Examples

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("hudi-demo")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .getOrCreate()
)

# --- Upsert into a CoW table ---
hudi_options = {
    "hoodie.table.name":                   "orders",
    "hoodie.datasource.write.recordkey.field":    "order_id",
    "hoodie.datasource.write.partitionpath.field": "order_date",
    "hoodie.datasource.write.table.type":  "COPY_ON_WRITE",
    "hoodie.datasource.write.operation":   "upsert",
    "hoodie.datasource.write.precombine.field":  "updated_at",
    # For MoR, change table.type to MERGE_ON_READ
}

(
    updates_df.write
    .format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save("s3://my-bucket/tables/orders")
)

# --- Incremental read (only changed records since a commit time) ---
(
    spark.read
    .format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20240214120000000")
    .load("s3://my-bucket/tables/orders")
    .show()
)

# --- Read-optimized query (MoR, base files only) ---
(
    spark.read
    .format("hudi")
    .option("hoodie.datasource.query.type", "read_optimized")
    .load("s3://my-bucket/tables/orders")
)

Hudi Compaction

# Inline compaction (every N delta commits)
hudi_options_mor = {
    "hoodie.table.type":                          "MERGE_ON_READ",
    "hoodie.compact.inline":                      "true",
    "hoodie.compact.inline.max.delta.commits":    "5",  # compact every 5 commits
    "hoodie.parquet.max.file.size":               str(128 * 1024 * 1024),  # 128 MB
}

# Async compaction via HoodieSparkCompactor (separate Spark job)
# spark-submit ... --class org.apache.hudi.utilities.HoodieCompactor \
#     --base-path s3://my-bucket/tables/orders \
#     --table-name orders \
#     --schema-file /path/to/schema.avsc

Hudi requires a precombine field
The precombine.field is used to resolve conflicts when multiple records share the same key within one batch — Hudi keeps the record with the highest precombine value. Typically updated_at (timestamp) or a monotonically increasing sequence number. Missing or wrong precombine causes silent data loss on upserts.
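
The precombine semantics can be sketched as follows (field names are illustrative; this mimics, not reuses, Hudi's logic):

```python
def precombine(records, key_field="order_id", precombine_field="updated_at"):
    """Within one batch, keep only the record with the highest precombine
    value per key -- the deduplication Hudi applies before upserting."""
    best = {}
    for rec in records:
        k = rec[key_field]
        if k not in best or rec[precombine_field] > best[k][precombine_field]:
            best[k] = rec
    return list(best.values())

batch = [
    {"order_id": 1, "status": "NEW",     "updated_at": 100},
    {"order_id": 1, "status": "SHIPPED", "updated_at": 200},  # wins for key 1
    {"order_id": 2, "status": "NEW",     "updated_at": 150},
]

print(precombine(batch))
```

If `updated_at` were missing or constant, the "winner" for key 1 would be arbitrary; that is the silent-data-loss failure mode the callout warns about.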

6. Schema Evolution

Schema evolution lets you modify a table's structure without rewriting existing data. The table format metadata tracks the current schema and its history, so readers can handle both old and new files correctly.

Operations: Safe vs Unsafe

| Operation | Delta Lake | Iceberg | Hudi | Risk |
|-----------|------------|---------|------|------|
| Add column | Yes | Yes | Yes | Safe (nulls for old files) |
| Drop column | Yes (explicit) | Yes | Limited | Data loss if column referenced |
| Rename column | Yes (column mapping) | Yes (by column ID) | Limited | Safe in Iceberg (ID-based); risky in Delta without mapping enabled |
| Reorder columns | No | Yes | No | Safe in Iceberg |
| Widen type (int→long) | Yes | Yes | Limited | Safe (no data loss) |
| Narrow type (long→int) | No | No | No | Unsafe (potential overflow) |
| Change nullability | Partial | Yes | Limited | Depends on direction |
| Add struct field | Yes | Yes | No | Safe |
| Change struct field type | Limited | Yes | No | Careful with widening only |

Schema Evolution in Delta Lake

# Option 1: mergeSchema on write (auto-evolve from new DataFrame)
(
    new_df_with_extra_col.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")  # adds new columns from new_df
    .save("s3://my-bucket/tables/events")
)

# Option 2: ALTER TABLE statements
spark.sql("ALTER TABLE events ADD COLUMNS (session_id STRING AFTER user_id)")
spark.sql("ALTER TABLE events ADD COLUMNS (geo STRUCT<lat: DOUBLE, lon: DOUBLE>)")
spark.sql("ALTER TABLE events DROP COLUMN deprecated_field")

# Rename requires column mapping mode (safe rename)
spark.sql("""
    ALTER TABLE events
    SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name',
                       'delta.minReaderVersion'   = '2',
                       'delta.minWriterVersion'   = '5')
""")
spark.sql("ALTER TABLE events RENAME COLUMN event_ts TO occurred_at")

# Widen a type (requires the type widening table feature, Delta 3.2+)
spark.sql("ALTER TABLE events SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("ALTER TABLE events ALTER COLUMN user_id TYPE BIGINT")

# Schema enforcement: reject writes with incompatible schema
try:
    (
        bad_df.write
        .format("delta")
        .mode("append")
        .save("s3://my-bucket/tables/events")
        # Raises AnalysisException if schema doesn't match
        # and mergeSchema is not set
    )
except Exception as e:
    print(f"Schema mismatch rejected: {e}")

Schema Evolution in Iceberg

-- Iceberg tracks columns by ID (not name), enabling safe renames
ALTER TABLE my_catalog.db.orders ADD COLUMN region STRING AFTER status;
ALTER TABLE my_catalog.db.orders DROP COLUMN deprecated_region;
ALTER TABLE my_catalog.db.orders RENAME COLUMN total TO order_total;
ALTER TABLE my_catalog.db.orders ALTER COLUMN order_id TYPE BIGINT;

-- Reorder columns
ALTER TABLE my_catalog.db.orders ALTER COLUMN status FIRST;
ALTER TABLE my_catalog.db.orders ALTER COLUMN region AFTER status;

-- Add nested struct field
ALTER TABLE my_catalog.db.orders ADD COLUMN address STRUCT<street:STRING, zip:STRING, city:STRING>;
ALTER TABLE my_catalog.db.orders ADD COLUMN address.country STRING;
Iceberg's column ID system
Iceberg assigns each column a unique integer ID at creation time and never reuses IDs. This means renames and reorders are purely metadata operations — existing Parquet files are still readable because readers look up columns by ID, not by name or position. Delta Lake achieves similar safety only with column mapping mode enabled; without it, renames require rewriting files.
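
A toy illustration of ID-based column resolution (not Iceberg's actual reader code) shows why a rename never touches data files:

```python
def project(file_columns, table_columns, row):
    """Readers match columns by ID, not name: a column renamed in the table
    schema still resolves against old files written under the old name."""
    current_name = {c["id"]: c["name"] for c in table_columns}
    return {
        current_name[c["id"]]: row[c["name"]]
        for c in file_columns
        if c["id"] in current_name   # dropped IDs are simply skipped
    }

# File written before the rename (column ID 2 was called 'total')
file_schema  = [{"id": 1, "name": "order_id"}, {"id": 2, "name": "total"}]
# Current table schema after: RENAME COLUMN total TO order_total
table_schema = [{"id": 1, "name": "order_id"}, {"id": 2, "name": "order_total"}]

row = {"order_id": 42, "total": 99.5}
print(project(file_schema, table_schema, row))
# {'order_id': 42, 'order_total': 99.5}
```

Never reusing IDs is the key invariant: if a dropped column's ID were recycled, old files would silently surface stale values under the new column's name.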

Schema Enforcement vs Evolution

# Delta Lake: set constraints for enforcement
spark.sql("""
    ALTER TABLE orders ADD CONSTRAINT order_total_positive
    CHECK (total > 0)
""")

# Iceberg: NOT NULL constraints
spark.sql("""
    CREATE TABLE orders (
        order_id BIGINT NOT NULL,
        total    DECIMAL(12,2) NOT NULL
    ) USING iceberg
""")
# ALTER TABLE orders ALTER COLUMN customer_id DROP NOT NULL

# Verify the data actually satisfies the constraint (data quality check)
from pyspark.sql import functions as F
bad_records = (
    df.filter(F.col("total") <= 0)
    .count()
)
assert bad_records == 0, f"Found {bad_records} negative totals"

7. Partition Evolution

Partition strategy decisions made at table creation time often become wrong as data volumes and query patterns change. Table formats differ dramatically in how well they support changing partitioning without data rewrites.

Iceberg: Partition Evolution (Best-in-Class)

Iceberg stores partition metadata per manifest, so old files retain their original partition spec while new files use the updated spec. Queries work transparently across both partition specs — Iceberg figures out which files to read for each spec.

-- Initial table: partitioned monthly (too coarse at scale)
CREATE TABLE events (
    event_id BIGINT,
    user_id  BIGINT,
    event_ts TIMESTAMP,
    payload  STRING
)
USING iceberg
PARTITIONED BY (months(event_ts));

-- 2 years later, monthly partitions are too large.
-- Evolve to daily partitioning WITHOUT rewriting any data:
ALTER TABLE events REPLACE PARTITION FIELD months(event_ts) WITH days(event_ts);

-- Queries spanning the transition work correctly:
SELECT COUNT(*) FROM events
WHERE event_ts BETWEEN '2023-01-01' AND '2024-12-31';
-- Iceberg reads monthly-partitioned files for 2023, daily-partitioned for 2024

-- Add a second partition field (compound partitioning)
ALTER TABLE events ADD PARTITION FIELD bucket(16, user_id);
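
How a planner can prune across two coexisting specs, as a simplified sketch (string-prefix matching stands in for real partition predicates; not Iceberg internals):

```python
def matches(file_entry, query_day):
    """Prune using the partition spec each file was written under:
    monthly files match if the month contains the day; daily files
    match the day exactly."""
    spec, value = file_entry["spec"], file_entry["partition"]
    if spec == "month":
        return query_day.startswith(value)   # '2023-06' covers '2023-06-15'
    if spec == "day":
        return value == query_day
    raise ValueError(f"unknown spec: {spec}")

files = [
    {"path": "a.parquet", "spec": "month", "partition": "2023-06"},    # old spec
    {"path": "b.parquet", "spec": "day",   "partition": "2024-03-01"}, # new spec
    {"path": "c.parquet", "spec": "day",   "partition": "2024-03-02"},
]

scan = [f["path"] for f in files if matches(f, "2024-03-01")]
print(scan)  # ['b.parquet']
```

Because the spec travels with the manifest, no file ever needs rewriting when the table's partitioning changes; each file is pruned under the rules it was written with.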

Delta Lake: Partition Strategy

Delta Lake does not support partition evolution — changing the partition columns requires rewriting the entire table. For this reason, choose Delta partition columns conservatively: prefer low-cardinality date columns and use Z-ORDER for high-cardinality filtering needs.

# Delta: changing partition requires a full rewrite
# This is the correct approach: CTAS with new partition
spark.sql("""
    CREATE OR REPLACE TABLE events_repartitioned
    USING DELTA
    PARTITIONED BY (event_date)  -- was PARTITIONED BY (event_month)
    AS SELECT * FROM events
""")

# Then swap (atomic rename is not native in Delta;
# typically done by renaming at catalog level)
spark.sql("ALTER TABLE events RENAME TO events_old")
spark.sql("ALTER TABLE events_repartitioned RENAME TO events")

# Mitigation: use ZORDER instead of fine-grained partitioning
spark.sql("""
    OPTIMIZE events ZORDER BY (user_id, event_type)
    -- Data clustering for high-cardinality cols without partition overhead
""")

Partition Sizing Guidance

| Scenario | Recommendation |
|----------|----------------|
| Partition too small (< 1 GB) | Too many files; poor read performance. Coarsen partition grain (daily → weekly → monthly). |
| Partition too large (> 200 GB) | Can't prune efficiently; individual partition queries are slow. Fine-grain or add a compound partition field. |
| Target partition size | 1–100 GB per partition (depends on engine and query selectivity; aim for files of roughly 128 MB–1 GB within each partition) |
| High-cardinality column filtering | Use Z-ORDER (Delta) or sort order + bin-pack (Iceberg) instead of partitioning |
| Iceberg: hidden vs identity | Prefer transforms (days/months/bucket) over identity; identity on a timestamp column creates one partition per unique value |
| Delta: partition column choice | Date/region work well; avoid user_id or event_type (too many partitions, tiny files per partition) |
The small files problem amplified by partitioning
Streaming writes into a partitioned table create one or more small files per partition per micro-batch. With hourly partitioning and 1-minute micro-batches, you get 60 tiny files per partition per hour. Always run async compaction (OPTIMIZE or rewrite_data_files) on streaming tables, or use Delta's autoOptimize.autoCompact table property.
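
The arithmetic in the callout, as a quick sanity check (assumes every micro-batch writes into the current hour's partition):

```python
def files_per_partition_per_hour(micro_batch_seconds, files_per_batch=1):
    # hourly partitions: each micro-batch lands in the current hour's partition
    return (3600 // micro_batch_seconds) * files_per_batch

# 1-minute micro-batches -> 60 tiny files per partition per hour
assert files_per_partition_per_hour(60) == 60
```

With multiple writer tasks per batch (`files_per_batch > 1`) the count multiplies further, which is why compaction on streaming tables is non-negotiable.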

8. Time Travel & Versioning

Time travel lets you query historical data as it existed at any prior snapshot. Use cases: auditing, debugging pipelines, reproducing ML training sets, A/B test analysis, and rollback after bad writes.

Delta Lake Time Travel

-- List full transaction history
DESCRIBE HISTORY orders;
-- Returns: version, timestamp, userId, operation, operationParameters,
--          operationMetrics (numAdded, numRemoved, numOutputRows)

-- Query by version number
SELECT COUNT(*) FROM orders VERSION AS OF 42;

-- Query by timestamp
SELECT * FROM orders TIMESTAMP AS OF '2024-02-01 00:00:00';

# In Python
df = spark.read.format("delta").option("versionAsOf", 42).load(path)
df = spark.read.format("delta").option("timestampAsOf", "2024-02-01").load(path)

-- Restore table to previous state (destructive: creates a new version)
RESTORE TABLE orders TO VERSION AS OF 42;
RESTORE TABLE orders TO TIMESTAMP AS OF '2024-02-01 00:00:00';

Iceberg Time Travel

-- Query specific snapshot
SELECT * FROM orders FOR SYSTEM_VERSION AS OF 8342957621;
SELECT * FROM orders FOR SYSTEM_TIME AS OF '2024-02-01 12:00:00';

-- List all snapshots
SELECT snapshot_id, committed_at, operation, summary
FROM my_catalog.db.orders.snapshots
ORDER BY committed_at DESC;

-- Compare two snapshots: create a changelog view (Spark procedure)
CALL my_catalog.system.create_changelog_view(
    table => 'db.orders',
    options => map('start-snapshot-id', '8342957620',
                   'end-snapshot-id',   '8342957621')
);
SELECT * FROM orders_changes;  -- default view name: <table>_changes
-- Returns: _change_type (INSERT/DELETE/UPDATE_BEFORE/UPDATE_AFTER)

# Rollback to a previous snapshot (metadata-only operation)
spark.sql("""
    CALL my_catalog.system.rollback_to_snapshot(
        table => 'db.orders',
        snapshot_id => 8342957620
    )
""")

# Rollback to timestamp
spark.sql("""
    CALL my_catalog.system.rollback_to_timestamp(
        table => 'db.orders',
        timestamp => TIMESTAMP '2024-02-01 00:00:00'
    )
""")

Retention Policies

Time travel requires keeping old data and log files. Retention is a trade-off: longer retention enables more historical queries but increases storage cost.

-- Delta Lake: set retention periods
ALTER TABLE orders SET TBLPROPERTIES (
    'delta.logRetentionDuration'          = 'interval 30 days',  -- keep log entries
    'delta.deletedFileRetentionDuration'  = 'interval 7 days'    -- keep data files after removal
);

-- Clean up files older than retention (VACUUM)
-- Default threshold: 7 days. Override only with caution.
VACUUM orders RETAIN 168 HOURS;   -- 7 days (safe minimum)
VACUUM orders RETAIN 0 HOURS;     -- DANGER: breaks time travel, ongoing streams

-- Iceberg: expire snapshots explicitly
CALL my_catalog.system.expire_snapshots(
    table => 'db.orders',
    older_than => TIMESTAMP '2024-01-01 00:00:00',
    retain_last => 10   -- always keep at least 10 snapshots regardless of age
);
-- expire_snapshots deletes data files and manifests that are no longer
-- referenced by any retained snapshot
-- Files left behind by failed/aborted writes are never referenced by table
-- metadata at all; cleaning those up requires remove_orphan_files

Never VACUUM below 7 days on active streaming tables
Structured Streaming jobs checkpoint offsets to Delta log versions. Running VACUUM with a retention shorter than your streaming job's checkpoint interval will break stream recovery — the checkpoint points to a log version that no longer exists. Keep retention ≥ 7 days, and ideally 2× your longest streaming job recovery window.
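The rule of thumb above can be encoded directly. A minimal sketch (function name is illustrative):

```python
import math

def safe_vacuum_retention_hours(longest_recovery_window_hours: float) -> int:
    # Floor of 7 days (168 h), and at least 2x the longest streaming
    # job's recovery window, per the guidance above
    return max(168, math.ceil(2 * longest_recovery_window_hours))
```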

Audit Pattern

# Track who changed what and when (Delta)
from delta.tables import DeltaTable
from pyspark.sql import functions as F

history_df = (
    DeltaTable.forName(spark, "orders").history()
    .filter(F.col("operation").isin("WRITE", "MERGE", "UPDATE", "DELETE"))
    .select(
        "version",
        "timestamp",
        F.col("operationParameters").getItem("mode").alias("write_mode"),
        F.col("operationMetrics").getItem("numOutputRows").alias("rows_written"),
        "userMetadata",
    )
    .orderBy(F.desc("version"))
)

# Diff two versions: what changed between v10 and v15?
v10 = spark.read.format("delta").option("versionAsOf", 10).load(path)
v15 = spark.read.format("delta").option("versionAsOf", 15).load(path)

added   = v15.exceptAll(v10)  # rows in v15 not in v10
removed = v10.exceptAll(v15)  # rows in v10 not in v15

9. MERGE / Upsert Patterns

MERGE INTO is the Swiss Army knife for incremental data engineering: CDC ingestion, SCD updates, deduplication, and late-arriving data correction all reduce to merge variants.

Basic Upsert (Insert or Update)

-- Delta / Iceberg syntax is nearly identical
MERGE INTO target AS t
USING source AS s
ON t.id = s.id
WHEN MATCHED THEN
  UPDATE SET t.value = s.value,
             t.updated_at = s.updated_at
WHEN NOT MATCHED THEN
  INSERT (id, value, updated_at)
  VALUES (s.id, s.value, s.updated_at);

CDC Ingestion Pattern

Change Data Capture (CDC) from a source database typically produces a stream of inserts, updates, and deletes with an operation type column (op: I/U/D).

from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# CDC stream from Kafka / Debezium
# Schema: id, op (I/U/D), payload (JSON with before/after), ts

cdc_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "db.public.orders.cdc")
    .load()
    .select(F.from_json(F.col("value").cast("string"), cdc_schema).alias("r"))
    .select("r.*")
)

def process_cdc_batch(batch_df, batch_id):
    dt = DeltaTable.forName(spark, "warehouse.orders")

    # Dedup within batch: keep exactly one latest record per key
    latest = (
        batch_df
        .withColumn("_rn", F.row_number().over(
            Window.partitionBy("id").orderBy(F.desc("ts"))
        ))
        .filter(F.col("_rn") == 1)
        .drop("_rn")
    )

    (
        dt.alias("target")
        .merge(latest.alias("source"), "target.id = source.id")
        .whenMatchedDelete(condition="source.op = 'D'")
        .whenMatchedUpdate(
            condition="source.op = 'U'",
            set={
                "target.value":      "source.after.value",
                "target.updated_at": "source.ts"
            }
        )
        .whenNotMatchedInsert(
            # Insert on 'U' too: an update can arrive before its insert
            condition="source.op IN ('I', 'U')",
            values={
                "id":         "source.after.id",
                "value":      "source.after.value",
                "updated_at": "source.ts"
            }
        )
        .execute()
    )

(
    cdc_df.writeStream
    .foreachBatch(process_cdc_batch)
    .option("checkpointLocation", "s3://bucket/checkpoints/orders-cdc")
    .trigger(processingTime="30 seconds")
    .start()
)

SCD Type 2 with MERGE

Slowly Changing Dimension Type 2 preserves full history by closing old rows and inserting new ones rather than updating in place.

from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, current_timestamp

dt = DeltaTable.forName(spark, "warehouse.dim_customer")

# updates_df: new/changed customers
# Strategy: detect changes, close old rows, insert new rows

(
    dt.alias("target")
    .merge(
        updates_df.alias("source"),
        # Match on business key AND open row
        "target.customer_id = source.customer_id AND target.is_current = true"
    )
    # Close the existing row when data has changed
    .whenMatchedUpdate(
        condition="""
            target.email != source.email OR
            target.address != source.address OR
            target.name != source.name
        """,
        set={
            "target.is_current":  lit(False),
            "target.effective_end": current_timestamp()
        }
    )
    # No whenNotMatchedInsert here: new and changed rows are appended below
    # with the full SCD columns (is_current, effective_start, effective_end)
    .execute()
)

# Insert new current rows for new and changed records
changed = (
    updates_df.alias("source")
    .join(
        dt.toDF().filter("is_current = true").alias("target"),
        F.col("source.customer_id") == F.col("target.customer_id"),
        "left"
    )
    .filter("""
        target.customer_id IS NULL OR
        target.email != source.email OR
        target.address != source.address OR
        target.name != source.name
    """)
    .select("source.*")
    .withColumn("is_current", lit(True))
    .withColumn("effective_start", current_timestamp())
    .withColumn("effective_end", lit(None).cast("timestamp"))
)

changed.write.format("delta").mode("append").saveAsTable("warehouse.dim_customer")

Deduplication Pattern

-- Remove duplicates from a table (keep row with highest sequence)
MERGE INTO orders AS target
USING (
    SELECT id, MAX(seq) AS max_seq
    FROM orders
    GROUP BY id
    HAVING COUNT(*) > 1
) AS dupes ON target.id = dupes.id AND target.seq < dupes.max_seq
WHEN MATCHED THEN DELETE;

-- Or via CTAS (simpler, rewrites table):
CREATE OR REPLACE TABLE orders
USING DELTA AS
SELECT * EXCEPT(_row_num)
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY seq DESC) AS _row_num
    FROM orders
)
WHERE _row_num = 1;
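For intuition, the keep-highest-seq-per-id rule that both the MERGE and the CTAS implement, as plain Python:

```python
def dedup_keep_latest(rows):
    """rows: iterable of (id, seq, value); keep the row with max seq per id."""
    best = {}
    for rid, seq, value in rows:
        if rid not in best or seq > best[rid][1]:
            best[rid] = (rid, seq, value)
    return sorted(best.values())
```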
Merge performance: partition pruning is critical
MERGE scans both source and target. Without partition pruning on the join condition, Delta reads the entire target table. Always include the partition column in the merge condition: target.event_date = source.event_date AND target.id = source.id. On large tables, a missing partition predicate turns a fast merge into a full table scan.
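A sketch of what the callout recommends, assuming a hypothetical orders table partitioned by event_date and a source of same-day updates:

```sql
-- Partition column in the merge condition enables target pruning
MERGE INTO orders AS t
USING updates AS s
ON t.event_date = s.event_date AND t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
```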

10. Compaction & Optimization

Object storage performs best with files in the 128 MB–1 GB range. Streaming writes produce many small files; compaction consolidates them. This is one of the most impactful ongoing maintenance tasks in a lakehouse.

Delta Lake: OPTIMIZE and Z-ORDER

-- Bin-pack: consolidate small files into target-size files (default 1 GB)
OPTIMIZE orders;

-- Z-ORDER: bin-pack + co-locate related rows by specified columns
-- Data skipping becomes highly effective when query filters match Z-ORDER cols
OPTIMIZE orders ZORDER BY (customer_id, order_date);

-- Partition-scoped optimize (recommended for large tables)
OPTIMIZE orders WHERE order_date >= '2024-02-01';

-- Auto optimize: configure at table level (Databricks Delta only)
ALTER TABLE orders SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',   -- bin-pack on every write
    'delta.autoOptimize.autoCompact'   = 'true'    -- async compaction after writes
);

# Optimize from Python
from delta.tables import DeltaTable

dt = DeltaTable.forName(spark, "orders")
dt.optimize().executeZOrderBy("customer_id", "order_date")

# Partition-scoped: where() takes a partition-predicate string, not a Column
dt.optimize().where("order_date >= '2024-02-01'").executeZOrderBy("customer_id")

Iceberg: rewrite_data_files

# Bin-pack compaction
spark.sql("""
    CALL my_catalog.system.rewrite_data_files(
        table => 'db.orders',
        strategy => 'binpack',
        where => 'order_date >= ''2024-01-01''',
        options => map(
            'target-file-size-bytes', '134217728',   -- 128 MB
            'min-file-size-bytes',    '33554432',    -- 32 MB (compact if smaller)
            'max-file-size-bytes',    '536870912',   -- 512 MB (split if larger)
            'max-concurrent-file-group-rewrites', '10'
        )
    )
""")

# Sort-based compaction (equivalent to Z-ORDER)
spark.sql("""
    CALL my_catalog.system.rewrite_data_files(
        table => 'db.orders',
        strategy => 'sort',
        sort_order => 'zorder(customer_id, order_date)'
    )
""")

# Rewrite manifests (reduce manifest file count after many small writes)
spark.sql("""
    CALL my_catalog.system.rewrite_manifests('db.orders')
""")

Vacuum and Snapshot Expiry

-- Delta: remove files no longer referenced by any version within retention period
-- Default retention: 7 days
SET spark.databricks.delta.retentionDurationCheck.enabled = false;  -- only for testing
VACUUM orders RETAIN 168 HOURS;   -- 7 days
VACUUM orders DRY RUN;            -- preview what would be deleted

-- Iceberg: two-step cleanup
-- Step 1: expire snapshots (removes snapshot metadata + unreferenced manifests)
CALL my_catalog.system.expire_snapshots(
    table => 'db.orders',
    older_than => TIMESTAMP '2024-01-15 00:00:00',
    retain_last => 5
);

-- Step 2: remove orphan files (data files left by failed writes, etc.)
CALL my_catalog.system.remove_orphan_files(
    table => 'db.orders',
    older_than => TIMESTAMP '2024-01-15 00:00:00'
);

Bloom Filters

-- Delta (Databricks): bloom filter index for high-cardinality equality lookups
CREATE BLOOMFILTER INDEX ON TABLE orders
FOR COLUMNS (order_id OPTIONS (fpp = 0.01, numItems = 1000000));
-- fpp: target false positive probability; numItems: expected distinct values

-- Iceberg: bloom filter column stats
ALTER TABLE my_catalog.db.orders
SET TBLPROPERTIES (
    'write.parquet.bloom-filter-enabled.column.order_id' = 'true',
    'write.parquet.bloom-filter-max-bytes'               = '1048576'
);
Z-ORDER vs Bloom Filters: when to use each
Z-ORDER / sort order excels at range queries and multi-column filter combinations — it physically co-locates rows with similar values, enabling min/max-based data skipping. Bloom filters excel at point lookups on high-cardinality columns (UUIDs, order IDs) where min/max skipping is ineffective because the range spans all files. Use both: Z-ORDER on common analytical dimensions, bloom filters on ID columns used in lookup joins.
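The fpp and numItems knobs above trade accuracy for space. The standard bloom-filter sizing formula, as a small helper (illustrative):

```python
import math

def bloom_filter_size_bytes(num_items: int, fpp: float) -> int:
    # Standard sizing: m = -n * ln(p) / (ln 2)^2 bits, rounded up to bytes
    bits = -num_items * math.log(fpp) / (math.log(2) ** 2)
    return math.ceil(bits / 8)
```

With 1M items at fpp 0.01 this lands around 1.2 MB per file, which is why Iceberg's write.parquet.bloom-filter-max-bytes default of 1 MiB can truncate a filter sized for many distinct values.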

11. Catalog & Metadata

A catalog is the entry point for discovering and accessing tables. It maps logical table names to physical metadata locations. The catalog layer is where lakehouse governance, access control, and lineage live.

Catalog Options

Catalog | Format Support | Multi-Engine | Notes
Unity Catalog (Databricks) | Delta, Iceberg (read), Parquet | Spark, Databricks SQL | RBAC, lineage, data masking, audit logs. Requires Databricks. Iceberg support via UniForm.
AWS Glue Data Catalog | Delta, Iceberg, Hudi, Parquet | Spark, Athena, Trino, Redshift Spectrum | Managed, AWS-native. Best for AWS shops.
Hive Metastore (HMS) | All formats | Spark, Hive, Trino, Flink | Widely supported but requires managing a separate HMS service. Limited partition evolution support.
Project Nessie | Iceberg, Delta | Spark, Trino, Flink, Dremio | Git-like branches and tags for catalog-level version control. Open-source. Excellent for CI/CD on data.
Iceberg REST Catalog | Iceberg | Spark, Trino, Flink, PyIceberg | Open standard (Iceberg spec). Tabular, Snowflake Open Catalog, Polaris implement this spec.
Apache Polaris (Snowflake) | Iceberg | Spark, Trino, Snowflake, Flink | Open-source REST catalog donated to Apache. Production-grade Iceberg catalog.

Unity Catalog Configuration

# Unity Catalog hierarchy: metastore > catalog > schema > table
# Tables addressed as: catalog.schema.table

spark.sql("USE CATALOG my_prod_catalog")
spark.sql("USE SCHEMA warehouse")
spark.sql("SELECT * FROM orders LIMIT 10")  # reads my_prod_catalog.warehouse.orders

# Create external table registered in Unity Catalog
spark.sql("""
    CREATE TABLE IF NOT EXISTS my_prod_catalog.warehouse.orders
    USING DELTA
    LOCATION 's3://my-bucket/tables/orders'
    -- Unity Catalog manages access; storage credential referenced by name
""")

# Grant access
spark.sql("GRANT SELECT ON TABLE my_prod_catalog.warehouse.orders TO `data-analysts`")
spark.sql("GRANT MODIFY ON TABLE my_prod_catalog.warehouse.orders TO `data-engineers`")

# Row-level security via dynamic views
spark.sql("""
    CREATE OR REPLACE VIEW my_prod_catalog.warehouse.orders_filtered AS
    SELECT * FROM my_prod_catalog.warehouse.orders
    WHERE region = current_user_region()  -- UDF returns caller's region
""")

Nessie: Git-Like Data Versioning

# Nessie treats the catalog like a git repo
# Create a feature branch for a data migration
spark.sql("CREATE BRANCH IF NOT EXISTS etl_migration IN my_nessie_catalog")
spark.sql("USE REFERENCE etl_migration IN my_nessie_catalog")

# Perform migration on the branch (does not affect main)
spark.sql("""
    ALTER TABLE my_nessie_catalog.db.orders
    ADD COLUMN order_source STRING
""")
spark.sql("UPDATE my_nessie_catalog.db.orders SET order_source = 'web' WHERE ...")

# Validate on branch, then merge to main
spark.sql("""
    MERGE BRANCH etl_migration INTO main IN my_nessie_catalog
""")

# Tag a production snapshot for reproducibility
spark.sql("CREATE TAG prod_snapshot_2024_02 IN my_nessie_catalog")
# Future reads: spark.sql("USE REFERENCE prod_snapshot_2024_02 IN my_nessie_catalog")

PyIceberg: Python-Native Iceberg Access

from pyiceberg.catalog import load_catalog
import pyarrow as pa

# Connect to REST catalog (no Spark required)
catalog = load_catalog(
    "rest",
    **{
        "type": "rest",
        "uri": "https://my-catalog.example.com/api",
        "token": "my-bearer-token",
        "warehouse": "s3://my-bucket/warehouse",
    }
)

# Load table
table = catalog.load_table("db.orders")

# Scan with predicate pushdown (returns PyArrow dataset)
scan = table.scan(
    row_filter="order_date >= '2024-02-01' AND status = 'COMPLETED'",
    selected_fields=("order_id", "customer_id", "total"),
    limit=10000
)
arrow_table = scan.to_arrow()
df = arrow_table.to_pandas()

# Append data from Pandas/Arrow
new_data = pa.Table.from_pandas(new_df, schema=table.schema().as_arrow())
table.append(new_data)

# Time travel
snapshot = table.snapshot_by_id(8342957620)
old_scan = table.scan(snapshot_id=snapshot.snapshot_id)

12. Data Engineering Production Patterns

Medallion Architecture (Bronze / Silver / Gold)

The medallion architecture organizes data into layers of increasing quality and semantic richness. Each layer is a full table (not a view) so that failures at any stage are isolated.

BRONZE (Raw / Append-Only)
├── Exact copy of source data, no transformation
├── Schema-on-read or loose schema
├── Retains all history (never delete)
├── Partition by ingestion date
└── Sources: Kafka topics, database CDC, file drops, API calls

SILVER (Cleaned / Enriched)
├── Deduplicated, validated, type-cast
├── Bad records quarantined (not dropped) to quarantine table
├── Joined with reference data (dim tables)
├── Partition by business date (event_date, not ingest_date)
└── SLA: refresh within 15 minutes of Bronze

GOLD (Aggregated / Business-Facing)
├── Pre-aggregated metrics, KPIs, feature tables
├── Optimized for BI tools and ML consumption
├── Strict schema, enforced business rules
├── Partition by report date or rolling window
└── SLA: refresh within 1 hour

# Incremental Bronze → Silver pipeline (Delta)
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Read new Bronze records since last watermark
last_watermark = get_watermark("bronze_to_silver_orders")  # stored in control table

bronze_new = (
    spark.read.format("delta")
    .table("bronze.orders_raw")
    .filter(F.col("_ingest_ts") > last_watermark)
)

# Apply transformations
silver_df = (
    bronze_new
    .withColumn("order_id",   F.col("payload.order_id").cast("bigint"))
    .withColumn("total",      F.col("payload.total").cast("decimal(12,2)"))
    .withColumn("event_date", F.to_date("payload.order_ts"))
    # Dedup within batch
    .dropDuplicates(["order_id"])
    # Drop null primary keys here; a production pipeline would write them
    # to a quarantine table instead of discarding them
    .filter(F.col("order_id").isNotNull())
)

# Upsert into Silver (idempotent)
dt = DeltaTable.forName(spark, "silver.orders")
(
    dt.alias("t")
    .merge(silver_df.alias("s"), "t.order_id = s.order_id AND t.event_date = s.event_date")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

# Advance watermark only after successful commit
update_watermark("bronze_to_silver_orders", bronze_new.agg(F.max("_ingest_ts")).collect()[0][0])

Data Quality with Great Expectations

import great_expectations as gx

context = gx.get_context()

# Wrap the in-memory Spark DataFrame as a GX batch (GX 1.x fluent API)
batch = (
    context.data_sources.add_spark(name="spark_source")
    .add_dataframe_asset(name="silver_orders")
    .add_batch_definition_whole_dataframe("batch")
    .get_batch(batch_parameters={"dataframe": silver_df})
)

# Define expectations on the Silver orders table
expectations = [
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id"),
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="total", min_value=0, max_value=1_000_000),
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["PENDING", "COMPLETED", "CANCELLED", "REFUNDED"]),
    gx.expectations.ExpectColumnPairValuesAToBeGreaterThanB(
        column_A="updated_at", column_B="created_at"),
]

failed = [r for e in expectations if not (r := batch.validate(e)).success]
if failed:
    raise ValueError(f"Data quality check failed: {failed}")

Streaming + Batch Unification

# Lambda architecture problem: separate batch and streaming paths diverge
# Lakehouse solution: one Delta/Iceberg table serves both

# BATCH: full historical recompute (nightly)
spark.sql("""
    INSERT OVERWRITE silver.orders_hourly_agg
    PARTITION (agg_date)
    SELECT
        DATE_TRUNC('hour', order_ts) AS agg_hour,
        COUNT(*)                     AS order_count,
        SUM(total)                   AS total_revenue,
        COUNT(DISTINCT customer_id)  AS unique_customers,
        DATE(order_ts)               AS agg_date   -- partition column goes last
    FROM silver.orders
    WHERE event_date >= '2024-01-01'
    GROUP BY 1, 5
""")

# STREAMING: append new hourly aggregates as they complete
(
    spark.readStream
    .format("delta")
    .table("silver.orders")
    .groupBy(
        F.window("order_ts", "1 hour"),
        F.to_date("order_ts").alias("agg_date")
    )
    .agg(
        F.count("*").alias("order_count"),
        F.sum("total").alias("total_revenue")
    )
    .writeStream
    .format("delta")
    .outputMode("complete")  # replaces the table each batch; Delta sink supports append/complete
    .option("checkpointLocation", "s3://bucket/checkpoints/orders-hourly-agg")
    .trigger(processingTime="5 minutes")
    .toTable("silver.orders_hourly_agg")
)

Pipeline Orchestration (Airflow / Dagster)

# Airflow DAG for medallion pipeline
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from datetime import datetime, timedelta

with DAG(
    dag_id="orders_medallion_pipeline",
    schedule="@hourly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:

    ingest_bronze = DatabricksRunNowOperator(
        task_id="ingest_bronze_orders",
        job_id="{{ var.value.bronze_orders_job_id }}",
    )

    quality_check = DatabricksRunNowOperator(
        task_id="validate_bronze_schema",
        job_id="{{ var.value.dq_check_job_id }}",
    )

    silver_transform = DatabricksRunNowOperator(
        task_id="transform_silver_orders",
        job_id="{{ var.value.silver_orders_job_id }}",
    )

    gold_aggregate = DatabricksRunNowOperator(
        task_id="aggregate_gold_orders_hourly",
        job_id="{{ var.value.gold_orders_agg_job_id }}",
    )

    ingest_bronze >> quality_check >> silver_transform >> gold_aggregate

13. MLOps on Lakehouse

The lakehouse is an excellent ML platform substrate because it gives you versioned, reproducible datasets, ACID-consistent feature tables, and efficient large-scale data export — without a separate data warehouse and object store copy.

Feature Store on Delta / Iceberg

# Feature table: computed once, served to many models
# Key requirements: point-in-time correctness, efficient range reads

# Create feature table (Delta)
spark.sql("""
    CREATE TABLE IF NOT EXISTS feature_store.customer_features (
        customer_id       BIGINT      NOT NULL,
        feature_date      DATE        NOT NULL,          -- point-in-time key
        order_count_30d   INT,
        revenue_90d       DECIMAL(14,2),
        days_since_last   INT,
        churn_risk_score  FLOAT,
        feature_version   INT         DEFAULT 1          -- track feature iteration
    )
    USING DELTA
    PARTITIONED BY (feature_date)
    TBLPROPERTIES (
        'delta.enableChangeDataFeed'           = 'true',
        'delta.autoOptimize.optimizeWrite'     = 'true',
        'delta.feature.allowColumnDefaults'    = 'supported'  -- required for DEFAULT
    )
""")

# Point-in-time join: join event labels with features as of label date
# Critical: use the feature value as-of the event date, not today's value
labeled_events = spark.table("gold.churn_labels")  # customer_id, label_date, churned

training_df = (
    labeled_events.alias("e")
    .join(
        spark.table("feature_store.customer_features").alias("f"),
        on=(
            (F.col("e.customer_id") == F.col("f.customer_id")) &
            (F.col("f.feature_date") == F.col("e.label_date"))
            # Exact date match; to avoid leakage with daily features,
            # join on f.feature_date = date_sub(e.label_date, 1)
        ),
        how="left"
    )
)
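When feature dates and label dates do not align exactly, the point-in-time rule becomes an as-of lookup: take the latest feature row at or before the label date. A pure-Python sketch of that rule:

```python
import bisect

def as_of_feature(feature_dates_sorted, label_date):
    """Latest feature_date at or before label_date (None if none exists)."""
    i = bisect.bisect_right(feature_dates_sorted, label_date) - 1
    return feature_dates_sorted[i] if i >= 0 else None
```

In Spark this is typically expressed as a range join plus a row_number over feature_date descending, keeping rank 1 per (customer_id, label_date).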

Training Dataset Versioning

import mlflow
from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Register the exact Delta table version used for training
# This ensures reproducibility: re-run the experiment with same data later

with mlflow.start_run() as run:
    # Record which Delta version was used (latest history entry)
    delta_history = (
        DeltaTable.forName(spark, "feature_store.customer_features")
        .history(1)
        .select("version", "timestamp")
        .collect()[0]
    )

    mlflow.log_params({
        "training_data_table":   "feature_store.customer_features",
        "training_data_version": delta_history["version"],
        "training_data_ts":      str(delta_history["timestamp"]),
        "feature_date_range":    "2023-01-01 to 2024-01-01",
        "label_column":          "churned",
        "num_features":          len(feature_cols),
    })

    # Train model
    model = train_model(training_df, feature_cols, label_col="churned")
    mlflow.sklearn.log_model(model, artifact_path="model")

# Reproduce training data exactly in the future:
historical_features = (
    spark.read
    .format("delta")
    .option("versionAsOf", delta_history["version"])
    .table("feature_store.customer_features")
)

Inference Data Logging

# Log model predictions to Delta for monitoring and retraining
from pyspark.sql import functions as F
from pyspark.sql.types import (StructType, StructField, StringType,
                               LongType, FloatType, TimestampType)

prediction_schema = StructType([
    StructField("prediction_id",  StringType(),  False),
    StructField("model_version",  StringType(),  False),
    StructField("customer_id",    LongType(),    False),
    StructField("score",          FloatType(),   False),
    StructField("predicted_at",   TimestampType(), False),
    StructField("feature_vector", StringType(),  True),  # JSON-serialized
])

def log_predictions(predictions_df: "pd.DataFrame", model_version: str):
    # Derive the partition column first: partitionBy takes column names,
    # not expressions
    spark_df = (
        spark.createDataFrame(predictions_df, prediction_schema)
        .withColumn("prediction_date", F.to_date("predicted_at"))
    )
    (
        spark_df.write
        .format("delta")
        .mode("append")
        .partitionBy("prediction_date")
        .saveAsTable("ml_monitoring.predictions")
    )

# Monitor for drift: compare prediction distribution today vs baseline
today_dist = spark.sql("""
    SELECT AVG(score) as mean_score, STDDEV(score) as std_score,
           PERCENTILE(score, 0.5) as p50, PERCENTILE(score, 0.95) as p95
    FROM ml_monitoring.predictions
    WHERE prediction_date = CURRENT_DATE()
      AND model_version = 'v3.2'
""")

baseline_dist = spark.sql("""
    SELECT AVG(score), STDDEV(score), PERCENTILE(score, 0.5), PERCENTILE(score, 0.95)
    FROM ml_monitoring.predictions
    WHERE prediction_date BETWEEN '2024-01-01' AND '2024-01-07'
      AND model_version = 'v3.2'
""")
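One simple way to turn the two distributions above into an alert is a standard-error z-score on the shift in mean score (function name and any alert threshold are illustrative):

```python
import math

def mean_shift_zscore(baseline_mean, baseline_std, current_mean, n_current):
    """How many standard errors today's mean score moved from baseline."""
    return abs(current_mean - baseline_mean) / (baseline_std / math.sqrt(n_current))
```

A common convention is to page when the z-score exceeds 3; heavier-tailed score distributions warrant a population-stability-index check instead.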

A/B Test Data Pipelines

-- Store experiment assignments in Gold layer
CREATE TABLE gold.experiment_assignments (
    user_id       BIGINT    NOT NULL,
    experiment    STRING    NOT NULL,
    variant       STRING    NOT NULL,   -- 'control' / 'treatment'
    assigned_at   TIMESTAMP NOT NULL,
    -- Delta cannot partition by an expression; use a generated column
    assigned_date DATE GENERATED ALWAYS AS (CAST(assigned_at AS DATE))
)
USING DELTA
PARTITIONED BY (assigned_date)
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');

-- Join experiment assignments with outcome metrics
SELECT
    ea.variant,
    COUNT(DISTINCT ea.user_id)               AS users,
    COUNT(o.order_id)                        AS conversions,
    COUNT(o.order_id) / COUNT(DISTINCT ea.user_id) AS cvr,
    SUM(o.total) / COUNT(DISTINCT ea.user_id) AS revenue_per_user
FROM gold.experiment_assignments ea
LEFT JOIN silver.orders o
    ON ea.user_id = o.customer_id
    AND o.order_ts >= ea.assigned_at   -- only post-assignment events
    AND o.order_ts < ea.assigned_at + INTERVAL 7 DAYS
WHERE ea.experiment = 'checkout_redesign_2024_02'
GROUP BY ea.variant;
Delta Change Data Feed for feature freshness
Enable CDF on feature tables and subscribe downstream services to the change stream. When a customer feature is recalculated, the feature serving layer receives only the changed rows instead of polling the full table. This enables sub-minute feature freshness at scale without a dedicated feature store product.
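On Databricks SQL, the change stream described above can be consumed with the table_changes function. A sketch (the start version 1024 is a placeholder for the consumer's last processed version):

```sql
SELECT *
FROM table_changes('feature_store.customer_features', 1024)
WHERE _change_type IN ('insert', 'update_postimage');
```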

14. Performance Tuning

File Sizing Strategy

Scenario | Recommendation | Why
Batch OLAP tables | 128 MB – 1 GB per file | Balances S3 LIST overhead vs task parallelism
Streaming micro-batch | Auto-compact enabled; target 128 MB | Streaming creates many small files; async compaction merges them
Lookup / point queries | Smaller files (32–64 MB) with bloom filters | Reduces I/O when reading one record from a large table
Full table scan / ML training | Large files (256 MB – 1 GB) | Minimizes task launch overhead; maximizes sequential read throughput
Iceberg target file size | Set write.target-file-size-bytes = 134217728 | Applied at write time to avoid compaction backlog
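These targets imply a simple expected file count after compaction; a minimal helper (illustrative):

```python
import math

def target_file_count(table_bytes: int, target_file_bytes: int = 128 * 1024**2) -> int:
    """File count a compaction run should roughly produce at the target size."""
    return max(1, math.ceil(table_bytes / target_file_bytes))
```

A streaming table whose actual file count is 10x this number is overdue for OPTIMIZE / rewrite_data_files.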

Data Skipping: Min/Max Statistics

# Delta: statistics are collected per file at write time
# Default: collect stats on first 32 columns
# Increase if filtering on later columns:
spark.sql("""
    ALTER TABLE orders SET TBLPROPERTIES (
        'delta.dataSkippingNumIndexedCols' = '64'
    )
""")

# Check if data skipping is working (Spark plan shows "PartitionFilters" and "DataFilters")
spark.sql("EXPLAIN COST SELECT * FROM orders WHERE order_date = '2024-02-14'")
# Look for: "PushedFilters: [EqualTo(order_date,2024-02-14)]"
# and numFiles reduction in query metrics

# Force stats recomputation after bulk ingestion without stats
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR ALL COLUMNS")
# Or for specific columns:
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS order_id, customer_id, order_date")
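The file-level skipping decision itself is just interval overlap between each file's min/max statistics and the query predicate. A pure-Python sketch:

```python
def files_to_scan(file_stats, lo, hi):
    """file_stats: {file_name: (min_val, max_val)}; keep files whose
    [min, max] range can overlap the predicate range [lo, hi]."""
    return sorted(
        name for name, (mn, mx) in file_stats.items()
        if not (mx < lo or mn > hi)
    )
```

This is also why Z-ORDER helps: clustering narrows each file's min/max range, so more files fail the overlap test and get skipped.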

Predicate Pushdown and Column Pruning

# Iceberg: predicate pushdown via scan API
scan = table.scan(
    row_filter="order_date >= '2024-02-01' AND status = 'COMPLETED'",
    selected_fields=("order_id", "customer_id", "total"),
    # These predicates are pushed to the manifest layer:
    # 1. Partition pruning: skip manifests whose partition range doesn't overlap
    # 2. File-level: skip files where min(order_date) > '2024-02-01'
    # 3. Parquet row-group: skip row groups based on column stats
)

# Verify with explain
df = spark.table("my_catalog.db.orders") \
    .filter("order_date >= '2024-02-01' AND status = 'COMPLETED'") \
    .select("order_id", "customer_id", "total")
df.explain("formatted")
# Look for: SelectedFiles in scan metrics, pushed predicates in plan

Caching Strategies

# Spark cache for repeated use within a job
# Cache Delta table (in-memory + disk)
spark.sql("CACHE TABLE orders")
# or
df = spark.table("orders")
df.cache()
df.count()  # trigger materialize

# Databricks Delta cache (off-heap, SSD-backed, survives Spark restarts)
# Enabled by default on Databricks; configure node type with SSD
spark.sql("CACHE SELECT * FROM orders WHERE order_date >= '2024-01-01'")

# Iceberg: use Arrow for in-process cache
import pyarrow as pa
cached = table.scan(row_filter="order_date = '2024-02-14'").to_arrow()
# Arrow table lives in process memory; serialize with pickle or feather for reuse

# Best practice: don't cache the raw lake tables; cache Gold/aggregated tables
# Raw tables are large and change frequently — caching them is expensive

Vectorized Execution (Photon / Velox)

# Photon (Databricks): C++ vectorized engine, transparent to users
# Enable on cluster: set "Runtime" to Photon-enabled type
# Supported operations: scans, joins, aggregations, sorts, DML on Delta tables

# Velox (Meta's open-source vectorized engine): used by Presto/Trino
# No explicit user configuration; engine uses it internally

# Practical tips for vectorized engine performance:
# 1. Avoid Python UDFs — they break vectorization; use Spark SQL functions instead
# 2. Use columnar formats (Parquet/ORC) — vectorized engines read columns in batches
# 3. Prefer wide, denormalized tables over many joins; fewer joins means less row-level work
# 4. Enable adaptive query execution (AQE):
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

15. Migration & Interoperability

Migrating Hive Tables to Iceberg

# Option 1: In-place migration (no data copy)
# Converts Hive table metadata to Iceberg format; data files stay in place
spark.sql("""
    CALL my_catalog.system.migrate(
        table => 'my_hive_catalog.db.orders'
    )
""")
# Note: Hive table must be external (not managed) with Parquet/ORC files

# Option 2: Snapshot migration (dual-read during transition)
# Creates an Iceberg table that reads the existing Hive data files
spark.sql("""
    CALL my_catalog.system.snapshot(
        source_table => 'my_hive_catalog.db.orders',
        table        => 'my_iceberg_catalog.db.orders'
    )
""")
# Iceberg table shares files with Hive; new writes go to Iceberg only
# Hive table still readable (no dual write needed during transition)
# After validation, cut over by pointing all readers to Iceberg table

# Option 3: CTAS (safest, requires double storage temporarily)
spark.sql("""
    CREATE TABLE my_iceberg_catalog.db.orders_iceberg
    USING iceberg
    AS SELECT * FROM my_hive_catalog.db.orders_hive
""")

Delta to Iceberg (UniForm)

Delta UniForm enables reading Delta tables as Iceberg or Hudi tables without any data copy. The Delta write path generates Iceberg metadata alongside the Delta log, so Trino/Snowflake/etc. can read the table via the Iceberg REST catalog.

-- Enable UniForm on a Delta table (Databricks Delta 3.x+)
ALTER TABLE orders SET TBLPROPERTIES (
    'delta.universalFormat.enabledFormats' = 'iceberg'
    -- 'delta.universalFormat.enabledFormats' = 'iceberg,hudi'  -- both
);

-- UniForm generates Iceberg metadata at:
-- s3://bucket/tables/orders/metadata/*.metadata.json
-- Register this with your Iceberg REST catalog, Snowflake, etc.

# Convert Delta table to standalone Iceberg (data copy required):
# Step 1: Export as Parquet snapshot
spark.sql("""
    CREATE TABLE orders_parquet USING PARQUET AS
    SELECT * FROM orders
""")
# Step 2: Register the Parquet files into a pre-created Iceberg table
spark.sql("""
    CALL my_iceberg_catalog.system.add_files(
        table => 'db.orders_iceberg',
        source_table => 'my_spark_catalog.db.orders_parquet'
    )
""")

Multi-Engine Access

# Iceberg table readable by multiple engines simultaneously

# Spark (primary writer + reader)
spark.table("my_catalog.db.orders").show()

# Trino (ad-hoc SQL analytics)
# trino.properties: connector.name=iceberg, iceberg.catalog.type=rest
# SELECT * FROM my_catalog.db.orders WHERE order_date = '2024-02-14'

# DuckDB (local laptop analytics)
# pip install duckdb pyiceberg
import duckdb
from pyiceberg.catalog import load_catalog

catalog = load_catalog("rest", uri="http://my-catalog:8181/api")
table = catalog.load_table("db.orders")
arrow_data = table.scan(row_filter="order_date = '2024-02-14'").to_arrow()

conn = duckdb.connect()
conn.register("orders", arrow_data)
result = conn.sql("SELECT customer_id, SUM(total) FROM orders GROUP BY 1 ORDER BY 2 DESC LIMIT 10").df()

# Flink + Iceberg (streaming writes from Flink)
# flink-conf.yaml / table environment:
# table_env.get_config().set("table.exec.mini-batch.enabled", "true")

# Flink SQL
"""
CREATE CATALOG iceberg_catalog WITH (
  'type'          = 'iceberg',
  'catalog-type'  = 'rest',
  'uri'           = 'http://nessie:19120/api/v1',
  'warehouse'     = 's3://my-bucket/warehouse'
);

CREATE TABLE iceberg_catalog.db.events (
  event_id  BIGINT,
  user_id   BIGINT,
  event_ts  TIMESTAMP(3),
  payload   STRING,
  WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND
) WITH (
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
);

INSERT INTO iceberg_catalog.db.events
SELECT event_id, user_id, event_ts, payload
FROM kafka_source;
"""

Vendor Lock-in Considerations

| Risk Area | Delta Lake | Iceberg | Hudi |
|---|---|---|---|
| Format lock-in | Low (open spec, but Databricks leads) | Very low (Apache-governed, many implementations) | Low (Apache-governed) |
| Catalog lock-in | Medium (Unity Catalog is Databricks-only) | Low (multiple REST catalog impls) | Medium (typically HMS or Glue) |
| Feature lock-in | Medium (Photon, AutoOptimize are Databricks-only) | Low | Low |
| Ecosystem maturity | Excellent on Databricks; good on EMR/OSS | Excellent everywhere | Good on Spark; limited elsewhere |
| Escape hatch | UniForm → Iceberg; export to Parquet | Export to Parquet trivially | Export to Parquet; convert to Iceberg |

16. Interview Scenarios & Decision Framework

Decision Framework: Delta vs Iceberg vs Hudi

| Question | Delta | Iceberg | Hudi |
|---|---|---|---|
| Already on Databricks? | Strong yes | Via UniForm | No |
| Need multi-engine reads (Trino, Snowflake, Flink)? | Possible (UniForm) | Best choice | Possible |
| Near-real-time CDC upserts (< 1 min latency)? | Possible | Possible | Best choice |
| Need partition evolution without data rewrite? | No | Best choice | No |
| Large enterprise, need governance + lineage? | Unity Catalog | Polaris/Nessie + external | HMS + external |
| AWS-native stack (Glue, Athena, EMR)? | Good (native EMR support) | Excellent (native Athena/Glue) | Good |
| Google Cloud stack? | Databricks on GCP | BigLake (Iceberg-native) | Less common |
| Azure stack? | Excellent (Azure Databricks + ADLS) | Good (via Fabric/Synapse) | Limited |
| Long-term open format strategy? | Medium (Databricks controls roadmap) | Best (Apache foundation, many vendors) | Good (Apache foundation) |

Common Interview Questions and Answers

Q: Explain ACID transactions in a lakehouse — how does Delta Lake achieve atomicity?

Object storage (S3/GCS) provides no atomicity at the directory level — you can't atomically add 10 new Parquet files and remove 5 old ones. Delta Lake solves this with an append-only transaction log (_delta_log/).

When a write completes, Delta atomically writes a single JSON entry to the log that describes all add and remove actions. Readers determine the current table state by replaying the log from the latest checkpoint. Because the log entry is a single object write (atomic in S3), either the entire commit happens or nothing changes from readers' perspective.

Isolation is achieved via optimistic concurrency control: writers read the current log version, compute their changes, then attempt to write the next version number. If another writer committed in the meantime (version conflict), the writer retries or raises a conflict exception depending on the isolation level (Serializable vs WriteSerializable).
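
A toy Python model of this commit loop (not Delta's actual implementation; real writers also re-validate the conflicting commit's actions before retrying):

```python
class TransactionLog:
    """Stand-in for _delta_log/: an append-only map of version -> actions."""
    def __init__(self):
        self.entries = {}

    def put_if_absent(self, version, actions):
        # Atomic in real life: a conditional PUT on the object store,
        # or a coordination service (e.g. a lock table) where needed
        if version in self.entries:
            return False
        self.entries[version] = actions
        return True

def commit(log, actions, max_retries=3):
    for _ in range(max_retries):
        next_version = len(log.entries)        # read current table version
        if log.put_if_absent(next_version, actions):
            return next_version                # entire commit = one atomic write
        # Lost the race: another writer took this version; re-check and retry
    raise RuntimeError("too many concurrent-write conflicts")

log = TransactionLog()
v0 = commit(log, ["add part-0001.parquet"])
v1 = commit(log, ["add part-0002.parquet", "remove part-0001.parquet"])
```

Either `put_if_absent` succeeds and the whole commit is visible, or it fails and nothing changed: that is the atomicity argument in miniature.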

Q: Why does Iceberg handle partition evolution better than Delta Lake?

Delta Lake partitioning is a physical property of the directory structure: files are written to partition_col=value/ paths. Changing the partition column means the existing directory structure is wrong and all files must be rewritten.

Iceberg treats partitioning as metadata, not directory structure. Each manifest file records which partition spec was used when writing its data files. When you execute ALTER TABLE REPLACE PARTITION FIELD, Iceberg creates a new partition spec but the old manifests (and their files) remain valid and readable under the old spec.

Queries work transparently across both specs: Iceberg evaluates the partition predicate against each spec separately, pruning manifests from both. This is possible because Iceberg's hidden partitioning decouples the physical layout from the logical column — the partition value is derived (e.g., days(event_ts)), not stored as a directory name in the user's visible schema.
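
A sketch of what that looks like in Spark SQL (placeholder catalog/table names; syntax per Iceberg's Spark extensions):

```sql
-- Daily hidden partitioning on a timestamp column
CREATE TABLE cat.db.events (id BIGINT, event_ts TIMESTAMP)
USING iceberg
PARTITIONED BY (days(event_ts));

-- Coarsen to monthly later: metadata-only, no files rewritten.
-- Old manifests keep the daily spec; new writes use the monthly spec.
ALTER TABLE cat.db.events
REPLACE PARTITION FIELD days(event_ts) WITH months(event_ts);
```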

Q: You have a Delta table with 10M small files from a year of streaming writes. How do you fix this?

Root cause: Streaming micro-batches write one or more small files per partition per trigger interval. With 5-minute triggers and date partitioning, that is 288 commits/day (105,120/year); with tens of output files per commit (one per writing task), total file counts reach the millions within a year.

Immediate fix:

-- Run partition-scoped OPTIMIZE to avoid scanning unneeded partitions
-- Start with recent partitions first (most impactful for queries)
OPTIMIZE orders WHERE order_date >= '2024-01-01';
-- Z-ORDER if you have common filter columns
OPTIMIZE orders ZORDER BY (customer_id) WHERE order_date >= '2024-01-01';

-- Then vacuum old files (after OPTIMIZE, old small files are "removed" in log)
VACUUM orders RETAIN 168 HOURS;

Prevention going forward:

  • Enable delta.autoOptimize.autoCompact = true on the streaming target table
  • Or schedule a nightly OPTIMIZE job scoped to yesterday's partition
  • Use trigger(availableNow=True) instead of micro-batch for non-latency-critical pipelines
  • Consider coarser partitioning (weekly instead of daily) if query patterns allow
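
The first two bullets map to Delta table properties; a sketch (property names per the Databricks Delta docs, applied to the same orders table):

```sql
ALTER TABLE orders SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',  -- bin-pack output files at write time
    'delta.autoOptimize.autoCompact'   = 'true'   -- compact small files after commits
);
```
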
Q: How do you implement SCD Type 2 on a lakehouse at scale?

SCD Type 2 requires closing old rows and inserting new ones. The naive approach (full scan + merge) breaks at billion-row scale. Production approach:

  1. Detect changes: join incoming updates to the current snapshot using the business key. Use a hash of all tracked columns to detect changes without comparing every column.
  2. Partition the SCD table by hash bucket of the business key: this ensures the MERGE only scans relevant partitions. Don't partition by date — the "current" rows span all historical dates.
  3. MERGE into a partition-pruned window: include the hash bucket in the merge condition so Delta reads only affected files.
  4. Close old rows and insert new rows in two operations, or use a single MERGE with whenMatchedUpdate + whenNotMatchedInsert whose match condition includes is_current = true; rows closed on an earlier run (is_current = false) simply stop matching, so subsequent MERGEs never touch them.
-- Hash bucket partitioning for SCD Type 2
CREATE TABLE dim_customer (
    surrogate_key BIGINT GENERATED ALWAYS AS IDENTITY,
    customer_id   BIGINT NOT NULL,
    hash_bucket   INT    GENERATED ALWAYS AS (customer_id % 256),
    -- ... tracked attributes ...
    is_current    BOOLEAN NOT NULL DEFAULT true,
    effective_start TIMESTAMP NOT NULL,
    effective_end   TIMESTAMP
)
USING DELTA
PARTITIONED BY (hash_bucket);
Q: Describe the write protocol for concurrent writers in Iceberg (snapshot isolation).

Iceberg uses optimistic concurrency with snapshot isolation:

  1. Writer reads the current table metadata (snapshot N).
  2. Writer produces new data files and a new manifest.
  3. Writer constructs a new metadata file pointing to snapshot N+1 (which includes the new manifest + all existing manifests from snapshot N).
  4. Writer atomically swaps the catalog pointer from the old metadata file to the new one. Most catalogs implement this with a conditional PUT (if-match on the current metadata path) or CAS (compare-and-swap).
  5. If another writer committed first (the catalog pointer changed), the writer detects the conflict, reads the new current state, re-validates whether the conflict is critical (e.g., overlapping files), and either retries or raises an exception.

Key property: readers always see a consistent snapshot because they hold a reference to an immutable metadata file. New writes create new metadata files; they never modify existing ones. There are no exclusive file locks.
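
Those five steps collapse into a compare-and-swap loop over a single mutable pointer; a toy model (catalog class and file names are made up, not Iceberg's real code):

```python
class Catalog:
    """Holds the only mutable state: the pointer to the current metadata file."""
    def __init__(self, pointer):
        self.pointer = pointer

    def swap(self, expected, new):
        # Real catalogs implement this with a conditional PUT / CAS primitive
        if self.pointer != expected:
            return False
        self.pointer = new
        return True

def commit(catalog, build_metadata, max_retries=3):
    for _ in range(max_retries):
        base = catalog.pointer                 # 1. read current snapshot
        new_meta = build_metadata(base)        # 2-3. write files + new metadata file
        if catalog.swap(base, new_meta):       # 4. atomic pointer swap
            return new_meta
        # 5. conflict: another writer moved the pointer; re-read and retry
    raise RuntimeError("CommitFailedException")

cat = Catalog("v1.metadata.json")
result = commit(cat, lambda base: base.replace("v1", "v2"))
```

Readers that grabbed "v1.metadata.json" before the swap keep a fully consistent view, because that file is never modified, only superseded.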

Q: How would you design a feature store on top of a lakehouse for ML?

Key requirements: point-in-time correctness, low-latency batch serving, efficient incremental updates, versioned datasets for training reproducibility.

Architecture:

  • Offline store: Delta or Iceberg tables partitioned by feature_date. Computed daily or hourly by Spark jobs. Serves training data via point-in-time joins.
  • Online store: Redis or DynamoDB for <10ms serving. Populated by a Spark streaming job that reads the Delta CDF and upserts to Redis. Key = entity ID; value = latest feature vector.
  • Point-in-time join: join training labels (customer_id, label_date) to features using f.feature_date = l.label_date. Never join to feature_date = CURRENT_DATE() — this leaks future information into historical training data.
  • Versioning: record the Delta table version + feature_date range in MLflow. To reproduce training data, read with versionAsOf.
  • Feature groups: separate tables per entity type (customer, product, session). Join on demand for model-specific feature sets.
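
The point-in-time rule in the third bullet is the one that most often goes wrong; here is a minimal pure-Python model of the as-of lookup (feature values are invented):

```python
from bisect import bisect_right
from datetime import date

# Feature history per customer, sorted by feature_date (invented values)
features = {
    1: [(date(2024, 1, 1), 0.2), (date(2024, 1, 8), 0.5)],
    2: [(date(2024, 1, 1), 0.9)],
}

def as_of(customer_id, label_date):
    """Latest feature with feature_date <= label_date; never a future row."""
    rows = features[customer_id]
    idx = bisect_right([d for d, _ in rows], label_date) - 1
    return None if idx < 0 else rows[idx][1]

assert as_of(1, date(2024, 1, 10)) == 0.5  # sees the Jan 8 snapshot
assert as_of(1, date(2024, 1, 5)) == 0.2   # using Jan 8 here would leak the future
assert as_of(2, date(2023, 12, 31)) is None  # no features known yet
```

The SQL point-in-time join is the set-based version of this lookup: each label row pairs with the newest feature row at or before its label_date.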

Production Gotchas Summary

| Gotcha | Symptom | Fix |
|---|---|---|
| VACUUM too aggressive | Structured Streaming job fails on restart: "version not found" | Set retention ≥ 7 days; use RETAIN 168 HOURS |
| No partition pruning in MERGE | MERGE scans entire target table for every batch | Add partition column to merge condition |
| Iceberg identity partition on timestamp | One partition per unique timestamp → millions of partitions | Use days(ts) or hours(ts) transform |
| Missing precombine field (Hudi) | Silent data loss when batch has duplicate keys | Always set hoodie.datasource.write.precombine.field |
| Rename without column mapping (Delta) | Rename works in metadata but old Parquet files return null for renamed column | Enable column mapping mode before renaming |
| Future data leakage in point-in-time join | Model performs great in training, fails in production | Strict as-of join: f.feature_date = e.label_date or < e.label_date |
| Small files after streaming | Slow queries; thousands of file opens per task | Enable autoCompact or schedule nightly OPTIMIZE |
| Catalog pointer update race (Iceberg) | Concurrent writers: CommitFailedException | Expected behavior; add retry logic; use higher-level frameworks (Flink's exactly-once) |
| DML on non-partitioned Delta table | UPDATE/DELETE rewrites entire table | Always partition tables that receive DML; use deletion vectors (Delta 2.3+) |
| Iceberg snapshot accumulation | Metadata files grow without bound; expire_snapshots never called | Schedule expire_snapshots + remove_orphan_files daily |
Interview mental model for table formats
Think of open table formats as git for data: every write is a commit, every read sees a consistent snapshot, you can check out historical versions, and compaction is like garbage collecting old objects. The catalog is the .git directory pointer — it tells you where HEAD is. This analogy holds surprisingly well and helps reason through questions about concurrent writes, time travel, and metadata management.

17. Local Development Setup (macOS)

You don't need a cloud account or Databricks to experiment with lakehouse table formats. Everything below runs on a MacBook with Docker.

Delta Lake with PySpark (Simplest)

Install PySpark + delta-spark. Tables are just directories on your local filesystem.

# Create a venv and install
python3 -m venv lakehouse-lab && source lakehouse-lab/bin/activate
pip install pyspark==3.5.* delta-spark==3.2.* jupyterlab pandas

# Start Jupyter (optional)
jupyter lab
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Builder pattern — delta-spark auto-configures the Spark session
builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("delta-local")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Keep Spark UI off for local dev
    .config("spark.ui.enabled", "false")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Create a Delta table on local filesystem
data = spark.createDataFrame([
    (1, "alice", "2024-01-15", 120.50),
    (2, "bob",   "2024-01-16", 89.99),
    (3, "carol", "2024-02-01", 210.00),
], ["order_id", "customer", "order_date", "amount"])

data.write.format("delta").mode("overwrite").save("/tmp/delta-lab/orders")

# Read it back
orders = spark.read.format("delta").load("/tmp/delta-lab/orders")
orders.show()

# Time travel — check the log
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/tmp/delta-lab/orders")
dt.history().select("version", "timestamp", "operation").show(truncate=False)
Inspect the Delta log directly
ls /tmp/delta-lab/orders/_delta_log/ — you'll see JSON commit files (00000000000000000000.json, etc.) and eventually checkpoint Parquet files. Reading these helps build intuition for how Delta tracks changes.
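
You can also replay a commit by hand: each line of a commit file is one JSON action, and the live file set is the adds minus the removes. A self-contained sketch (the commit content here is hand-written and trimmed; real commits carry more fields, such as stats and commitInfo timestamps):

```python
import json

# A hand-written, trimmed-down commit in _delta_log line-JSON form
commit = "\n".join([
    json.dumps({"commitInfo": {"operation": "WRITE"}}),
    json.dumps({"add": {"path": "part-0001.snappy.parquet", "dataChange": True}}),
    json.dumps({"remove": {"path": "part-0000.snappy.parquet", "dataChange": True}}),
])

# Replaying add/remove actions yields the current set of live data files
live = set()
for line in commit.splitlines():
    action = json.loads(line)
    if "add" in action:
        live.add(action["add"]["path"])
    elif "remove" in action:
        live.discard(action["remove"]["path"])

print(sorted(live))  # ['part-0001.snappy.parquet']
```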

Delta Lake with DuckDB (No Spark, Fastest)

DuckDB reads Delta tables natively — perfect for quick local queries without waiting for Spark to start up.

pip install duckdb deltalake pandas
import duckdb

con = duckdb.connect()

# Load the delta extension (read support) and stage some local data.
# Note: DuckDB's delta extension is read-only; to WRITE Delta without
# Spark, use the deltalake Python package shown further down.
con.sql("""
    INSTALL delta; LOAD delta;
    CREATE TABLE local_orders AS SELECT * FROM read_csv_auto('orders.csv');
""")

# Or read existing Delta tables written by Spark
con.sql("""
    SELECT customer, SUM(amount) as total
    FROM delta_scan('/tmp/delta-lab/orders')
    GROUP BY customer
    ORDER BY total DESC
""").show()

# deltalake Python package — write Delta without Spark at all
from deltalake import DeltaTable, write_deltalake
import pandas as pd

df = pd.DataFrame({
    "order_id": [4, 5],
    "customer": ["dave", "eve"],
    "order_date": ["2024-02-10", "2024-02-11"],
    "amount": [55.00, 340.00]
})

# Write (creates or appends)
write_deltalake("/tmp/delta-lab/orders", df, mode="append")

# Read back
dt = DeltaTable("/tmp/delta-lab/orders")
dt.to_pandas()

Iceberg with PySpark + Local Catalog

pip install pyspark==3.5.*
# Iceberg JARs are pulled automatically by Spark package config below
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("iceberg-local")
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # Hadoop catalog — stores metadata in local filesystem (simplest for dev)
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-lab/warehouse")
    .config("spark.ui.enabled", "false")
    .getOrCreate()
)

# Create a namespaced table
spark.sql("CREATE NAMESPACE IF NOT EXISTS local.lab")
spark.sql("""
    CREATE TABLE IF NOT EXISTS local.lab.orders (
        order_id   INT,
        customer   STRING,
        order_date DATE,
        amount     DOUBLE
    )
    USING iceberg
    PARTITIONED BY (days(order_date))
""")

# Insert data
spark.sql("""
    INSERT INTO local.lab.orders VALUES
    (1, 'alice', DATE '2024-01-15', 120.50),
    (2, 'bob',   DATE '2024-01-16', 89.99),
    (3, 'carol', DATE '2024-02-01', 210.00)
""")

# Snapshots
spark.sql("SELECT * FROM local.lab.orders.snapshots").show(truncate=False)

# Partition evolution — no rewrite!
spark.sql("ALTER TABLE local.lab.orders REPLACE PARTITION FIELD days(order_date) WITH months(order_date)")

Iceberg with PyIceberg + DuckDB (No Spark)

pip install "pyiceberg[sql-sqlite,pyarrow]" duckdb pandas
from pyiceberg.catalog.sql import SqlCatalog
import pyarrow as pa

# SQLite-backed catalog — zero infrastructure
catalog = SqlCatalog(
    "local",
    **{
        "uri": "sqlite:////tmp/iceberg-lab/catalog.db",
        "warehouse": "file:///tmp/iceberg-lab/warehouse",
    }
)

catalog.create_namespace_if_not_exists("lab")

# Create table from PyArrow schema
schema = pa.schema([
    pa.field("order_id", pa.int32()),
    pa.field("customer", pa.string()),
    pa.field("order_date", pa.date32()),
    pa.field("amount", pa.float64()),
])

table = catalog.create_table_if_not_exists("lab.orders", schema=schema)

# Append data: cast to the table schema so Arrow's inferred types
# (e.g. int64 for Python ints) match the Iceberg schema exactly
from datetime import date

batch = pa.table({
    "order_id":   [1, 2, 3],
    "customer":   ["alice", "bob", "carol"],
    "order_date": [date(2024, 1, 15), date(2024, 1, 16), date(2024, 2, 1)],
    "amount":     [120.50, 89.99, 210.00],
}).cast(schema)
table.append(batch)

# Query with DuckDB
import duckdb
arrow_table = table.scan().to_arrow()
duckdb.sql("SELECT customer, amount FROM arrow_table ORDER BY amount DESC").show()

Full Local Stack with Docker Compose

This gives you an S3-compatible store (MinIO), a REST catalog (Nessie for Iceberg), and Spark — closer to what production looks like.

# docker-compose.yml
services:
  minio:
    image: minio/minio:latest
    ports:
      - "9000:9000"
      - "9001:9001"    # Console
    environment:
      MINIO_ROOT_USER: admin
      MINIO_ROOT_PASSWORD: password
    command: server /data --console-address ":9001"
    volumes:
      - minio-data:/data

  minio-init:
    image: minio/mc:latest
    depends_on: [minio]
    entrypoint: >
      /bin/sh -c "
        mc alias set local http://minio:9000 admin password;
        mc mb local/lakehouse --ignore-existing;
        exit 0;
      "

  nessie:
    image: ghcr.io/projectnessie/nessie:latest
    ports:
      - "19120:19120"

  spark:
    image: bitnami/spark:3.5
    ports:
      - "4040:4040"
      - "8888:8888"
    environment:
      - SPARK_MODE=master
    volumes:
      - ./notebooks:/home/notebooks
    depends_on: [minio, nessie]

volumes:
  minio-data:
# Start the stack
docker compose up -d

# Create a MinIO bucket (if minio-init didn't run)
docker compose exec minio mc alias set local http://localhost:9000 admin password
docker compose exec minio mc mb local/lakehouse
# Connect Spark to MinIO + Nessie (inside the Spark container or local PySpark)
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.jars.packages", ",".join([
        "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1",
        "org.apache.hadoop:hadoop-aws:3.3.4",
        "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.95.0",
    ]))
    # S3/MinIO
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
    .config("spark.hadoop.fs.s3a.access.key", "admin")
    .config("spark.hadoop.fs.s3a.secret.key", "password")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # Nessie catalog for Iceberg
    .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.nessie.catalog-impl",
            "org.apache.iceberg.nessie.NessieCatalog")
    .config("spark.sql.catalog.nessie.uri", "http://localhost:19120/api/v2")
    .config("spark.sql.catalog.nessie.warehouse", "s3a://lakehouse/iceberg")
    .config("spark.sql.catalog.nessie.ref", "main")
    .config("spark.sql.extensions", ",".join([
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "org.projectnessie.spark.extensions.NessieSparkSessionExtensions",
    ]))
    .getOrCreate()
)

spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.lab")
spark.sql("""
    CREATE TABLE nessie.lab.events (
        event_id   BIGINT,
        event_type STRING,
        event_ts   TIMESTAMP,
        payload    STRING
    ) USING iceberg
    PARTITIONED BY (days(event_ts))
""")

# Now you have Iceberg tables stored in MinIO with Nessie catalog — same
# architecture as production but all on your laptop

Which Local Setup to Use?

| Goal | Setup | Time to Start |
|---|---|---|
| Quick Delta read/write, no Spark overhead | DuckDB + deltalake Python | 30 seconds |
| Quick Iceberg read/write, no Spark overhead | PyIceberg + SQLite catalog | 30 seconds |
| Delta with full Spark API (MERGE, Z-ORDER) | PySpark + delta-spark | 2 minutes |
| Iceberg with full Spark SQL | PySpark + iceberg JARs | 2 minutes |
| Realistic prod simulation (S3 + catalog + Spark) | Docker Compose stack | 5 minutes |
Java required for PySpark
PySpark needs Java 11 or 17. On macOS: brew install openjdk@17 and ensure JAVA_HOME is set. DuckDB and PyIceberg do not require Java.

18. Production Setup

Production lakehouse deployments center around three decisions: where data lives (storage), how tables are discovered (catalog), and what runs the queries (compute). Delta and Iceberg have different natural fits depending on whether you use Databricks or build on AWS services directly.

Architecture Comparison

| Component | Databricks (Managed) | AWS-Native (No Databricks) |
|---|---|---|
| Storage | S3 / ADLS / GCS (you own the buckets) | S3 (you own the buckets) |
| Table Format | Delta (native) — Iceberg readable via UniForm | Iceberg (best native support) or Delta |
| Catalog | Unity Catalog | AWS Glue Data Catalog (most common); Nessie / Polaris (Iceberg-specific) |
| Compute — ETL | Databricks Jobs (Spark clusters) | EMR Spark / Glue ETL / Flink on EMR |
| Compute — SQL BI | Databricks SQL Warehouses | Athena (serverless) / Trino on EMR / Redshift Spectrum |
| Orchestration | Databricks Workflows | Airflow (MWAA) / Step Functions |
| Governance | Unity Catalog (RBAC, lineage, masking) | Lake Formation + Glue Catalog (IAM-based) |
| Cost Model | DBU (Databricks Units) + cloud infra | Per-query (Athena) / per-instance (EMR) + S3 |
| Complexity | Lower — single platform, managed | Higher — more services to wire together |

Databricks + Delta Lake

This is the simplest production path. Databricks is Delta's primary maintainer, so Delta features land in Databricks first (liquid clustering, deletion vectors, UniForm). Unity Catalog handles governance end-to-end.

# Typical Databricks production pipeline (runs on a Databricks Job cluster)

# ── Bronze: ingest raw events from Kafka to Delta ──
raw_events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "user-events")
    .option("startingOffsets", "earliest")
    .load()
)

(raw_events
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://prod-lakehouse/checkpoints/bronze_events")
    .trigger(availableNow=True)  # Process all available, then stop (cost-effective)
    .toTable("prod_catalog.bronze.user_events"))

# ── Silver: clean + deduplicate ──
spark.sql("""
    CREATE OR REPLACE TABLE prod_catalog.silver.user_events AS
    SELECT
        from_json(value, 'user_id STRING, event_type STRING, properties MAP<STRING,STRING>, ts TIMESTAMP') AS parsed,
        timestamp AS kafka_ts
    FROM prod_catalog.bronze.user_events
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY parsed.user_id, parsed.event_type, parsed.ts
        ORDER BY kafka_ts DESC
    ) = 1
""")

# ── Gold: aggregate for BI ──
spark.sql("""
    CREATE OR REPLACE TABLE prod_catalog.gold.daily_active_users AS
    SELECT
        DATE(parsed.ts) AS activity_date,
        COUNT(DISTINCT parsed.user_id) AS dau,
        COUNT(*) AS total_events
    FROM prod_catalog.silver.user_events
    GROUP BY DATE(parsed.ts)
""")

Databricks Unity Catalog setup

-- 1. Hierarchy: metastore > catalog > schema > table
CREATE CATALOG IF NOT EXISTS prod_catalog;
CREATE SCHEMA IF NOT EXISTS prod_catalog.bronze;
CREATE SCHEMA IF NOT EXISTS prod_catalog.silver;
CREATE SCHEMA IF NOT EXISTS prod_catalog.gold;

-- 2. External location (maps S3 path to a storage credential)
CREATE EXTERNAL LOCATION IF NOT EXISTS prod_lakehouse
    URL 's3://prod-lakehouse/'
    WITH (STORAGE CREDENTIAL aws_prod_credential);

-- 3. Managed tables (Databricks controls the file layout)
CREATE TABLE prod_catalog.silver.orders (
    order_id   BIGINT,
    customer_id BIGINT,
    amount     DECIMAL(12,2),
    order_date DATE
)
USING DELTA
CLUSTER BY (customer_id);  -- Liquid clustering (Delta 3.0+), replaces PARTITIONED BY + Z-ORDER

-- 4. Access control
GRANT USE CATALOG ON CATALOG prod_catalog TO `data-team`;
GRANT USE SCHEMA  ON SCHEMA  prod_catalog.gold TO `analysts`;
GRANT SELECT      ON TABLE   prod_catalog.gold.daily_active_users TO `analysts`;

-- 5. Enable Iceberg reads via UniForm (so Trino/Athena can read this Delta table)
ALTER TABLE prod_catalog.silver.orders
SET TBLPROPERTIES ('delta.universalFormat.enabledFormats' = 'iceberg');

Databricks table maintenance (scheduled via Workflows)

-- Run nightly on high-write tables
OPTIMIZE prod_catalog.silver.orders;

-- Clean up old files (default retention = 7 days)
VACUUM prod_catalog.silver.orders;

-- Check table health
DESCRIBE DETAIL prod_catalog.silver.orders;
-- Shows: numFiles, sizeInBytes, numPartitions, clusteringColumns
Databricks + Iceberg
If your organization needs Iceberg (for multi-engine access beyond Databricks), you have two paths: (1) Write Delta with UniForm enabled — Databricks auto-generates Iceberg metadata alongside Delta, making the same table readable by both Delta and Iceberg clients. (2) Use Iceberg directly on Databricks — supported since Databricks Runtime 13.3 LTS via USING iceberg. Unity Catalog can manage Iceberg tables natively. However, some Delta-specific features (liquid clustering, deletion vectors) are only available on Delta tables.
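
A minimal sketch of path (2), assuming DBR 13.3 LTS or later and a Unity Catalog schema named as in the examples above:

```sql
CREATE TABLE prod_catalog.silver.events_iceberg (
    event_id BIGINT,
    event_ts TIMESTAMP
)
USING iceberg;
```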

AWS-Native + Iceberg (No Databricks)

Iceberg has the broadest native support across AWS services. Athena, EMR, Glue ETL, and Redshift Spectrum all read/write Iceberg tables registered in the Glue Data Catalog.

Architecture diagram

┌─────────────────────────────────────────────────────────────────────┐
│                         AWS Account                                 │
│                                                                     │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────────────┐  │
│  │   Airflow     │───▶│  EMR Spark   │───▶│   S3 (Iceberg data) │  │
│  │   (MWAA)      │    │  (ETL jobs)  │    │   s3://prod/tables/  │  │
│  └──────────────┘    └──────────────┘    └──────────────────────┘  │
│         │                    │                       ▲              │
│         │                    ▼                       │              │
│         │            ┌──────────────┐                │              │
│         │            │  Glue Data   │                │              │
│         │            │  Catalog     │◀───────────────┘              │
│         │            └──────────────┘   (metadata)                  │
│         │                    ▲                                      │
│         │                    │                                      │
│         ▼                    │                                      │
│  ┌──────────────┐    ┌──────────────┐                              │
│  │  Glue ETL    │    │   Athena     │   ← Serverless SQL (analysts)│
│  │  (crawlers)  │    │   (BI/SQL)   │                              │
│  └──────────────┘    └──────────────┘                              │
│                                                                     │
│  ┌──────────────────────────────────────┐                          │
│  │  Lake Formation (governance, RBAC)   │                          │
│  └──────────────────────────────────────┘                          │
└─────────────────────────────────────────────────────────────────────┘

Step 1 — S3 bucket layout

# Recommended bucket structure
s3://prod-lakehouse/
├── tables/
│   ├── bronze/
│   │   └── user_events/          # Iceberg table (data + metadata)
│   ├── silver/
│   │   └── user_events_clean/
│   └── gold/
│       └── daily_active_users/
├── checkpoints/                   # Spark structured streaming checkpoints
└── temp/                          # Staging area for ETL jobs

Step 2 — Create Iceberg tables via Athena (Glue Catalog)

-- Athena uses Glue Data Catalog by default — no extra catalog config needed
CREATE DATABASE IF NOT EXISTS bronze;
CREATE DATABASE IF NOT EXISTS silver;
CREATE DATABASE IF NOT EXISTS gold;

-- Iceberg table in Athena (V3 engine)
CREATE TABLE bronze.user_events (
    event_id   BIGINT,
    user_id    STRING,
    event_type STRING,
    properties MAP<STRING, STRING>,
    event_ts   TIMESTAMP
)
PARTITIONED BY (days(event_ts))
LOCATION 's3://prod-lakehouse/tables/bronze/user_events/'
TBLPROPERTIES (
    'table_type'    = 'ICEBERG',
    'format'        = 'PARQUET',
    'write_compression' = 'zstd'
);

-- Insert data directly from Athena
INSERT INTO bronze.user_events
SELECT * FROM raw_events_staging;

-- Time travel query
SELECT * FROM bronze.user_events FOR TIMESTAMP AS OF
    TIMESTAMP '2024-02-01 00:00:00';

Step 3 — EMR Spark for heavy ETL

# EMR Spark job — reads/writes Iceberg via Glue Catalog
# Submit with: aws emr add-steps --cluster-id j-XXXXX --steps file://step.json

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("silver-etl")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # Glue Catalog as the Iceberg catalog
    .config("spark.sql.catalog.glue", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue.warehouse",
            "s3://prod-lakehouse/tables/")
    .config("spark.sql.catalog.glue.io-impl",
            "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)

# Bronze → Silver: clean and deduplicate
# (OSS Spark SQL has no QUALIFY clause, so rank in a subquery instead)
spark.sql("""
    CREATE OR REPLACE TABLE glue.silver.user_events_clean
    USING iceberg
    PARTITIONED BY (days(event_ts))
    AS
    SELECT event_id, user_id, event_type, properties, event_ts
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_ts) AS rn
        FROM glue.bronze.user_events
        WHERE user_id IS NOT NULL
          AND event_ts >= DATE '2024-01-01'
    )
    WHERE rn = 1
""")

# MERGE for incremental upsert (Iceberg supports full MERGE)
spark.sql("""
    MERGE INTO glue.silver.user_events_clean AS target
    USING (SELECT * FROM glue.bronze.user_events WHERE event_ts >= current_date - 1) AS source
    ON target.event_id = source.event_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

# Table maintenance
spark.sql("CALL glue.system.rewrite_data_files(table => 'silver.user_events_clean')")
spark.sql("CALL glue.system.expire_snapshots(table => 'silver.user_events_clean', older_than => TIMESTAMP '2024-02-01 00:00:00')")
spark.sql("CALL glue.system.remove_orphan_files(table => 'silver.user_events_clean')")

Step 4 — Glue ETL (serverless Spark, no cluster management)

# Glue 4.0+ supports Iceberg natively — enable it with the job parameter
#   --datalake-formats iceberg
# glue_job.py — deployed via aws glue create-job

import sys
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Glue auto-configures the Glue Catalog — tables registered in Glue are available directly
# Set Iceberg configs for write support
spark.conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.catalog-impl",
               "org.apache.iceberg.aws.glue.GlueCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.warehouse",
               "s3://prod-lakehouse/tables/")

# Gold layer aggregation
spark.sql("""
    CREATE OR REPLACE TABLE glue_catalog.gold.daily_active_users
    USING iceberg
    AS
    SELECT
        DATE(event_ts) AS activity_date,
        COUNT(DISTINCT user_id) AS dau,
        COUNT(*) AS total_events
    FROM glue_catalog.silver.user_events_clean
    GROUP BY DATE(event_ts)
""")

Step 5 — Lake Formation governance

# Grant analysts read access to gold tables via AWS CLI
aws lakeformation grant-permissions \
    --principal '{"DataLakePrincipalIdentifier": "arn:aws:iam::123456789:role/analysts"}' \
    --resource '{"Table": {"DatabaseName": "gold", "Name": "daily_active_users"}}' \
    --permissions '["SELECT"]'

# Column-level security — hide PII columns from analysts
aws lakeformation grant-permissions \
    --principal '{"DataLakePrincipalIdentifier": "arn:aws:iam::123456789:role/analysts"}' \
    --resource '{
        "TableWithColumns": {
            "DatabaseName": "silver",
            "Name": "user_events_clean",
            "ColumnNames": ["event_id", "event_type", "event_ts"]
        }
    }' \
    --permissions '["SELECT"]'
# user_id and properties columns are NOT listed → analysts cannot see them
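At query time, a column-level grant amounts to a projection: the engine strips every column not on the allow-list before results reach the principal. A plain-Python illustration (the column set mirrors the grant above):

```python
# Sketch: column-level security as projection — rows are trimmed to the
# columns the principal was granted. Mirrors the TableWithColumns grant above.
ANALYST_COLUMNS = {"event_id", "event_type", "event_ts"}

def project(rows, allowed=ANALYST_COLUMNS):
    """Drop every column not in the allow-list (user_id, properties vanish)."""
    return [{k: v for k, v in row.items() if k in allowed} for row in rows]
```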

AWS-Native + Delta Lake (No Databricks)

Delta works on AWS without Databricks, but with less native integration than Iceberg. On older EMR and Glue releases you provide the Delta JARs yourself; recent EMR releases bundle Delta as an optional application.

# EMR Spark with Delta (no Databricks)
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder
    .appName("delta-emr")
    .config("spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Glue Catalog as Hive Metastore replacement
    .config("spark.hadoop.hive.metastore.client.factory.class",
            "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
    .enableHiveSupport()
)
# configure_spark_with_delta_pip pulls in the Delta JARs for pip-installed
# delta-spark; on EMR releases that bundle Delta, builder.getOrCreate() suffices
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Delta tables registered in Glue Catalog
spark.sql("""
    CREATE TABLE IF NOT EXISTS bronze.orders (
        order_id   BIGINT,
        customer_id BIGINT,
        amount     DECIMAL(12,2),
        order_date DATE
    )
    USING DELTA
    PARTITIONED BY (order_date)
    LOCATION 's3://prod-lakehouse/tables/bronze/orders/'
""")

# MERGE, OPTIMIZE, VACUUM all work
spark.sql("""
    OPTIMIZE bronze.orders WHERE order_date >= current_date - 7
    ZORDER BY (customer_id)
""")
spark.sql("VACUUM bronze.orders RETAIN 168 HOURS")
Delta on AWS: catalog limitations

Delta registers in Glue Catalog either as a Hive-format external table backed by symlink manifests (INPUTFORMAT = 'SymlinkTextInputFormat') or as a standard Delta table on newer EMR releases. Athena engine version 3 can read Delta tables natively (read-only) once they are registered in Glue; on older engine versions you must run GENERATE symlink_format_manifest FOR TABLE ... after every write to keep Athena in sync. The manifest workflow is a notable operational overhead compared to Iceberg, where Athena both reads and writes natively with no manifest generation.
-- Generate manifest so Athena can read the Delta table
-- Must be re-run after every write!
GENERATE symlink_format_manifest FOR TABLE bronze.orders;

-- Then in Athena, create a table pointing to the manifest
CREATE EXTERNAL TABLE bronze.orders_athena (
    order_id   BIGINT,
    customer_id BIGINT,
    amount     DECIMAL(12,2)
)
PARTITIONED BY (order_date DATE)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
         OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://prod-lakehouse/tables/bronze/orders/_symlink_format_manifest/';

-- For partitioned tables, sync the manifest partitions into the catalog
MSCK REPAIR TABLE bronze.orders_athena;
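A symlink manifest is nothing exotic: plain text files, one absolute Parquet path per line, telling SymlinkTextInputFormat which data files make up the current table version. A local sketch of writing and reading one (the S3 paths in the example are illustrative):

```python
# Sketch: the manifest format behind SymlinkTextInputFormat — a text file
# listing the Parquet data files of the current Delta snapshot, one per line.
def write_manifest(path: str, data_files: list[str]) -> None:
    with open(path, "w") as f:
        f.write("\n".join(data_files) + "\n")

def read_manifest(path: str) -> list[str]:
    with open(path) as f:
        return [line.strip() for line in f if line.strip()]
```

Because the manifest is a frozen file list, any write that adds or removes Parquet files invalidates it — hence the "re-run after every write" requirement above.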

Catalog Decision Guide

| Scenario | Recommended Catalog | Why |
|---|---|---|
| Databricks shop, Delta tables | Unity Catalog | Tightest integration, RBAC + lineage built-in, liquid clustering support |
| AWS-native, Iceberg tables | AWS Glue Data Catalog | Zero infrastructure, Athena/EMR/Glue/Redshift all read it natively |
| Multi-cloud or vendor-neutral, Iceberg | Nessie or Apache Polaris | Open-source REST catalog, git-like branching (Nessie), works with Spark/Trino/Flink |
| AWS-native, Delta tables | Glue Catalog (Hive mode) | Works but limited — Athena reads Delta only on engine v3 (older engines need manifests), no Delta-specific governance |
| Need both Delta and Iceberg | Unity Catalog + UniForm | Write Delta (for Databricks features), expose as Iceberg for external engines |
| Self-hosted, budget-conscious | Hive Metastore (HMS) | Battle-tested, runs on a single Postgres/MySQL instance, broad engine support |

Compute Decision Guide

| Workload | Databricks | AWS-Native |
|---|---|---|
| Heavy ETL (TBs, complex joins) | Databricks Jobs (auto-scaling Spark) | EMR Spark (EC2 or EKS) |
| Ad-hoc SQL / BI dashboards | Databricks SQL Warehouse | Athena (serverless, pay per query) |
| Light ETL (GBs, simple transforms) | Databricks Jobs (small cluster) | Glue ETL (serverless Spark) |
| Streaming (Kafka → lakehouse) | Databricks Structured Streaming | EMR Flink or EMR Spark Streaming |
| Interactive exploration | Databricks Notebooks | EMR Studio / SageMaker Notebooks |
| Low-latency serving (sub-second) | Databricks SQL + caching | Trino on EMR (with Hive connector cache) |

Cost Comparison (Rough Ballpark)

These are approximate ranges (2025 pricing) for a mid-size workload.
Assume: 10 TB of data, 50 ETL jobs/day, 100 ad-hoc queries/day, 5 data engineers.

| Component | Databricks | AWS-Native |
|---|---|---|
| Compute (ETL) | ~$3–5K/mo (Jobs DBUs) | ~$2–4K/mo (EMR instances) |
| Compute (SQL/BI) | ~$2–4K/mo (SQL Warehouse DBUs) | ~$500–1.5K/mo (Athena per-query) |
| Storage (S3) | ~$230/mo (same S3) | ~$230/mo |
| Catalog | Included in Databricks | Glue Catalog: ~$1/mo per 100K objects |
| Orchestration | Included (Workflows) | ~$400–800/mo (MWAA) |
| Total estimate | ~$6–10K/mo | ~$3.5–7K/mo |
| Operational overhead | Lower (managed platform) | Higher (more knobs, more services) |
Decision shortcut
Already using or willing to pay for Databricks? → Delta + Unity Catalog. You get the most polished experience with the least operational work. Enable UniForm if external engines need to read your tables.

AWS-native, cost-sensitive, multi-engine? → Iceberg + Glue Catalog + Athena/EMR. Iceberg has first-class AWS support and avoids vendor lock-in. You'll spend more time on infrastructure but less on licensing.