Lakehouse & Table Formats Refresher
Delta Lake, Apache Iceberg, Apache Hudi — open table formats, lakehouse architecture, time travel, schema evolution, and production patterns for data engineering and MLOps
1. Lakehouse Architecture
The lakehouse combines the low-cost, scalable storage of a data lake with the ACID transactions, schema enforcement, and BI performance of a data warehouse. The key insight: separate compute from storage, then add a metadata/transaction layer on top of cheap object storage (S3, GCS, ADLS).
Evolution: Lake → Warehouse → Lakehouse
| Characteristic | Data Lake | Data Warehouse | Lakehouse |
|---|---|---|---|
| Storage | Object storage (S3/GCS) | Proprietary (Redshift, BQ) | Object storage (S3/GCS) |
| Format | Raw: CSV, JSON, Parquet | Proprietary columnar | Open: Parquet + table format |
| ACID transactions | No | Yes | Yes (via table format) |
| Schema enforcement | Optional / schema-on-read | Yes | Yes |
| Time travel | No (manual snapshots) | Limited / vendor-specific | Yes (first-class) |
| Streaming + batch | Separate systems | Batch only (typically) | Unified |
| Compute engines | Spark, Hive, Presto | Single vendor | Spark, Trino, Flink, DuckDB |
| Cost | Low storage, complex ops | High (compute + storage) | Low storage, pay-per-query |
| ML workloads | Good (raw data access) | Poor (no large file export) | Excellent (versioned datasets) |
| Vendor lock-in | Low | High | Low (open formats) |
Architecture Diagram
┌──────────────────────────────────────────────────────────────────────┐
│ LAKEHOUSE ARCHITECTURE │
├──────────────────────────────────────────────────────────────────────┤
│ INGESTION LAYER │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌────────────┐ │
│ │ Kafka / │ │ Batch ETL │ │ CDC (Debez-│ │ API / │ │
│ │ Kinesis │ │ (Spark/ │ │ ium/Flink) │ │ Files │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └─────┬──────┘ │
│ └────────────────┴─────────────────┴───────────────┘ │
│ │ │
├────────────────────────────────────┼──────────────────────────────── │
│ STORAGE LAYER (Object Storage) │ │
│ ┌─────────────────────────────────▼──────────────────────────────┐ │
│ │ S3 / GCS / ADLS │ │
│ │ ├── bronze/ (raw, append-only Parquet) │ │
│ │ ├── silver/ (cleaned, deduplicated, enriched) │ │
│ │ └── gold/ (aggregated, business-ready) │ │
│ └────────────────────────────────────────────────────────────────┘ │
├──────────────────────────────────────────────────────────────────────┤
│ TABLE FORMAT LAYER (ACID + Metadata) │
│ ┌────────────────┐ ┌──────────────────┐ ┌───────────────────────┐│
│ │ Delta Lake │ │ Apache Iceberg │ │ Apache Hudi ││
│ │ (_delta_log/) │ │ (metadata tree) │ │ (timeline/.hoodie/) ││
│ └────────────────┘ └──────────────────┘ └───────────────────────┘│
├──────────────────────────────────────────────────────────────────────┤
│ CATALOG LAYER (Metadata Discovery) │
│ Unity Catalog │ AWS Glue │ Hive Metastore │ Nessie │ REST Catalog │
├──────────────────────────────────────────────────────────────────────┤
│ COMPUTE LAYER (Multi-Engine Access) │
│ ┌─────────┐ ┌──────────┐ ┌───────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Spark │ │ Trino │ │ Flink │ │ DuckDB │ │ Athena │ │
│ └─────────┘ └──────────┘ └───────────┘ └──────────┘ └──────────┘ │
├──────────────────────────────────────────────────────────────────────┤
│ CONSUMPTION LAYER │
│ BI Tools │ ML Training │ Feature Stores │ Real-time APIs │
└──────────────────────────────────────────────────────────────────────┘
Key Properties of a Lakehouse
- ACID transactions — concurrent writers don't corrupt data; readers see consistent snapshots
- Schema enforcement and evolution — reject bad data at write time, evolve schema without rewrites
- BI support — columnar storage with predicate pushdown, statistics, and data skipping for fast analytics
- Decoupled storage and compute — scale each independently; multiple engines read the same data
- Openness — data stored in open formats (Parquet), not locked into a vendor's proprietary system
- End-to-end streaming — same table supports both batch inserts and streaming appends
- Versioning — time travel, rollback, and audit are first-class operations
2. Open Table Formats Overview
Plain Parquet files on object storage have no concept of transactions, versioning, or schema enforcement. Open table formats solve this by adding a metadata layer that tracks which files belong to a table, what their statistics are, and the history of operations performed.
What They Solve
- ACID on object storage — object stores lack atomic multi-file operations; table formats funnel every change through a single atomic metadata commit so readers always see a consistent snapshot
- Time travel — every write creates a new version; you can query any historical snapshot
- Schema evolution — add/rename/drop columns without rewriting all data files
- Partition evolution — change partitioning strategy without rewriting existing data
- Concurrent writers — optimistic concurrency control prevents lost updates
- Small file compaction — metadata layer enables safe, non-blocking compaction
- Data skipping — per-file min/max statistics stored in metadata enable predicate pushdown before reading files
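Data skipping is easy to picture with a toy sketch in plain Python (no real table-format API involved): given the per-file min/max stats the metadata layer stores, the planner drops every file whose value range cannot overlap the predicate, before any Parquet bytes are read.

```python
def prune_files(files, column, lo, hi):
    """Keep only files whose [min, max] range for `column`
    can overlap the predicate `lo <= column <= hi`."""
    kept = []
    for f in files:
        fmin, fmax = f["stats"][column]
        if fmax >= lo and fmin <= hi:   # ranges overlap -> must read
            kept.append(f["path"])
    return kept

files = [
    {"path": "part-0.parquet", "stats": {"id": (1, 50_000)}},
    {"path": "part-1.parquet", "stats": {"id": (50_001, 100_000)}},
    {"path": "part-2.parquet", "stats": {"id": (100_001, 150_000)}},
]

# Predicate id BETWEEN 60_000 AND 70_000 touches only the middle file.
print(prune_files(files, "id", 60_000, 70_000))  # ['part-1.parquet']
```

Min/max pruning is conservative: a file is kept whenever it *might* contain a match, which is why well-sorted data (clustering, Z-order) makes the stats far more selective.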
Delta Lake vs Iceberg vs Hudi — Feature Matrix
| Feature | Delta Lake | Apache Iceberg | Apache Hudi |
|---|---|---|---|
| Origin | Databricks (2019) | Netflix (2018, Apache 2020) | Uber (2017, Apache 2019) |
| Metadata format | JSON/Parquet transaction log | JSON + Avro manifest files | Avro timeline + index files |
| ACID transactions | Yes (OCC + write serialization) | Yes (snapshot isolation) | Yes (OCC) |
| Time travel | Yes (version + timestamp) | Yes (snapshot ID + timestamp) | Yes (commit timeline) |
| Schema evolution | Good (add/rename/drop) | Excellent (full evolution) | Good |
| Partition evolution | Limited (requires rewrite) | Excellent (hidden partitioning) | Limited |
| Upserts / MERGE | Yes (MERGE INTO) | Yes (MERGE INTO) | Excellent (native upsert engine) |
| Streaming writes | Yes (Structured Streaming) | Yes (Flink + Spark) | Yes (DeltaStreamer) |
| Multi-engine reads | Good (Spark, Trino, DuckDB) | Excellent (universal) | Good (Spark-primary) |
| Compaction | OPTIMIZE command | rewrite_data_files procedure | Built-in inline compaction |
| Z-ORDER / clustering | Yes (ZORDER BY) | Sort order + bin-packing | Clustering (Z-curve) |
| Row-level deletes | Yes (deletion vectors) | Yes (position/equality deletes) | Yes (MOR + index) |
| Primary ecosystem | Databricks, AWS EMR | AWS (Glue/Athena), Snowflake | Spark streaming, CDC pipelines |
| Catalog integration | Unity, Glue, Hive | REST, Nessie, Glue, Hive | Hive Metastore, Glue |
| Column stats / skipping | Yes (per-file min/max) | Yes (per-file + per-partition) | Yes (column stats index) |
| Bloom filters | Yes | Yes | Yes |
| DML support | UPDATE, DELETE, MERGE | UPDATE, DELETE, MERGE | INSERT, UPSERT, DELETE |
| Best for | Databricks shops, batch DWH | Multi-engine, long-term storage | Near-real-time CDC, upserts |
3. Delta Lake Deep Dive
Delta Lake stores data as Parquet files plus a transaction log (_delta_log/) in the same directory. Every write operation appends a JSON entry to the log. Every 10 commits, Delta checkpoints the log state into a Parquet file to bound replay time.
Transaction Log Structure
s3://my-bucket/tables/orders/
├── _delta_log/
│ ├── 00000000000000000000.json # initial schema + first files
│ ├── 00000000000000000001.json # add files from second write
│ ├── 00000000000000000002.json # remove files (DELETE/UPDATE)
│ ├── ...
│ ├── 00000000000000000009.json
│ ├── 00000000000000000010.checkpoint.parquet # checkpoint at v10
│ ├── 00000000000000000010.json
│ └── _last_checkpoint # points to latest checkpoint
├── part-00000-a1b2c3d4.snappy.parquet
├── part-00001-e5f6a7b8.snappy.parquet
└── ...
Log Entry Anatomy (JSON)
// A single commit JSON entry (00000000000000000001.json)
{
"commitInfo": {
"timestamp": 1707840000000,
"operation": "WRITE",
"operationParameters": { "mode": "Append", "partitionBy": "[\"date\"]" },
"readVersion": 0,
"isolationLevel": "Serializable"
}
}
{
"add": {
"path": "date=2024-02-14/part-00000-abc123.snappy.parquet",
"partitionValues": { "date": "2024-02-14" },
"size": 1048576,
"modificationTime": 1707840001000,
"dataChange": true,
"stats": "{\"numRecords\":50000,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":50000},\"nullCount\":{\"id\":0}}"
}
}
{
"remove": {
"path": "date=2024-02-13/part-00000-old123.snappy.parquet",
"deletionTimestamp": 1707840001000,
"dataChange": true
}
}
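The log-replay idea behind these add/remove entries can be sketched in a few lines of plain Python (a hypothetical model, not Delta's actual implementation): fold the actions in version order to reconstruct the live file set, and stop early to time travel.

```python
def replay(log_entries, as_of_version=None):
    """log_entries: list of (version, action, path) tuples.
    Returns the set of live data files at `as_of_version`
    (default: latest). Checkpoints exist to bound this replay."""
    live = set()
    for version, action, path in sorted(log_entries):
        if as_of_version is not None and version > as_of_version:
            break
        if action == "add":
            live.add(path)
        elif action == "remove":
            live.discard(path)
    return live

log = [
    (0, "add", "part-a.parquet"),
    (1, "add", "part-b.parquet"),
    (2, "remove", "part-a.parquet"),   # e.g. a DELETE rewrote part-a
    (2, "add", "part-c.parquet"),
]

print(sorted(replay(log)))                   # current table state
print(sorted(replay(log, as_of_version=1)))  # time travel to v1
```

This is exactly why `VACUUM` breaks time travel past the retention window: once the removed files are physically deleted, older versions can still be replayed from the log but their data files are gone.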
ACID Guarantees in Delta Lake
- Atomicity — a write either fully commits (log entry added) or nothing changes; partial writes leave orphan files that get cleaned up by VACUUM
- Consistency — schema validation runs before the commit; bad data is rejected
- Isolation — optimistic concurrency control; concurrent writers detect conflicts via log version checks; configurable isolation levels (Serializable, WriteSerializable)
- Durability — once the log entry is written to object storage, the transaction is durable
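The isolation bullet can be made concrete with a toy put-if-absent model (this sketch omits Delta's real conflict-resolution rules, which can often reconcile non-overlapping concurrent writes instead of failing them):

```python
class LogSim:
    """Toy transaction log: committing version N+1 succeeds only if
    the log is still at the version the writer read (put-if-absent)."""
    def __init__(self):
        self.commits = []                  # commits[i] = actions of version i

    def latest(self):
        return len(self.commits) - 1       # -1 means "empty table"

    def try_commit(self, read_version, actions):
        if self.latest() != read_version:  # someone committed since we read
            return False                   # caller must re-read and retry
        self.commits.append(actions)
        return True

log = LogSim()
log.try_commit(-1, ["add part-a"])                   # v0
print(log.try_commit(0, ["add part-b"]))             # writer 1 wins v1: True
print(log.try_commit(0, ["add part-c"]))             # writer 2 is stale: False
print(log.try_commit(log.latest(), ["add part-c"]))  # retry at v1: True
```

The object store's conditional-put (or the catalog's compare-and-swap) plays the role of `try_commit` in real deployments.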
PySpark + Delta Lake Examples
Setup
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("delta-demo")
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
)
Creating and Writing a Delta Table
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp
# Write as Delta (overwrite mode creates table if not exists)
(
df.write
.format("delta")
.mode("overwrite")
.partitionBy("event_date")
.save("s3://my-bucket/tables/events")
)
# Or using SQL DDL
spark.sql("""
CREATE TABLE IF NOT EXISTS events (
event_id BIGINT NOT NULL,
user_id BIGINT NOT NULL,
event_type STRING NOT NULL,
payload STRING,
event_date DATE NOT NULL
)
USING DELTA
PARTITIONED BY (event_date)
LOCATION 's3://my-bucket/tables/events'
TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
""")
Reading a Delta Table
# Current version (default)
df = spark.read.format("delta").load("s3://my-bucket/tables/events")
# Time travel by version
df_v5 = (
spark.read
.format("delta")
.option("versionAsOf", 5)
.load("s3://my-bucket/tables/events")
)
# Time travel by timestamp
df_yesterday = (
spark.read
.format("delta")
.option("timestampAsOf", "2024-02-13 00:00:00")
.load("s3://my-bucket/tables/events")
)
# Via catalog
df = spark.table("my_catalog.my_schema.events")
DML: UPDATE, DELETE, MERGE
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "s3://my-bucket/tables/events")
# UPDATE
dt.update(
condition="event_type = 'click'",
set={"event_type": "'CLICK'"}
)
# DELETE
dt.delete(condition="event_date < '2023-01-01'")
# MERGE (upsert)
(
dt.alias("target")
.merge(
source=updates_df.alias("source"),
condition="target.event_id = source.event_id"
)
.whenMatchedUpdate(set={
"event_type": "source.event_type",
"payload": "source.payload"
})
.whenNotMatchedInsertAll()
.execute()
)
Delta SQL
-- Describe history
DESCRIBE HISTORY events;
-- Time travel query
SELECT * FROM events VERSION AS OF 5;
SELECT * FROM events TIMESTAMP AS OF '2024-02-13';
-- Optimize and Z-ORDER
OPTIMIZE events ZORDER BY (user_id, event_type);
-- Vacuum (remove files older than retention threshold)
VACUUM events RETAIN 168 HOURS; -- 7 days
-- Table properties
ALTER TABLE events SET TBLPROPERTIES (
'delta.logRetentionDuration' = 'interval 30 days',
'delta.deletedFileRetentionDuration' = 'interval 7 days'
);
-- Restore to a previous version
RESTORE TABLE events TO VERSION AS OF 10;
RESTORE TABLE events TO TIMESTAMP AS OF '2024-02-01';
Delta Lake with Structured Streaming
# Write stream to Delta table
(
streaming_df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "s3://my-bucket/checkpoints/events")
.trigger(processingTime="1 minute") # micro-batch every minute
# .trigger(availableNow=True) # one-shot batch mode
.start("s3://my-bucket/tables/events")
)
# Read stream from Delta table (change data feed)
(
spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.load("s3://my-bucket/tables/events")
.writeStream
.format("console")
.start()
)
Change data feed must be enabled first: ALTER TABLE events SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true'). CDF records row-level changes (insert / update_preimage / update_postimage / delete) that downstream tables can consume incrementally, which is ideal for building silver-layer CDC pipelines without full re-scans.
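The CDF row types map onto downstream state mechanically; here is a plain-Python sketch (a hypothetical helper, not a Delta API) of applying a batch of change rows to a keyed table:

```python
def apply_changes(table, changes):
    """table: dict of key -> row. changes: (change_type, key, row) tuples.
    An update arrives as a preimage (remove old row) plus a postimage
    (write new row); inserts and deletes are direct."""
    for change_type, key, row in changes:
        if change_type in ("insert", "update_postimage"):
            table[key] = row
        elif change_type in ("delete", "update_preimage"):
            table.pop(key, None)
    return table

state = {1: {"status": "new"}}
feed = [
    ("update_preimage", 1, {"status": "new"}),
    ("update_postimage", 1, {"status": "paid"}),
    ("insert", 2, {"status": "new"}),
    ("delete", 2, {"status": "new"}),
]
print(apply_changes(state, feed))  # {1: {'status': 'paid'}}
```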
4. Apache Iceberg Deep Dive
Iceberg's distinguishing feature is its metadata tree: a catalog points to a metadata file, which points to a manifest list, which points to manifest files, which point to actual data files. This indirection enables atomic swaps, hidden partitioning, and full partition evolution without touching data files.
Metadata Tree Structure
CATALOG (e.g., Glue, Nessie, REST)
└── table pointer: s3://bucket/warehouse/db/orders/metadata/v3.metadata.json
METADATA FILE (v3.metadata.json)
├── table-uuid, format-version: 2
├── current-snapshot-id: 8342957621
├── partition-spec: [{ field: "order_date", transform: "day" }]
├── sort-order: [{ field: "customer_id", direction: "asc" }]
└── snapshots:
├── snapshot-id: 8342957620 (previous)
└── snapshot-id: 8342957621 (current)
└── manifest-list: s3://.../snap-8342957621-0.avro
MANIFEST LIST (snap-8342957621-0.avro)
├── manifest-file: s3://.../1234abcd-m0.avro (added files)
└── manifest-file: s3://.../5678efgh-m1.avro (existing files)
MANIFEST FILE (1234abcd-m0.avro)
├── data-file: s3://.../00000-5-abc.parquet
│ ├── partition: { order_date: "2024-02-14" }
│ ├── record_count: 75000
│ ├── file_size_in_bytes: 8388608
│ └── column_stats: [{ lower_bound, upper_bound, null_count }]
└── data-file: s3://.../00001-5-def.parquet
└── ...
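A hypothetical walk of that tree (simplified to a single level of manifest metadata) shows why planning can prune whole manifests, and therefore whole groups of data files, before touching any of them:

```python
# Toy metadata tree: one snapshot, two manifests, each carrying the
# partition range of the data files it tracks (as real manifests do).
metadata = {
    "current_snapshot": {
        "manifests": [
            {"partition_range": ("2024-02-01", "2024-02-09"),
             "files": ["a.parquet", "b.parquet"]},
            {"partition_range": ("2024-02-10", "2024-02-19"),
             "files": ["c.parquet"]},
        ]
    }
}

def plan_scan(meta, day):
    """Return data files to read for a one-day predicate, skipping
    any manifest whose partition range cannot contain that day."""
    files = []
    for m in meta["current_snapshot"]["manifests"]:
        lo, hi = m["partition_range"]
        if lo <= day <= hi:          # manifest-level pruning
            files.extend(m["files"])
    return files

print(plan_scan(metadata, "2024-02-14"))  # ['c.parquet']
```

Real planning then applies per-file column stats inside the surviving manifests, so large tables are pruned in two stages without listing the object store at all.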
Snapshot Isolation
- Each write creates a new snapshot with a unique ID and timestamp
- Readers always read from a consistent snapshot — they are never affected by concurrent writes
- Snapshot expiration (garbage collection) is explicit via the expire_snapshots procedure
- Snapshot operations include: append (add new files), overwrite (replace files), replace (compaction/rewrite)
Hidden Partitioning
Iceberg decouples the physical partition layout from the logical query interface. You define partition transforms on columns, and Iceberg automatically prunes partitions without requiring users to filter on partition columns explicitly.
-- Define table with hidden partitioning
CREATE TABLE orders (
order_id BIGINT,
customer_id BIGINT,
order_ts TIMESTAMP,
total DECIMAL(10,2),
status STRING
)
USING iceberg
PARTITIONED BY (days(order_ts), truncate(10, customer_id))
-- days(order_ts): partition by day derived from timestamp
-- truncate(10, customer_id): partition by customer_id truncated to the lower multiple of 10
-- Users query on order_ts directly; Iceberg prunes partitions automatically
;
-- Supported transforms:
-- years(ts), months(ts), days(ts), hours(ts)
-- bucket(N, col) -- hash bucket
-- truncate(N, col) -- first N chars / integer truncation
-- identity(col) -- classic partition column (not recommended for high cardinality)
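The transforms are just functions from a column value to a partition value; rough plain-Python sketches follow (Iceberg's real bucket transform uses a Murmur3 hash defined by the spec, not Python's hash):

```python
from datetime import datetime

def days_transform(ts: datetime) -> str:
    # days(ts): the partition value is the calendar date of the timestamp
    return ts.strftime("%Y-%m-%d")

def truncate_transform(width: int, value: int) -> int:
    # truncate(W, col): integers truncate to the lower multiple of W
    return value - (value % width)

def bucket_transform(n: int, value) -> int:
    # bucket(N, col): hash into N buckets (Murmur3 in real Iceberg)
    return hash(value) % n

print(days_transform(datetime(2024, 2, 14, 9, 30)))  # 2024-02-14
print(truncate_transform(10, 1234))                  # 1230
print(truncate_transform(10, -7))                    # -10 (floors toward -inf)
```

Because the transform is stored in table metadata, the engine applies it to predicate bounds at plan time; users filter on `order_ts` and never see the derived partition value.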
PySpark + Iceberg Examples
Setup (Spark + Iceberg)
spark = (
SparkSession.builder
.appName("iceberg-demo")
.config("spark.jars.packages",
"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0")
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.my_catalog",
"org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.my_catalog.type", "rest")
.config("spark.sql.catalog.my_catalog.uri", "http://nessie:19120/iceberg")  # Nessie's Iceberg REST endpoint
# Or for Glue:
# .config("spark.sql.catalog.my_catalog.type", "glue")
# .config("spark.sql.catalog.my_catalog.warehouse", "s3://my-bucket/warehouse")
.getOrCreate()
)
Creating and Writing
# Create table via SQL
spark.sql("""
CREATE TABLE my_catalog.db.orders (
order_id BIGINT NOT NULL,
customer_id BIGINT NOT NULL,
order_ts TIMESTAMP NOT NULL,
total DECIMAL(12,2),
status STRING
)
USING iceberg
PARTITIONED BY (days(order_ts))
LOCATION 's3://my-bucket/warehouse/db/orders'
TBLPROPERTIES (
'write.parquet.compression-codec' = 'zstd',
'write.metadata.compression-codec' = 'gzip',
'write.target-file-size-bytes' = '134217728' -- 128 MB
)
""")
# Write via DataFrame API
(
df.writeTo("my_catalog.db.orders")
.append()
)
# Overwrite by partition (dynamic overwrite)
(
df.writeTo("my_catalog.db.orders")
.overwritePartitions()
)
Time Travel and Snapshot Operations
# Query by snapshot ID
spark.read.option("snapshot-id", "8342957621") \
.table("my_catalog.db.orders")
# Query by timestamp
spark.read.option("as-of-timestamp", "1707840000000") \
.table("my_catalog.db.orders")
# Via SQL
spark.sql("SELECT * FROM my_catalog.db.orders FOR SYSTEM_VERSION AS OF 8342957621")
spark.sql("SELECT * FROM my_catalog.db.orders FOR SYSTEM_TIME AS OF '2024-02-14 12:00:00'")
# List snapshots
spark.sql("SELECT * FROM my_catalog.db.orders.snapshots").show()
# List history
spark.sql("SELECT * FROM my_catalog.db.orders.history").show()
# List manifests
spark.sql("SELECT * FROM my_catalog.db.orders.manifests").show()
# List data files (with stats)
spark.sql("SELECT file_path, file_size_in_bytes, record_count FROM my_catalog.db.orders.files").show()
Iceberg Procedures
# Expire old snapshots
spark.sql("""
CALL my_catalog.system.expire_snapshots(
table => 'db.orders',
older_than => TIMESTAMP '2024-01-01 00:00:00',
retain_last => 5
)
""")
# Compact small files (bin-pack)
spark.sql("""
CALL my_catalog.system.rewrite_data_files(
table => 'db.orders',
strategy => 'binpack',
options => map('target-file-size-bytes', '134217728',
'min-file-size-bytes', '67108864',
'max-file-size-bytes', '536870912')
)
""")
# Rewrite with sort order (Z-order equivalent)
spark.sql("""
CALL my_catalog.system.rewrite_data_files(
table => 'db.orders',
strategy => 'sort',
sort_order => 'customer_id ASC NULLS LAST, order_ts ASC'
)
""")
# Remove orphan files
spark.sql("""
CALL my_catalog.system.remove_orphan_files(
table => 'db.orders',
older_than => TIMESTAMP '2024-02-01 00:00:00'
)
""")
# Rollback to snapshot
spark.sql("""
CALL my_catalog.system.rollback_to_snapshot(
table => 'db.orders',
snapshot_id => 8342957620
)
""")
Iceberg MERGE INTO
-- Upsert with MERGE INTO (Iceberg)
MERGE INTO my_catalog.db.orders AS target
USING updates AS source
ON target.order_id = source.order_id
WHEN MATCHED AND source.status = 'CANCELLED' THEN DELETE
WHEN MATCHED THEN UPDATE SET
target.status = source.status,
target.total = source.total
WHEN NOT MATCHED THEN INSERT *;
5. Apache Hudi Overview
Hudi (Hadoop Upserts Deletes and Incrementals) was built at Uber specifically for near-real-time upsert-heavy workloads: CDC ingestion from databases, event deduplication, and incremental processing pipelines. Its unique contributions are a built-in index for fast upsert lookups and two storage types that trade read vs write amplification.
Copy-on-Write vs Merge-on-Read
| Aspect | Copy-on-Write (CoW) | Merge-on-Read (MoR) |
|---|---|---|
| Write behavior | Rewrite entire Parquet file on update | Append delta (log) files; base file untouched |
| Read behavior | Read base Parquet only (fast) | Merge base + log files on read (slower) |
| Write amplification | High (rewrites files) | Low (appends only) |
| Read amplification | None | Present until compaction |
| Compaction | Not needed (implicit) | Required (async or inline) |
| Storage overhead | None | Extra log files until compaction |
| Snapshot query | Latest base files | Latest base + log files |
| Read-optimized query | Same as snapshot | Latest compacted base only (stale by Δt) |
| Best for | Read-heavy, low-update-frequency tables | High-frequency upsert workloads, CDC |
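The trade-off rows above can be simulated in a few lines of plain Python (a toy model, not Hudi's file formats): both layouts converge to the same logical table, but CoW pays at write time and MoR pays at read time until compaction folds the log back in.

```python
class CoWFile:
    """Copy-on-write: every upsert rewrites the whole base file."""
    def __init__(self, rows):
        self.rows = dict(rows)
        self.rewrites = 0
    def upsert(self, key, row):
        merged = dict(self.rows)
        merged[key] = row
        self.rows = merged             # full file rewrite
        self.rewrites += 1
    def read(self):
        return self.rows

class MoRFile:
    """Merge-on-read: upserts append to a log; reads merge base + log."""
    def __init__(self, rows):
        self.base = dict(rows)
        self.log = []
    def upsert(self, key, row):
        self.log.append((key, row))    # cheap append
    def read(self):
        merged = dict(self.base)
        for key, row in self.log:      # read amplification lives here
            merged[key] = row
        return merged
    def compact(self):
        self.base = self.read()        # fold log into a new base file
        self.log = []

cow, mor = CoWFile({1: "a"}), MoRFile({1: "a"})
for f in (cow, mor):
    f.upsert(1, "a2")
    f.upsert(2, "b")
print(cow.read() == mor.read())    # True: same logical table
print(cow.rewrites, len(mor.log))  # cost shows up in different places
mor.compact()
```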
Hudi Timeline
Hudi maintains a timeline of all table actions in a .hoodie/ directory. Each action has three states: requested (intent), inflight (in-progress), completed (committed).
s3://bucket/tables/orders/
├── .hoodie/
│ ├── 20240214120000000.commit # completed commit (CoW; MoR uses .deltacommit)
│ ├── 20240214120000000.commit.requested # pre-commit request
│ ├── 20240214120500000.deltacommit # MoR delta commit
│ ├── 20240214121000000.compaction.requested
│ ├── 20240214121000000.compaction.inflight
│ ├── 20240214121000000.commit # compaction completed
│ └── hoodie.properties # table metadata
├── 20240214/ # partition
│ ├── xxx-base.parquet # base file
│ └── .xxx.log.1_1-0-1 # MoR delta log file (Avro-block format, not Parquet)
└── ...
Hudi PySpark Examples
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("hudi-demo")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.jars.packages", "org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0")
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
.getOrCreate()
)
# --- Upsert into a CoW table ---
hudi_options = {
"hoodie.table.name": "orders",
"hoodie.datasource.write.recordkey.field": "order_id",
"hoodie.datasource.write.partitionpath.field": "order_date",
"hoodie.datasource.write.table.type": "COPY_ON_WRITE",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.precombine.field": "updated_at",
# For MoR, change table.type to MERGE_ON_READ
}
(
updates_df.write
.format("hudi")
.options(**hudi_options)
.mode("append")
.save("s3://my-bucket/tables/orders")
)
# --- Incremental read (only changed records since a commit time) ---
(
spark.read
.format("hudi")
.option("hoodie.datasource.query.type", "incremental")
.option("hoodie.datasource.read.begin.instanttime", "20240214120000000")
.load("s3://my-bucket/tables/orders")
.show()
)
# --- Read-optimized query (MoR, base files only) ---
(
spark.read
.format("hudi")
.option("hoodie.datasource.query.type", "read_optimized")
.load("s3://my-bucket/tables/orders")
)
Hudi Compaction
# Inline compaction (every N delta commits)
hudi_options_mor = {
"hoodie.table.type": "MERGE_ON_READ",
"hoodie.compact.inline": "true",
"hoodie.compact.inline.max.delta.commits": "5", # compact every 5 commits
"hoodie.parquet.max.file.size": str(128 * 1024 * 1024), # 128 MB
}
# Async compaction via HoodieSparkCompactor (separate Spark job)
# spark-submit ... --class org.apache.hudi.utilities.HoodieCompactor \
# --base-path s3://my-bucket/tables/orders \
# --table-name orders \
# --schema-file /path/to/schema.avsc
The precombine field resolves conflicts when multiple records share the same key within one batch: Hudi keeps the record with the highest precombine value, typically updated_at (a timestamp) or a monotonically increasing sequence number. A missing or wrongly chosen precombine field causes silent data loss on upserts.
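A minimal sketch of that precombine rule (a hypothetical helper, not Hudi's payload classes):

```python
def precombine(batch, key_field, precombine_field):
    """Deduplicate a batch: for each record key, keep only the record
    with the highest precombine value (e.g. latest updated_at)."""
    winners = {}
    for rec in batch:
        k = rec[key_field]
        if k not in winners or rec[precombine_field] > winners[k][precombine_field]:
            winners[k] = rec
    return list(winners.values())

batch = [
    {"order_id": 1, "status": "NEW",     "updated_at": 100},
    {"order_id": 1, "status": "SHIPPED", "updated_at": 250},
    {"order_id": 2, "status": "NEW",     "updated_at": 120},
]
print(precombine(batch, "order_id", "updated_at"))
# order 1 resolves to the SHIPPED record (updated_at = 250)
```

If `updated_at` were constant or missing, the winner would be arbitrary, which is exactly the silent-data-loss failure mode described above.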
6. Schema Evolution
Schema evolution lets you modify a table's structure without rewriting existing data. The table format metadata tracks the current schema and its history, so readers can handle both old and new files correctly.
Operations: Safe vs Unsafe
| Operation | Delta Lake | Iceberg | Hudi | Risk |
|---|---|---|---|---|
| Add column | Yes | Yes | Yes | Safe (nulls for old files) |
| Drop column | Yes (explicit) | Yes | Limited | Data loss if column referenced |
| Rename column | Yes (column mapping) | Yes (by column ID) | Limited | Safe in Iceberg (ID-based); risky in Delta without mapping enabled |
| Reorder columns | No | Yes | No | Safe in Iceberg |
| Widen type (int→long) | Yes | Yes | Limited | Safe (no data loss) |
| Narrow type (long→int) | No | No | No | Unsafe (potential overflow) |
| Change nullability | Partial | Yes | Limited | Depends on direction |
| Add struct field | Yes | Yes | No | Safe |
| Change struct field type | Limited | Yes | No | Careful with widening only |
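The matrix above boils down to a write-time check; here is a toy sketch, assuming a simple type-widening whitelist (real formats track richer rules and column IDs):

```python
# Pairs (old, new) that lose no data and are therefore safe to apply.
WIDENINGS = {("int", "long"), ("float", "double"), ("int", "double")}

def evolve(table_schema, incoming_schema):
    """Merge an incoming write schema into the table schema: adding
    columns and widening types is allowed, narrowing is rejected."""
    merged = dict(table_schema)
    for col, typ in incoming_schema.items():
        old = merged.get(col)
        if old is None:
            merged[col] = typ                    # add column: OK
        elif old == typ or (old, typ) in WIDENINGS:
            merged[col] = typ                    # same or widen: OK
        else:
            raise ValueError(f"cannot narrow {col}: {old} -> {typ}")
    return merged

schema = {"id": "int", "name": "string"}
schema = evolve(schema, {"id": "long", "email": "string"})
print(schema)  # {'id': 'long', 'name': 'string', 'email': 'string'}
```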
Schema Evolution in Delta Lake
# Option 1: mergeSchema on write (auto-evolve from new DataFrame)
(
new_df_with_extra_col.write
.format("delta")
.mode("append")
.option("mergeSchema", "true") # adds new columns from new_df
.save("s3://my-bucket/tables/events")
)
# Option 2: ALTER TABLE statements
spark.sql("ALTER TABLE events ADD COLUMNS (session_id STRING AFTER user_id)")
spark.sql("ALTER TABLE events ADD COLUMNS (geo STRUCT<lat: DOUBLE, lon: DOUBLE>)")
spark.sql("ALTER TABLE events DROP COLUMN deprecated_field")
# Rename requires column mapping mode (safe rename)
spark.sql("""
ALTER TABLE events
SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name',
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5')
""")
spark.sql("ALTER TABLE events RENAME COLUMN event_ts TO occurred_at")
# Widen a type (recent Delta versions gate widening behind the
# 'delta.enableTypeWidening' table feature)
spark.sql("ALTER TABLE events ALTER COLUMN user_id TYPE BIGINT")
# Schema enforcement: reject writes with incompatible schema
try:
(
bad_df.write
.format("delta")
.mode("append")
.save("s3://my-bucket/tables/events")
# Raises AnalysisException if schema doesn't match
# and mergeSchema is not set
)
except Exception as e:
print(f"Schema mismatch rejected: {e}")
Schema Evolution in Iceberg
-- Iceberg tracks columns by ID (not name), enabling safe renames
ALTER TABLE my_catalog.db.orders ADD COLUMN region STRING AFTER status;
ALTER TABLE my_catalog.db.orders DROP COLUMN deprecated_region;
ALTER TABLE my_catalog.db.orders RENAME COLUMN total TO order_total;
ALTER TABLE my_catalog.db.orders ALTER COLUMN order_id TYPE BIGINT;
-- Reorder columns
ALTER TABLE my_catalog.db.orders ALTER COLUMN status FIRST;
ALTER TABLE my_catalog.db.orders ALTER COLUMN region AFTER status;
-- Add nested struct field
ALTER TABLE my_catalog.db.orders ADD COLUMN address STRUCT<street:STRING, zip:STRING, city:STRING>;
ALTER TABLE my_catalog.db.orders ADD COLUMN address.country STRING;
Schema Enforcement vs Evolution
# Delta Lake: set constraints for enforcement
spark.sql("""
ALTER TABLE orders ADD CONSTRAINT order_total_positive
CHECK (total > 0)
""")
# Iceberg: NOT NULL constraints
spark.sql("""
CREATE TABLE orders (
order_id BIGINT NOT NULL,
total DECIMAL(12,2) NOT NULL
) USING iceberg
""")
# ALTER TABLE orders ALTER COLUMN customer_id DROP NOT NULL
# Test schema enforcement
from pyspark.sql import functions as F
bad_records = (
df.filter(F.col("total") <= 0)
.count()
)
assert bad_records == 0, f"Found {bad_records} negative totals"
7. Partition Evolution
Partition strategy decisions made at table creation time often become wrong as data volumes and query patterns change. Table formats differ dramatically in how well they support changing partitioning without data rewrites.
Iceberg: Partition Evolution (Best-in-Class)
Iceberg stores partition metadata per manifest, so old files retain their original partition spec while new files use the updated spec. Queries work transparently across both partition specs — Iceberg figures out which files to read for each spec.
-- Initial table: partitioned monthly (too coarse at scale)
CREATE TABLE events (
event_id BIGINT,
user_id BIGINT,
event_ts TIMESTAMP,
payload STRING
)
USING iceberg
PARTITIONED BY (months(event_ts));
-- 2 years later, monthly partitions are too large.
-- Evolve to daily partitioning WITHOUT rewriting any data:
ALTER TABLE events REPLACE PARTITION FIELD months(event_ts) WITH days(event_ts);
-- Queries spanning the transition work correctly:
SELECT COUNT(*) FROM events
WHERE event_ts BETWEEN '2023-01-01' AND '2024-12-31';
-- Iceberg reads monthly-partitioned files for 2023, daily-partitioned for 2024
-- Add a second partition field (compound partitioning)
ALTER TABLE events ADD PARTITION FIELD bucket(16, user_id);
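A sketch of the per-spec planning this enables (a toy model: each file remembers which spec wrote it, and the planner prunes each group with that spec's own rule):

```python
# Files written before and after the evolution carry different specs.
files = [
    {"spec": "month", "partition": "2023-06",    "path": "m1.parquet"},
    {"spec": "month", "partition": "2023-07",    "path": "m2.parquet"},
    {"spec": "day",   "partition": "2024-02-14", "path": "d1.parquet"},
    {"spec": "day",   "partition": "2024-02-15", "path": "d2.parquet"},
]

def plan(files, day):
    """Plan a scan for a single-day predicate on event_ts across
    both partition specs, without any file having been rewritten."""
    month = day[:7]
    out = []
    for f in files:
        if f["spec"] == "day" and f["partition"] == day:
            out.append(f["path"])
        elif f["spec"] == "month" and f["partition"] == month:
            out.append(f["path"])   # coarser grain: may contain the day
    return out

print(plan(files, "2024-02-14"))  # ['d1.parquet']
print(plan(files, "2023-07-03"))  # ['m2.parquet']
```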
Delta Lake: Partition Strategy
Delta Lake does not support partition evolution — changing the partition columns requires rewriting the entire table. For this reason, choose Delta partition columns conservatively: prefer low-cardinality date columns and use Z-ORDER for high-cardinality filtering needs.
# Delta: changing partition requires a full rewrite
# This is the correct approach: CTAS with new partition
spark.sql("""
CREATE OR REPLACE TABLE events_repartitioned
USING DELTA
PARTITIONED BY (event_date) -- was PARTITIONED BY (event_month)
AS SELECT * FROM events
""")
# Then swap (atomic rename is not native in Delta;
# typically done by renaming at catalog level)
spark.sql("ALTER TABLE events RENAME TO events_old")
spark.sql("ALTER TABLE events_repartitioned RENAME TO events")
# Mitigation: use ZORDER instead of fine-grained partitioning
spark.sql("""
OPTIMIZE events ZORDER BY (user_id, event_type)
-- Data clustering for high-cardinality cols without partition overhead
""")
Partition Sizing Guidance
| Scenario | Recommendation |
|---|---|
| Partition too small (< 1 GB) | Too many files; poor read performance. Coarsen partition grain (daily → weekly → monthly). |
| Partition too large (> 200 GB) | Can't prune efficiently; individual partition queries are slow. Fine-grain or add compound partition. |
| Target partition size | 1–100 GB per partition, depending on the engine; many Spark workloads land in the 10–50 GB range |
| High-cardinality column filtering | Use Z-ORDER (Delta) or sort order + bin-pack (Iceberg) instead of partition |
| Iceberg: hidden vs identity | Prefer transforms (days/months/bucket) over identity — identity on a timestamp column creates a partition per unique value |
| Delta: partition column choice | Date/region work well; avoid user_id or event_type (too many partitions, small files per partition) |
Small-file buildup within partitions can also be mitigated automatically in Delta via the delta.autoOptimize.autoCompact table property.
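The sizing rules above can be turned into a back-of-envelope helper (illustrative thresholds only; real sizing depends on the engine, file counts, and query patterns):

```python
# Grains from coarsest to finest; value = days of data per partition.
GRAINS = [("monthly", 30), ("weekly", 7), ("daily", 1), ("hourly", 1 / 24)]

def pick_grain(daily_gb, lo=1, hi=100):
    """Pick the coarsest time grain whose partitions land in the
    lo..hi GB band (coarser = fewer partitions to track)."""
    for name, days in GRAINS:
        size = daily_gb * days
        if lo <= size <= hi:
            return name, round(size, 2)
    # Nothing fits: tiny tables stay monthly, huge ones go hourly.
    if daily_gb * 30 < lo:
        return "monthly", round(daily_gb * 30, 2)
    return "hourly", round(daily_gb / 24, 2)

print(pick_grain(0.2))   # ('monthly', 6.0)  -- small table, coarse grain
print(pick_grain(50))    # ('daily', 50)     -- healthy daily partitions
print(pick_grain(5000))  # hourly, and still ~208 GB: add a second field
```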
8. Time Travel & Versioning
Time travel lets you query historical data as it existed at any prior snapshot. Use cases: auditing, debugging pipelines, reproducing ML training sets, A/B test analysis, and rollback after bad writes.
Delta Lake Time Travel
-- List full transaction history
DESCRIBE HISTORY orders;
-- Returns: version, timestamp, userId, operation, operationParameters,
-- operationMetrics (numAdded, numRemoved, numOutputRows)
-- Query by version number
SELECT COUNT(*) FROM orders VERSION AS OF 42;
-- Query by timestamp
SELECT * FROM orders TIMESTAMP AS OF '2024-02-01 00:00:00';
-- In Python
df = spark.read.format("delta").option("versionAsOf", 42).load(path)
df = spark.read.format("delta").option("timestampAsOf", "2024-02-01").load(path)
-- Restore table to a previous state (RESTORE is itself recorded as a new version, so it can be undone)
RESTORE TABLE orders TO VERSION AS OF 42;
RESTORE TABLE orders TO TIMESTAMP AS OF '2024-02-01 00:00:00';
Iceberg Time Travel
-- Query specific snapshot
SELECT * FROM orders FOR SYSTEM_VERSION AS OF 8342957621;
SELECT * FROM orders FOR SYSTEM_TIME AS OF '2024-02-01 12:00:00';
-- List all snapshots
SELECT snapshot_id, committed_at, operation, summary
FROM my_catalog.db.orders.snapshots
ORDER BY committed_at DESC;
-- Compare two snapshots with an incremental changelog scan; in Spark the
-- snapshot range is passed as read options over the `changes` metadata table:
--   spark.read.format("iceberg")
--       .option("start-snapshot-id", 8342957620)
--       .option("end-snapshot-id", 8342957621)
--       .table("my_catalog.db.orders.changes")
-- Returns: _change_type (INSERT/DELETE/UPDATE_BEFORE/UPDATE_AFTER)
# Rollback to a previous snapshot (metadata-only operation)
spark.sql("""
CALL my_catalog.system.rollback_to_snapshot(
table => 'db.orders',
snapshot_id => 8342957620
)
""")
# Rollback to timestamp
spark.sql("""
CALL my_catalog.system.rollback_to_timestamp(
table => 'db.orders',
timestamp => TIMESTAMP '2024-02-01 00:00:00'
)
""")
Retention Policies
Time travel requires keeping old data and log files. Retention is a trade-off: longer retention enables more historical queries but increases storage cost.
-- Delta Lake: set retention periods
ALTER TABLE orders SET TBLPROPERTIES (
'delta.logRetentionDuration' = 'interval 30 days', -- keep log entries
'delta.deletedFileRetentionDuration' = 'interval 7 days' -- keep data files after removal
);
-- Clean up files older than retention (VACUUM)
-- Default threshold: 7 days. Override only with caution.
VACUUM orders RETAIN 168 HOURS; -- 7 days (safe minimum)
VACUUM orders RETAIN 0 HOURS; -- DANGER: breaks time travel, ongoing streams
-- Iceberg: expire snapshots explicitly
CALL my_catalog.system.expire_snapshots(
table => 'db.orders',
older_than => TIMESTAMP '2024-01-01 00:00:00',
retain_last => 10 -- always keep at least 10 snapshots regardless of age
);
-- expire_snapshots deletes data and manifest files that are no longer
-- referenced by any retained snapshot. Files never tracked in table metadata
-- (e.g. leftovers from failed writes) require remove_orphan_files instead
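The older_than / retain_last interaction is easy to get wrong. A plain-Python sketch of the rule (a hypothetical helper, not Iceberg's implementation): a snapshot is expired only if it is older than the cutoff AND outside the retain_last most recent snapshots.

```python
from datetime import datetime

def snapshots_to_expire(snapshots, older_than, retain_last):
    """Return snapshot ids eligible for expiry.

    snapshots: list of (snapshot_id, committed_at) tuples, any order.
    A snapshot is expired only if it is older than `older_than`
    AND not among the `retain_last` most recent snapshots.
    """
    by_recency = sorted(snapshots, key=lambda s: s[1], reverse=True)
    protected = {sid for sid, _ in by_recency[:retain_last]}
    return [sid for sid, ts in by_recency
            if ts < older_than and sid not in protected]

snaps = [
    (1, datetime(2023, 12, 1)),
    (2, datetime(2023, 12, 15)),
    (3, datetime(2024, 1, 10)),
    (4, datetime(2024, 2, 1)),
]
# older_than=2024-01-01, retain_last=2: snapshots 1 and 2 are old enough,
# and 3 and 4 are protected as the two most recent
print(snapshots_to_expire(snaps, datetime(2024, 1, 1), retain_last=2))  # [2, 1]
```

Note that retain_last wins: even a very old snapshot survives if fewer than retain_last newer snapshots exist.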
Audit Pattern
# Track who changed what and when (Delta)
history_df = spark.sql("""
    SELECT version,
           timestamp,
           operationParameters['mode'] AS write_mode,      -- map columns need bracket access
           operationMetrics['numOutputRows'] AS rows_written,
           userMetadata
    FROM (DESCRIBE HISTORY orders)
    WHERE operation IN ('WRITE', 'MERGE', 'UPDATE', 'DELETE')
    ORDER BY version DESC
""")
# Diff two versions: what changed between v10 and v15?
v10 = spark.read.format("delta").option("versionAsOf", 10).load(path)
v15 = spark.read.format("delta").option("versionAsOf", 15).load(path)
added = v15.exceptAll(v10) # rows in v15 not in v10
removed = v10.exceptAll(v15) # rows in v10 not in v15
9. MERGE / Upsert Patterns
MERGE INTO is the Swiss Army knife for incremental data engineering: CDC ingestion, SCD updates, deduplication, and late-arriving data correction all reduce to merge variants.
Basic Upsert (Insert or Update)
-- Delta / Iceberg syntax is nearly identical
MERGE INTO target AS t
USING source AS s
ON t.id = s.id
WHEN MATCHED THEN
UPDATE SET t.value = s.value,
t.updated_at = s.updated_at
WHEN NOT MATCHED THEN
INSERT (id, value, updated_at)
VALUES (s.id, s.value, s.updated_at);
CDC Ingestion Pattern
Change Data Capture (CDC) from a source database typically produces a stream of inserts, updates, and deletes with an operation type column (op: I/U/D).
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# CDC stream from Kafka / Debezium
# Schema: id, op (I/U/D), payload (JSON with before/after), ts
cdc_df = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "db.public.orders.cdc")
.load()
.select(F.from_json(F.col("value").cast("string"), cdc_schema).alias("r"))
.select("r.*")
)
def process_cdc_batch(batch_df, batch_id):
dt = DeltaTable.forName(spark, "warehouse.orders")
    # Dedup within batch: keep the latest record per key. Use row_number, not
    # rank: rank leaves ties, and MERGE requires a unique source row per key
    latest = (
        batch_df
        .withColumn("_rn", F.row_number().over(
            Window.partitionBy("id").orderBy(F.desc("ts"))
        ))
        .filter(F.col("_rn") == 1)
        .drop("_rn")
    )
(
dt.alias("target")
.merge(latest.alias("source"), "target.id = source.id")
.whenMatchedDelete(condition="source.op = 'D'")
.whenMatchedUpdate(
condition="source.op IN ('U', 'u')",
set={
"target.value": "source.after.value",
"target.updated_at": "source.ts"
}
)
.whenNotMatchedInsert(
condition="source.op = 'I'",
values={
"id": "source.after.id",
"value": "source.after.value",
"updated_at": "source.ts"
}
)
.execute()
)
(
cdc_df.writeStream
.foreachBatch(process_cdc_batch)
.option("checkpointLocation", "s3://bucket/checkpoints/orders-cdc")
.trigger(processingTime="30 seconds")
.start()
)
SCD Type 2 with MERGE
Slowly Changing Dimension Type 2 preserves full history by closing old rows and inserting new ones rather than updating in place.
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, current_timestamp
dt = DeltaTable.forName(spark, "warehouse.dim_customer")
# updates_df: new/changed customers
# Strategy: detect changes, close old rows, insert new rows
(
dt.alias("target")
.merge(
updates_df.alias("source"),
# Match on business key AND open row
"target.customer_id = source.customer_id AND target.is_current = true"
)
# Close the existing row when data has changed
.whenMatchedUpdate(
condition="""
target.email != source.email OR
target.address != source.address OR
target.name != source.name
""",
set={
"target.is_current": lit(False),
"target.effective_end": current_timestamp()
}
)
    # No WHEN NOT MATCHED clause: brand-new customers and the replacement rows
    # for changed customers are both inserted in the second step below;
    # inserting them here as well would duplicate new customers
    .execute()
)
# Insert current rows for new and changed customers
changed = (
updates_df.alias("source")
.join(
dt.toDF().filter("is_current = true").alias("target"),
"customer_id",
"left"
)
    .filter("""
        target.customer_id IS NULL OR
        target.email != source.email OR
        target.address != source.address OR
        target.name != source.name
    """)
.select("source.*")
.withColumn("is_current", lit(True))
.withColumn("effective_start", current_timestamp())
.withColumn("effective_end", lit(None).cast("timestamp"))
)
changed.write.format("delta").mode("append").saveAsTable("warehouse.dim_customer")
Deduplication Pattern
-- Remove duplicates from a table (keep row with highest sequence)
MERGE INTO orders AS target
USING (
SELECT id, MAX(seq) AS max_seq
FROM orders
GROUP BY id
HAVING COUNT(*) > 1
) AS dupes ON target.id = dupes.id AND target.seq < dupes.max_seq
WHEN MATCHED THEN DELETE;
-- Or via CTAS (simpler, rewrites table):
CREATE OR REPLACE TABLE orders
USING DELTA AS
SELECT * EXCEPT(_row_num)
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY seq DESC) AS _row_num
FROM orders
)
WHERE _row_num = 1;
Performance tip: include partition columns in the merge condition, e.g. target.event_date = source.event_date AND target.id = source.id. On large tables, a missing partition predicate turns a fast merge into a full table scan.
10. Compaction & Optimization
Object storage performs best with files in the 128 MB–1 GB range. Streaming writes produce many small files; compaction consolidates them. This is one of the most impactful ongoing maintenance tasks in a lakehouse.
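The bin-packing that compaction jobs perform can be sketched as a greedy grouping of small files into rewrite groups (a simplification: real planners also respect partition boundaries and rewrite groups in parallel).

```python
def plan_compaction(file_sizes, min_size=32 * 2**20, target=128 * 2**20):
    """Greedy bin-pack: group files smaller than `min_size` into rewrite
    groups of roughly `target` bytes; files already big enough are left alone."""
    small = sorted(s for s in file_sizes if s < min_size)
    groups, current, current_size = [], [], 0
    for s in small:
        if current and current_size + s > target:
            groups.append(current)      # close the group once it would exceed target
            current, current_size = [], 0
        current.append(s)
        current_size += s
    if current:
        groups.append(current)
    return groups

sizes = [5 * 2**20] * 60 + [200 * 2**20]   # sixty 5 MB files + one 200 MB file
groups = plan_compaction(sizes)
print(len(groups), [len(g) for g in groups])  # 3 [25, 25, 10]
```

The 200 MB file is never touched: rewriting already-sized files wastes I/O, which is why the real procedures expose min/max size thresholds.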
Delta Lake: OPTIMIZE and Z-ORDER
-- Bin-pack: consolidate small files into target-size files (default 1 GB)
OPTIMIZE orders;
-- Z-ORDER: bin-pack + co-locate related rows by specified columns
-- Data skipping becomes highly effective when query filters match Z-ORDER cols
OPTIMIZE orders ZORDER BY (customer_id, order_date);
-- Partition-scoped optimize (recommended for large tables)
OPTIMIZE orders WHERE order_date >= '2024-02-01';
-- Auto optimize: configure at table level (Databricks Delta only)
ALTER TABLE orders SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true', -- bin-pack on every write
'delta.autoOptimize.autoCompact' = 'true' -- async compaction after writes
);
# Optimize from Python
from delta.tables import DeltaTable
dt = DeltaTable.forName(spark, "orders")
dt.optimize().executeZOrderBy("customer_id", "order_date")
# Partition-scoped: where() takes a partition predicate string
dt.optimize().where("order_date >= '2024-02-01'").executeZOrderBy("customer_id")
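The co-location idea behind Z-ORDER is a space-filling curve. A textbook Morton-code sketch in plain Python (not Delta's actual implementation, which range-partitions values before interleaving):

```python
def z_value(x: int, y: int, bits: int = 16) -> int:
    """Interleave the bits of two column values into one Morton (Z-order) key.

    Sorting files by this key keeps rows that are close in BOTH dimensions
    physically close, so per-file min/max stats stay tight on both columns."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)      # x bits at even positions
        z |= ((y >> i) & 1) << (2 * i + 1)  # y bits at odd positions
    return z

print(z_value(0b0011, 0b0101))  # 39: bits of x=3 and y=5 interleaved
```

This is why Z-ordering on more than 3-4 columns helps little: each extra dimension dilutes how tightly any single column's values cluster.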
Iceberg: rewrite_data_files
# Bin-pack compaction
spark.sql("""
CALL my_catalog.system.rewrite_data_files(
table => 'db.orders',
strategy => 'binpack',
where => 'order_date >= ''2024-01-01''',
options => map(
'target-file-size-bytes', '134217728', -- 128 MB
'min-file-size-bytes', '33554432', -- 32 MB (compact if smaller)
'max-file-size-bytes', '536870912', -- 512 MB (split if larger)
'max-concurrent-file-group-rewrites', '10'
)
)
""")
# Sort-based compaction (equivalent to Z-ORDER)
spark.sql("""
CALL my_catalog.system.rewrite_data_files(
table => 'db.orders',
strategy => 'sort',
sort_order => 'zorder(customer_id, order_date)'
)
""")
# Rewrite manifests (reduce manifest file count after many small writes)
spark.sql("""
CALL my_catalog.system.rewrite_manifests('db.orders')
""")
Vacuum and Snapshot Expiry
-- Delta: remove files no longer referenced by any version within retention period
-- Default retention: 7 days
SET spark.databricks.delta.retentionDurationCheck.enabled = false; -- only for testing
VACUUM orders RETAIN 168 HOURS; -- 7 days
VACUUM orders DRY RUN; -- preview what would be deleted
-- Iceberg: two-step cleanup
-- Step 1: expire snapshots (removes snapshot metadata + unreferenced manifests)
CALL my_catalog.system.expire_snapshots(
table => 'db.orders',
older_than => TIMESTAMP '2024-01-15 00:00:00',
retain_last => 5
);
-- Step 2: remove orphan files (data files left by failed writes, etc.)
CALL my_catalog.system.remove_orphan_files(
table => 'db.orders',
older_than => TIMESTAMP '2024-01-15 00:00:00'
);
Bloom Filters
-- Delta (Databricks): add a bloom filter index for high-cardinality equality lookups
CREATE BLOOMFILTER INDEX ON TABLE orders
FOR COLUMNS (order_id OPTIONS (fpp = 0.01, numItems = 1000000));
-- fpp = target false positive probability; numItems sizes the filter
-- Iceberg: bloom filter column stats
ALTER TABLE my_catalog.db.orders
SET TBLPROPERTIES (
'write.parquet.bloom-filter-enabled.column.order_id' = 'true',
'write.parquet.bloom-filter-max-bytes' = '1048576'
);
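The fpp and numItems knobs map directly onto the standard bloom filter sizing formulas; a quick sketch of the trade-off they control:

```python
import math

def bloom_parameters(num_items: int, fpp: float):
    """Optimal bloom filter size (bits) and hash-function count for a target
    false-positive probability, per the standard formulas
    m = -n*ln(p)/ln(2)^2 and k = (m/n)*ln(2)."""
    m = math.ceil(-num_items * math.log(fpp) / (math.log(2) ** 2))
    k = max(1, round((m / num_items) * math.log(2)))
    return m, k

bits, hashes = bloom_parameters(1_000_000, 0.01)
print(bits, hashes)  # roughly 9.6M bits (about 1.2 MB) and 7 hash functions
```

Halving fpp grows the filter only logarithmically, so a 1% target is usually cheap; going below 0.1% starts to cost real memory per file.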
11. Catalog & Metadata
A catalog is the entry point for discovering and accessing tables. It maps logical table names to physical metadata locations. The catalog layer is where lakehouse governance, access control, and lineage live.
Catalog Options
| Catalog | Format Support | Multi-Engine | Notes |
|---|---|---|---|
| Unity Catalog (Databricks) | Delta, Iceberg (read), Parquet | Spark, Databricks SQL | RBAC, lineage, data masking, audit logs. Requires Databricks. Iceberg support via UniForm. |
| AWS Glue Data Catalog | Delta, Iceberg, Hudi, Parquet | Spark, Athena, Trino, Redshift Spectrum | Managed, AWS-native. Best for AWS shops. |
| Hive Metastore (HMS) | All formats | Spark, Hive, Trino, Flink | Widely supported but requires managing a separate HMS service. Limited partition evolution support. |
| Project Nessie | Iceberg, Delta | Spark, Trino, Flink, Dremio | Git-like branches and tags for catalog-level version control. Open-source. Excellent for CI/CD on data. |
| Iceberg REST Catalog | Iceberg | Spark, Trino, Flink, PyIceberg | Open standard (Iceberg spec). Tabular, Snowflake Open Catalog, Polaris implement this spec. |
| Apache Polaris (Snowflake) | Iceberg | Spark, Trino, Snowflake, Flink | Open-source REST catalog donated to Apache. Production-grade Iceberg catalog. |
Unity Catalog Configuration
# Unity Catalog hierarchy: metastore > catalog > schema > table
# Tables addressed as: catalog.schema.table
spark.sql("USE CATALOG my_prod_catalog")
spark.sql("USE SCHEMA warehouse")
spark.sql("SELECT * FROM orders LIMIT 10") # reads my_prod_catalog.warehouse.orders
# Create external table registered in Unity Catalog
spark.sql("""
CREATE TABLE IF NOT EXISTS my_prod_catalog.warehouse.orders
USING DELTA
LOCATION 's3://my-bucket/tables/orders'
-- Unity Catalog manages access; storage credential referenced by name
""")
# Grant access
spark.sql("GRANT SELECT ON TABLE my_prod_catalog.warehouse.orders TO `data-analysts`")
spark.sql("GRANT MODIFY ON TABLE my_prod_catalog.warehouse.orders TO `data-engineers`")
# Row-level security via dynamic views
spark.sql("""
CREATE OR REPLACE VIEW my_prod_catalog.warehouse.orders_filtered AS
SELECT * FROM my_prod_catalog.warehouse.orders
WHERE region = current_user_region() -- UDF returns caller's region
""")
Nessie: Git-Like Data Versioning
# Nessie treats the catalog like a git repo
# Create a feature branch for a data migration
spark.sql("CREATE BRANCH IF NOT EXISTS etl_migration IN my_nessie_catalog")
spark.sql("USE REFERENCE etl_migration IN my_nessie_catalog")
# Perform migration on the branch (does not affect main)
spark.sql("""
ALTER TABLE my_nessie_catalog.db.orders
ADD COLUMN order_source STRING
""")
spark.sql("UPDATE my_nessie_catalog.db.orders SET order_source = 'web' WHERE ...")
# Validate on branch, then merge to main
spark.sql("""
MERGE BRANCH etl_migration INTO main IN my_nessie_catalog
""")
# Tag a production snapshot for reproducibility
spark.sql("CREATE TAG prod_snapshot_2024_02 IN my_nessie_catalog")
# Future reads: spark.sql("USE REFERENCE prod_snapshot_2024_02 IN my_nessie_catalog")
PyIceberg: Python-Native Iceberg Access
from pyiceberg.catalog import load_catalog
import pyarrow as pa
# Connect to REST catalog (no Spark required)
catalog = load_catalog(
"rest",
**{
"type": "rest",
"uri": "https://my-catalog.example.com/api",
"token": "my-bearer-token",
"warehouse": "s3://my-bucket/warehouse",
}
)
# Load table
table = catalog.load_table("db.orders")
# Scan with predicate pushdown (returns PyArrow dataset)
scan = table.scan(
row_filter="order_date >= '2024-02-01' AND status = 'COMPLETED'",
selected_fields=("order_id", "customer_id", "total"),
limit=10000
)
arrow_table = scan.to_arrow()
df = arrow_table.to_pandas()
# Append data from Pandas/Arrow
new_data = pa.Table.from_pandas(new_df, schema=table.schema().as_arrow())
table.append(new_data)
# Time travel
snapshot = table.snapshot_by_id(8342957620)
old_scan = table.scan(snapshot_id=snapshot.snapshot_id)
12. Data Engineering Production Patterns
Medallion Architecture (Bronze / Silver / Gold)
The medallion architecture organizes data into layers of increasing quality and semantic richness. Each layer is a full table (not a view) so that failures at any stage are isolated.
BRONZE (Raw / Append-Only)
├── Exact copy of source data, no transformation
├── Schema-on-read or loose schema
├── Retains all history (never delete)
├── Partition by ingestion date
└── Sources: Kafka topics, database CDC, file drops, API calls
SILVER (Cleaned / Enriched)
├── Deduplicated, validated, type-cast
├── Bad records quarantined (not dropped) to quarantine table
├── Joined with reference data (dim tables)
├── Partition by business date (event_date, not ingest_date)
└── SLA: refresh within 15 minutes of Bronze
GOLD (Aggregated / Business-Facing)
├── Pre-aggregated metrics, KPIs, feature tables
├── Optimized for BI tools and ML consumption
├── Strict schema, enforced business rules
├── Partition by report date or rolling window
└── SLA: refresh within 1 hour
# Incremental Bronze → Silver pipeline (Delta)
from pyspark.sql import functions as F
from delta.tables import DeltaTable
# Read new Bronze records since last watermark
last_watermark = get_watermark("bronze_to_silver_orders") # stored in control table
bronze_new = (
spark.read.format("delta")
.table("bronze.orders_raw")
.filter(F.col("_ingest_ts") > last_watermark)
)
# Apply transformations
silver_df = (
bronze_new
.withColumn("order_id", F.col("payload.order_id").cast("bigint"))
.withColumn("total", F.col("payload.total").cast("decimal(12,2)"))
.withColumn("event_date", F.to_date("payload.order_ts"))
# Dedup within batch
.dropDuplicates(["order_id"])
    # Drop null primary keys; in production, write them to a quarantine
    # table first rather than silently discarding them
    .filter(F.col("order_id").isNotNull())
)
# Upsert into Silver (idempotent)
dt = DeltaTable.forName(spark, "silver.orders")
(
dt.alias("t")
.merge(silver_df.alias("s"), "t.order_id = s.order_id AND t.event_date = s.event_date")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Advance watermark only after successful commit
update_watermark("bronze_to_silver_orders", bronze_new.agg(F.max("_ingest_ts")).collect()[0][0])
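The get_watermark / update_watermark helpers above are hypothetical; a plain-Python sketch of their semantics against an in-memory store (in production this would be a small Delta control table): advance only after the downstream MERGE commits, and never move backwards, so reruns stay idempotent.

```python
# Hypothetical helpers; a dict stands in for the Delta control table
_control_table = {}

def get_watermark(pipeline_name, default="1970-01-01T00:00:00"):
    """Return the last successfully processed high-water mark."""
    return _control_table.get(pipeline_name, default)

def update_watermark(pipeline_name, new_watermark):
    """Advance the watermark monotonically. Call this only AFTER the
    downstream write commits: if the job fails mid-run, the old watermark
    makes the next run reprocess the same records (safe with an idempotent MERGE)."""
    current = _control_table.get(pipeline_name)
    if current is None or new_watermark > current:
        _control_table[pipeline_name] = new_watermark

update_watermark("bronze_to_silver_orders", "2024-02-01T10:00:00")
update_watermark("bronze_to_silver_orders", "2024-01-01T00:00:00")  # older: ignored
print(get_watermark("bronze_to_silver_orders"))  # 2024-02-01T10:00:00
```

The monotonic guard matters when retries race: a late retry of an old batch must never rewind the watermark past records a newer run already claimed.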
Data Quality with Great Expectations
import great_expectations as gx
context = gx.get_context()
# Define expectations on the Silver orders table
# (GX fluent API, 0.18/1.x style; exact method names vary across GX versions)
batch = context.sources.add_spark(name="spark_source") \
    .add_dataframe_asset(name="silver_orders") \
    .add_batch_definition_whole_dataframe("batch") \
    .get_batch(batch_parameters={"dataframe": silver_df})
results = batch.validate(
expectation_suite={
"expectations": [
{"type": "expect_column_values_to_not_be_null",
"kwargs": {"column": "order_id"}},
{"type": "expect_column_values_to_be_between",
"kwargs": {"column": "total", "min_value": 0, "max_value": 1000000}},
{"type": "expect_column_values_to_be_in_set",
"kwargs": {"column": "status",
"value_set": ["PENDING", "COMPLETED", "CANCELLED", "REFUNDED"]}},
{"type": "expect_column_pair_values_A_to_be_greater_than_B",
"kwargs": {"column_A": "updated_at", "column_B": "created_at"}},
]
}
)
if not results["success"]:
failed = [r for r in results["results"] if not r["success"]]
raise ValueError(f"Data quality check failed: {failed}")
Streaming + Batch Unification
# Lambda architecture problem: separate batch and streaming paths diverge
# Lakehouse solution: one Delta/Iceberg table serves both
# BATCH: full historical recompute (nightly)
spark.sql("""
INSERT OVERWRITE silver.orders_hourly_agg
PARTITION (agg_date)
SELECT
    DATE_TRUNC('hour', order_ts) AS agg_hour,
    COUNT(*) AS order_count,
    SUM(total) AS total_revenue,
    COUNT(DISTINCT customer_id) AS unique_customers,
    DATE(order_ts) AS agg_date    -- dynamic partition column must come last
FROM silver.orders
WHERE order_date >= '2024-01-01'
GROUP BY 1, 5
""")
# STREAMING: append new hourly aggregates as they complete
(
spark.readStream
.format("delta")
.table("silver.orders")
.groupBy(
F.window("order_ts", "1 hour"),
F.to_date("order_ts").alias("agg_date")
)
    .agg(
        F.count("*").alias("order_count"),
        F.sum("total").alias("total_revenue")
    )
    # Flatten the window struct so the schema matches the batch-written table
    .select(F.col("window.start").alias("agg_hour"), "agg_date",
            "order_count", "total_revenue")
    .writeStream
.format("delta")
    .outputMode("complete")  # Delta sink: "complete" replaces the table each trigger; use "append" with a watermark for incremental results
.option("checkpointLocation", "s3://bucket/checkpoints/orders-hourly-agg")
.trigger(processingTime="5 minutes")
.toTable("silver.orders_hourly_agg")
)
Pipeline Orchestration (Airflow / Dagster)
# Airflow DAG for medallion pipeline
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
with DAG(
dag_id="orders_medallion_pipeline",
schedule="@hourly",
start_date=datetime(2024, 1, 1),
catchup=False,
default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:
ingest_bronze = DatabricksRunNowOperator(
task_id="ingest_bronze_orders",
job_id="{{ var.value.bronze_orders_job_id }}",
)
quality_check = DatabricksRunNowOperator(
task_id="validate_bronze_schema",
job_id="{{ var.value.dq_check_job_id }}",
)
silver_transform = DatabricksRunNowOperator(
task_id="transform_silver_orders",
job_id="{{ var.value.silver_orders_job_id }}",
)
gold_aggregate = DatabricksRunNowOperator(
task_id="aggregate_gold_orders_hourly",
job_id="{{ var.value.gold_orders_agg_job_id }}",
)
ingest_bronze >> quality_check >> silver_transform >> gold_aggregate
13. MLOps on Lakehouse
The lakehouse is an excellent ML platform substrate because it gives you versioned, reproducible datasets, ACID-consistent feature tables, and efficient large-scale data export — without a separate data warehouse and object store copy.
Feature Store on Delta / Iceberg
# Feature table: computed once, served to many models
# Key requirements: point-in-time correctness, efficient range reads
# Create feature table (Delta)
spark.sql("""
CREATE TABLE IF NOT EXISTS feature_store.customer_features (
customer_id BIGINT NOT NULL,
feature_date DATE NOT NULL, -- point-in-time key
order_count_30d INT,
revenue_90d DECIMAL(14,2),
days_since_last INT,
churn_risk_score FLOAT,
feature_version INT DEFAULT 1 -- track feature iteration
)
USING DELTA
PARTITIONED BY (feature_date)
TBLPROPERTIES (
'delta.enableChangeDataFeed' = 'true',
'delta.autoOptimize.optimizeWrite' = 'true'
)
""")
# Point-in-time join: join event labels with features as of label date
# Critical: use the feature value as-of the event date, not today's value
labeled_events = spark.table("gold.churn_labels") # customer_id, label_date, churned
training_df = (
labeled_events.alias("e")
.join(
spark.table("feature_store.customer_features").alias("f"),
on=(
(F.col("e.customer_id") == F.col("f.customer_id")) &
(F.col("f.feature_date") == F.col("e.label_date"))
# Exact date match; for range: f.feature_date = e.label_date - 1
),
how="left"
)
)
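The point-in-time rule the join above enforces can be sketched engine-agnostically: for each label, take the newest feature row at or before the label date, never a future value. A plain-Python as-of join (on Spark this becomes a window function or range-join condition):

```python
import bisect
from collections import defaultdict

def as_of_join(labels, features):
    """Point-in-time join: for each (customer_id, label_date), pick the most
    recent feature row with feature_date <= label_date.
    labels: [(customer_id, label_date)]
    features: [(customer_id, feature_date, value)]"""
    by_customer = defaultdict(list)
    for cid, fdate, value in sorted(features, key=lambda f: f[1]):
        by_customer[cid].append((fdate, value))
    out = []
    for cid, ldate in labels:
        rows = by_customer.get(cid, [])
        # rightmost feature_date <= label_date (ISO date strings sort correctly)
        idx = bisect.bisect_right([d for d, _ in rows], ldate) - 1
        out.append((cid, ldate, rows[idx][1] if idx >= 0 else None))
    return out

features = [(1, "2024-01-01", 0.2), (1, "2024-02-01", 0.7)]
labels = [(1, "2024-01-15"), (1, "2024-02-15"), (2, "2024-01-15")]
print(as_of_join(labels, features))
# [(1, '2024-01-15', 0.2), (1, '2024-02-15', 0.7), (2, '2024-01-15', None)]
```

Using the exact-match join from the snippet above instead of this as-of rule silently drops labels whose feature row landed a day early, a common source of train/serve skew.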
Training Dataset Versioning
import mlflow
from pyspark.sql import functions as F
# Register the exact Delta table version used for training
# This ensures reproducibility: re-run the experiment with same data later
with mlflow.start_run() as run:
# Record which Delta version was used
delta_history = spark.sql("""
SELECT version, timestamp
FROM (DESCRIBE HISTORY feature_store.customer_features)
ORDER BY version DESC LIMIT 1
""").collect()[0]
mlflow.log_params({
"training_data_table": "feature_store.customer_features",
"training_data_version": delta_history["version"],
"training_data_ts": str(delta_history["timestamp"]),
"feature_date_range": "2023-01-01 to 2024-01-01",
"label_column": "churned",
"num_features": len(feature_cols),
})
# Train model
model = train_model(training_df, feature_cols, label_col="churned")
mlflow.sklearn.log_model(model, artifact_path="model")
# Reproduce training data exactly in the future:
historical_features = (
spark.read
.format("delta")
.option("versionAsOf", delta_history["version"])
.table("feature_store.customer_features")
)
Inference Data Logging
# Log model predictions to Delta for monitoring and retraining
from pyspark.sql.types import (StructType, StructField, StringType,
                               LongType, FloatType, TimestampType)
prediction_schema = StructType([
StructField("prediction_id", StringType(), False),
StructField("model_version", StringType(), False),
StructField("customer_id", LongType(), False),
StructField("score", FloatType(), False),
StructField("predicted_at", TimestampType(), False),
StructField("feature_vector", StringType(), True), # JSON-serialized
])
def log_predictions(predictions_df: "pd.DataFrame", model_version: str):
    spark_df = (
        spark.createDataFrame(predictions_df, prediction_schema)
        # partitionBy takes column names, not expressions: derive the column first
        .withColumn("prediction_date", F.to_date("predicted_at"))
    )
    (
        spark_df.write
        .format("delta")
        .mode("append")
        .partitionBy("prediction_date")
        .saveAsTable("ml_monitoring.predictions")
    )
# Monitor for drift: compare prediction distribution today vs baseline
today_dist = spark.sql("""
SELECT AVG(score) as mean_score, STDDEV(score) as std_score,
PERCENTILE(score, 0.5) as p50, PERCENTILE(score, 0.95) as p95
FROM ml_monitoring.predictions
WHERE prediction_date = CURRENT_DATE()
AND model_version = 'v3.2'
""")
baseline_dist = spark.sql("""
SELECT AVG(score), STDDEV(score), PERCENTILE(score, 0.5), PERCENTILE(score, 0.95)
FROM ml_monitoring.predictions
WHERE prediction_date BETWEEN '2024-01-01' AND '2024-01-07'
AND model_version = 'v3.2'
""")
A/B Test Data Pipelines
-- Store experiment assignments in Gold layer
CREATE TABLE gold.experiment_assignments (
    user_id BIGINT NOT NULL,
    experiment STRING NOT NULL,
    variant STRING NOT NULL,          -- 'control' / 'treatment'
    assigned_at TIMESTAMP NOT NULL,
    -- Delta cannot partition by an expression; use a generated column
    assigned_date DATE GENERATED ALWAYS AS (CAST(assigned_at AS DATE))
)
USING DELTA
PARTITIONED BY (assigned_date)
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
-- Join experiment assignments with outcome metrics
SELECT
ea.variant,
COUNT(DISTINCT ea.user_id) AS users,
COUNT(o.order_id) AS conversions,
COUNT(o.order_id) / COUNT(DISTINCT ea.user_id) AS cvr,
SUM(o.total) / COUNT(DISTINCT ea.user_id) AS revenue_per_user
FROM gold.experiment_assignments ea
LEFT JOIN silver.orders o
ON ea.user_id = o.customer_id
AND o.order_ts >= ea.assigned_at -- only post-assignment events
AND o.order_ts < ea.assigned_at + INTERVAL 7 DAYS
WHERE ea.experiment = 'checkout_redesign_2024_02'
GROUP BY ea.variant;
14. Performance Tuning
File Sizing Strategy
| Scenario | Recommendation | Why |
|---|---|---|
| Batch OLAP tables | 128 MB – 1 GB per file | Balances S3 LIST overhead vs task parallelism |
| Streaming micro-batch | Auto-compact enabled; target 128 MB | Streaming creates many small files; async compaction merges them |
| Lookup / point queries | Smaller files (32–64 MB) with bloom filters | Reduces I/O when reading one record from a large table |
| Full table scan / ML training | Large files (256 MB – 1 GB) | Minimizes task launch overhead; maximizes sequential read throughput |
| Iceberg target file size | Set via write.target-file-size-bytes = 134217728 | Applied at write time to avoid compaction backlog |
Data Skipping: Min/Max Statistics
# Delta: statistics are collected per file at write time
# Default: collect stats on first 32 columns
# Increase if filtering on later columns:
spark.sql("""
ALTER TABLE orders SET TBLPROPERTIES (
'delta.dataSkippingNumIndexedCols' = '64'
)
""")
# Check if data skipping is working (Spark plan shows "PartitionFilters" and "DataFilters")
spark.sql("EXPLAIN COST SELECT * FROM orders WHERE order_date = '2024-02-14'")
# Look for: "PushedFilters: [EqualTo(order_date,2024-02-14)]"
# and numFiles reduction in query metrics
# Force stats recomputation after bulk ingestion without stats
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR ALL COLUMNS")
# Or for specific columns:
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS order_id, customer_id, order_date")
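File-level data skipping reduces to an interval-overlap test against each file's min/max stats. A simplified sketch (real engines also handle nulls and apply partition pruning first):

```python
def prune_files(files, column, lo, hi):
    """Keep only files whose [min, max] stats for `column` overlap the
    query range [lo, hi]; everything else is skipped without being read."""
    kept = []
    for f in files:
        fmin, fmax = f["stats"][column]
        if fmax >= lo and fmin <= hi:   # ranges overlap: file may contain matches
            kept.append(f["path"])
    return kept

files = [
    {"path": "part-0.parquet", "stats": {"order_date": ("2024-01-01", "2024-01-31")}},
    {"path": "part-1.parquet", "stats": {"order_date": ("2024-02-01", "2024-02-28")}},
    {"path": "part-2.parquet", "stats": {"order_date": ("2024-02-10", "2024-03-05")}},
]
print(prune_files(files, "order_date", "2024-02-14", "2024-02-14"))
# ['part-1.parquet', 'part-2.parquet']
```

This also shows why Z-ordering helps: after clustering, each file's min/max range on the clustered columns is narrow, so far fewer ranges overlap any given predicate.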
Predicate Pushdown and Column Pruning
# Iceberg: predicate pushdown via scan API
scan = table.scan(
row_filter="order_date >= '2024-02-01' AND status = 'COMPLETED'",
selected_fields=("order_id", "customer_id", "total"),
# These predicates are pushed to the manifest layer:
# 1. Partition pruning: skip manifests whose partition range doesn't overlap
# 2. File-level: skip files where min(order_date) > '2024-02-01'
# 3. Parquet row-group: skip row groups based on column stats
)
# Verify with explain
df = spark.table("my_catalog.db.orders") \
.filter("order_date >= '2024-02-01' AND status = 'COMPLETED'") \
.select("order_id", "customer_id", "total")
df.explain("formatted")
# Look for: SelectedFiles in scan metrics, pushed predicates in plan
Caching Strategies
# Spark cache for repeated use within a job
# Cache Delta table (in-memory + disk)
spark.sql("CACHE TABLE orders")
# or
df = spark.table("orders")
df.cache()
df.count() # trigger materialize
# Databricks Delta cache (off-heap, SSD-backed, survives Spark restarts)
# Enabled by default on Databricks; configure node type with SSD
spark.sql("CACHE SELECT * FROM orders WHERE order_date >= '2024-01-01'")
# Iceberg: use Arrow for in-process cache
import pyarrow as pa
cached = table.scan(row_filter="order_date = '2024-02-14'").to_arrow()
# Arrow table lives in process memory; serialize with pickle or feather for reuse
# Best practice: don't cache the raw lake tables; cache Gold/aggregated tables
# Raw tables are large and change frequently — caching them is expensive
Vectorized Execution (Photon / Velox)
# Photon (Databricks): C++ vectorized engine, transparent to users
# Enable on cluster: set "Runtime" to Photon-enabled type
# Supported operations: scans, joins, aggregations, sorts, DML on Delta tables
# Velox (Meta's open-source vectorized engine): used by Presto/Trino
# No explicit user configuration; engine uses it internally
# Practical tips for vectorized engine performance:
# 1. Avoid Python UDFs — they break vectorization; use Spark SQL functions instead
# 2. Use columnar formats (Parquet/ORC) — vectorized engines read columns in batches
# 3. Prefer wide, denormalized tables over many joins — reduces row-level processing
# 4. Enable adaptive query execution (AQE):
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
15. Migration & Interoperability
Migrating Hive Tables to Iceberg
# Option 1: In-place migration (no data copy)
# Converts Hive table metadata to Iceberg format; data files stay in place
spark.sql("""
CALL my_catalog.system.migrate(
table => 'my_hive_catalog.db.orders'
)
""")
# Note: Hive table must be external (not managed) with Parquet/ORC files
# Option 2: Snapshot migration (dual-read during transition)
# Creates an Iceberg table that reads the existing Hive data files
spark.sql("""
CALL my_catalog.system.snapshot(
source_table => 'my_hive_catalog.db.orders',
table => 'my_iceberg_catalog.db.orders'
)
""")
# Iceberg table shares files with Hive; new writes go to Iceberg only
# Hive table still readable (no dual write needed during transition)
# After validation, cut over by pointing all readers to Iceberg table
# Option 3: CTAS (safest, requires double storage temporarily)
spark.sql("""
CREATE TABLE my_iceberg_catalog.db.orders_iceberg
USING iceberg
AS SELECT * FROM my_hive_catalog.db.orders_hive
""")
Delta to Iceberg (UniForm)
Delta UniForm enables reading Delta tables as Iceberg or Hudi tables without any data copy. The Delta write path generates Iceberg metadata alongside the Delta log, so Trino/Snowflake/etc. can read the table via the Iceberg REST catalog.
-- Enable UniForm on a Delta table (Databricks Delta 3.x+)
ALTER TABLE orders SET TBLPROPERTIES (
'delta.universalFormat.enabledFormats' = 'iceberg'
-- 'delta.universalFormat.enabledFormats' = 'iceberg,hudi' -- both
);
-- UniForm generates Iceberg metadata at:
-- s3://bucket/tables/orders/metadata/*.metadata.json
-- Register this with your Iceberg REST catalog, Snowflake, etc.
-- Convert Delta table to standalone Iceberg (data copy required):
-- Step 1: Export as Parquet snapshot
spark.sql("""
CREATE TABLE orders_parquet USING PARQUET AS
SELECT * FROM orders
""")
-- Step 2: Create Iceberg table from Parquet
spark.sql("""
CALL my_iceberg_catalog.system.add_files(
table => 'db.orders_iceberg',
source_table => 'my_spark_catalog.db.orders_parquet'
)
""")
Multi-Engine Access
# Iceberg table readable by multiple engines simultaneously
# Spark (primary writer + reader)
spark.table("my_catalog.db.orders").show()
# Trino (ad-hoc SQL analytics)
# trino.properties: connector.name=iceberg, iceberg.catalog.type=rest
# SELECT * FROM my_catalog.db.orders WHERE order_date = '2024-02-14'
# DuckDB (local laptop analytics)
# pip install duckdb pyiceberg
import duckdb
from pyiceberg.catalog import load_catalog
catalog = load_catalog("rest", uri="http://my-catalog:8181/api")
table = catalog.load_table("db.orders")
arrow_data = table.scan(row_filter="order_date = '2024-02-14'").to_arrow()
conn = duckdb.connect()
conn.register("orders", arrow_data)
result = conn.sql("SELECT customer_id, SUM(total) FROM orders GROUP BY 1 ORDER BY 2 DESC LIMIT 10").df()
# Flink + Iceberg (streaming writes from Flink)
# flink-conf.yaml / table environment:
# table_env.get_config().set("table.exec.mini-batch.enabled", "true")
# Flink SQL
"""
CREATE CATALOG iceberg_catalog WITH (
'type' = 'iceberg',
'catalog-type' = 'rest',
'uri' = 'http://nessie:19120/api/v1',
'warehouse' = 's3://my-bucket/warehouse'
);
CREATE TABLE iceberg_catalog.db.events (
event_id BIGINT,
user_id BIGINT,
event_ts TIMESTAMP(3),
payload STRING,
WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND
) WITH (
'format-version' = '2',
'write.upsert.enabled' = 'true'
);
INSERT INTO iceberg_catalog.db.events
SELECT event_id, user_id, event_ts, payload
FROM kafka_source;
"""
Vendor Lock-in Considerations
| Risk Area | Delta Lake | Iceberg | Hudi |
|---|---|---|---|
| Format lock-in | Low (open spec, but Databricks leads) | Very low (Apache-governed, many implementations) | Low (Apache-governed) |
| Catalog lock-in | Medium (Unity Catalog is Databricks-only) | Low (multiple REST catalog impls) | Medium (typically HMS or Glue) |
| Feature lock-in | Medium (Photon, AutoOptimize are Databricks-only) | Low | Low |
| Ecosystem maturity | Excellent on Databricks; good on EMR/OSS | Excellent everywhere | Good on Spark; limited elsewhere |
| Escape hatch | UniForm → Iceberg; export to Parquet | Export to Parquet trivially | Export to Parquet; convert to Iceberg |
16. Interview Scenarios & Decision Framework
Decision Framework: Delta vs Iceberg vs Hudi
| Question | Delta | Iceberg | Hudi |
|---|---|---|---|
| Already on Databricks? | Strong yes | Via UniForm | No |
| Need multi-engine reads (Trino, Snowflake, Flink)? | Possible (UniForm) | Best choice | Possible |
| Near-real-time CDC upserts (< 1 min latency)? | Possible | Possible | Best choice |
| Need partition evolution without data rewrite? | No | Best choice | No |
| Large enterprise, need governance + lineage? | Unity Catalog | Polaris/Nessie + external | HMS + external |
| AWS-native stack (Glue, Athena, EMR)? | Good (native EMR support) | Excellent (native Athena/Glue) | Good |
| Google Cloud stack? | Databricks on GCP | BigLake (Iceberg-native) | Less common |
| Azure stack? | Excellent (Azure Databricks + ADLS) | Good (via Fabric/Synapse) | Limited |
| Long-term open format strategy? | Medium (Databricks controls roadmap) | Best (Apache foundation, many vendors) | Good (Apache foundation) |
Common Interview Questions and Answers
Q: Explain ACID transactions in a lakehouse — how does Delta Lake achieve atomicity?
Object storage (S3/GCS) provides no atomicity at the directory level — you can't atomically add 10 new Parquet files and remove 5 old ones. Delta Lake solves this with an append-only transaction log (_delta_log/).
When a write completes, Delta atomically writes a single JSON entry to the log that describes all add and remove actions. Readers determine the current table state by replaying the log from the latest checkpoint. Because the log entry is a single object write (atomic in S3), either the entire commit happens or nothing changes from readers' perspective.
Isolation is achieved via optimistic concurrency control: writers read the current log version, compute their changes, then attempt to write the next version number. If another writer committed in the meantime (version conflict), the writer retries or raises a conflict exception depending on the isolation level (Serializable vs WriteSerializable).
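The commit step reduces to a few lines of plain Python. This is a toy model, not Delta's actual implementation: `O_CREAT | O_EXCL` on a local directory stands in for the put-if-absent write of the next versioned log file.

```python
import json
import os

def try_commit(log_dir: str, version: int, actions: list[dict]) -> bool:
    """Toy Delta-style commit: atomically create the next log entry.

    O_CREAT | O_EXCL fails if the file already exists, mimicking the
    put-if-absent semantics Delta relies on for version files.
    """
    path = os.path.join(log_dir, f"{version:020d}.json")
    try:
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False  # another writer claimed this version; re-read the log and retry
    with os.fdopen(fd, "w") as f:
        for action in actions:  # one JSON action per line: add / remove / commitInfo
            f.write(json.dumps(action) + "\n")
    return True
```

Two writers racing for the same version number: exactly one `try_commit` succeeds, which is the atomicity guarantee the log provides.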
Q: Why does Iceberg handle partition evolution better than Delta Lake?
Delta Lake partitioning is a physical property of the directory structure: files are written to partition_col=value/ paths. Changing the partition column means the existing directory structure is wrong and all files must be rewritten.
Iceberg treats partitioning as metadata, not directory structure. Each manifest file records which partition spec was used when writing its data files. When you execute ALTER TABLE REPLACE PARTITION FIELD, Iceberg creates a new partition spec but the old manifests (and their files) remain valid and readable under the old spec.
Queries work transparently across both specs: Iceberg evaluates the partition predicate against each spec separately, pruning manifests from both. This is possible because Iceberg's hidden partitioning decouples the physical layout from the logical column — the partition value is derived (e.g., days(event_ts)), not stored as a directory name in the user's visible schema.
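A hedged sketch of that multi-spec pruning. The manifest structures here (a spec_id plus a set of partition values) are made up for illustration; real Iceberg manifests are Avro files with much richer stats.

```python
from datetime import date

# Two partition specs for the same table: spec 0 = days(ts), spec 1 = months(ts)
TRANSFORMS = {
    0: lambda d: d.toordinal(),          # days() transform -> ordinal day number
    1: lambda d: d.year * 12 + d.month,  # months() transform -> month ordinal
}

# Each manifest remembers which spec wrote its data files (simplified)
manifests = [
    {"spec_id": 0, "partition_values": {date(2024, 1, 15).toordinal()}},
    {"spec_id": 1, "partition_values": {2024 * 12 + 2}},  # Feb 2024
]

def prune(manifests, query_date):
    """Keep only manifests that can contain query_date, evaluating the
    predicate under each manifest's OWN partition spec."""
    return [
        m for m in manifests
        if TRANSFORMS[m["spec_id"]](query_date) in m["partition_values"]
    ]
```

A query for Jan 15 prunes down to the day-partitioned manifest; a query for any day in February matches only the month-partitioned one. No file was rewritten to make both prunable.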
Q: You have a Delta table with 10M small files from a year of streaming writes. How do you fix this?
Root cause: Streaming micro-batches write one or more small files per partition per trigger interval. With 5-minute triggers and date partitioning, you get up to 288 files/day — 105,120 files/year.
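As a sanity check on those numbers (assuming one file per trigger per partition):

```python
# 5-minute trigger interval -> triggers per day
triggers_per_day = 24 * 60 // 5          # 288
# One small file per trigger, accumulated over a year, per partition
files_per_year = triggers_per_day * 365  # 105,120
```

With multiple partitions touched per micro-batch (or multiple files per trigger), the total climbs into the millions, which is how a year of streaming produces a 10M-file table.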
Immediate fix:
-- Run partition-scoped OPTIMIZE to avoid scanning unneeded partitions
-- Start with recent partitions first (most impactful for queries)
OPTIMIZE orders WHERE order_date >= '2024-01-01';
-- Z-ORDER if you have common filter columns
OPTIMIZE orders ZORDER BY (customer_id) WHERE order_date >= '2024-01-01';
-- Then vacuum old files (after OPTIMIZE, old small files are "removed" in log)
VACUUM orders RETAIN 168 HOURS;
Prevention going forward:
- Enable delta.autoOptimize.autoCompact = true on the streaming target table
- Or schedule a nightly OPTIMIZE job scoped to yesterday's partition
- Use trigger(availableNow=True) instead of micro-batch for non-latency-critical pipelines
- Consider coarser partitioning (weekly instead of daily) if query patterns allow
Q: How do you implement SCD Type 2 on a lakehouse at scale?
SCD Type 2 requires closing old rows and inserting new ones. The naive approach (full scan + merge) breaks at billion-row scale. Production approach:
- Detect changes: join incoming updates to the current snapshot using the business key. Use a hash of all tracked columns to detect changes without comparing every column.
- Partition the SCD table by hash bucket of the business key: this ensures the MERGE only scans relevant partitions. Don't partition by date — the "current" rows span all historical dates.
- MERGE into a partition-pruned window: include the hash bucket in the merge condition so Delta reads only affected files.
- Close old rows and insert new rows in two operations or use a single MERGE with whenMatchedUpdate + whenNotMatchedInsert, relying on the fact that "closed" rows become NOT MATCHED on the next run (they have a different effective_end, so a second MERGE finds them correctly).
-- Hash bucket partitioning for SCD Type 2
CREATE TABLE dim_customer (
surrogate_key BIGINT GENERATED ALWAYS AS IDENTITY,
customer_id BIGINT NOT NULL,
hash_bucket INT GENERATED ALWAYS AS (CAST(customer_id % 256 AS INT)),
-- ... tracked attributes ...
is_current BOOLEAN NOT NULL DEFAULT true,
effective_start TIMESTAMP NOT NULL,
effective_end TIMESTAMP
)
USING DELTA
PARTITIONED BY (hash_bucket);
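The bucketing and change-detection logic from steps 1-2 can be unit-tested outside Spark. A sketch; the 256-bucket count and the tracked-column set are illustrative, not prescriptive:

```python
import hashlib

BUCKETS = 256  # must match the table's generated hash_bucket column

def hash_bucket(customer_id: int) -> int:
    # Deterministic bucket; including this in the MERGE ON clause lets
    # the engine prune to the one partition that can contain the key
    return customer_id % BUCKETS

def row_hash(tracked: dict) -> str:
    # One hash over all tracked columns: detect "any attribute changed"
    # with a single comparison instead of N column-by-column checks
    canonical = "|".join(f"{k}={tracked[k]!r}" for k in sorted(tracked))
    return hashlib.sha256(canonical.encode()).hexdigest()
```

Sorting the keys before hashing makes the hash independent of column order, so the same row always produces the same hash regardless of how the incoming batch is laid out.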
Q: Describe the write protocol for concurrent writers in Iceberg (snapshot isolation).
Iceberg uses optimistic concurrency with snapshot isolation:
- Writer reads the current table metadata (snapshot N).
- Writer produces new data files and a new manifest.
- Writer constructs a new metadata file pointing to snapshot N+1 (which includes the new manifest + all existing manifests from snapshot N).
- Writer atomically swaps the catalog pointer from the old metadata file to the new one. Most catalogs implement this with a conditional PUT (if-match on the current metadata path) or CAS (compare-and-swap).
- If another writer committed first (the catalog pointer changed), the writer detects the conflict, reads the new current state, re-validates whether the conflict is critical (e.g., overlapping files), and either retries or raises an exception.
Key property: readers always see a consistent snapshot because they hold a reference to an immutable metadata file. New writes create new metadata files; they never modify existing ones. There are no exclusive file locks.
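The five steps above collapse into a compare-and-swap loop. A toy model, with an in-memory pointer standing in for a real catalog's conditional PUT:

```python
import threading

class ToyCatalog:
    """The only mutable state in the system: a pointer to the current metadata file."""
    def __init__(self):
        self._lock = threading.Lock()
        self.pointer = "metadata-00000.json"

    def swap(self, expected: str, new: str) -> bool:
        # Conditional update: succeeds only if nobody committed in between
        with self._lock:
            if self.pointer != expected:
                return False
            self.pointer = new
            return True

def commit(catalog: ToyCatalog, write_snapshot, max_retries: int = 3) -> str:
    for _ in range(max_retries):
        base = catalog.pointer               # step 1: read current metadata
        new_meta = write_snapshot(base)      # steps 2-3: new files + new metadata
        if catalog.swap(base, new_meta):     # step 4: CAS the catalog pointer
            return new_meta
        # step 5: conflict detected; the loop re-reads and retries
    raise RuntimeError("CommitFailedException after retries")
```

Note that `write_snapshot` only ever creates new objects; the retry path re-bases on the new current metadata rather than mutating anything, which is why readers never see a torn state.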
Q: How would you design a feature store on top of a lakehouse for ML?
Key requirements: point-in-time correctness, low-latency batch serving, efficient incremental updates, versioned datasets for training reproducibility.
Architecture:
- Offline store: Delta or Iceberg tables partitioned by feature_date. Computed daily or hourly by Spark jobs. Serves training data via point-in-time joins.
- Online store: Redis or DynamoDB for <10ms serving. Populated by a Spark streaming job that reads the Delta CDF and upserts to Redis. Key = entity ID; value = latest feature vector.
- Point-in-time join: join training labels (customer_id, label_date) to features with an as-of condition, f.feature_date <= l.label_date. Never join to feature_date = CURRENT_DATE() — this leaks future information into historical training data.
- Versioning: record the Delta table version + feature_date range in MLflow. To reproduce training data, read with versionAsOf.
- Feature groups: separate tables per entity type (customer, product, session). Join on demand for model-specific feature sets.
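The point-in-time join is the part people most often get wrong. A minimal pandas illustration of the as-of semantics (pd.merge_asof plays the role of the Spark as-of join here; the column names follow the example above):

```python
import pandas as pd

features = pd.DataFrame({
    "customer_id": [1, 1, 1],
    "feature_date": pd.to_datetime(["2024-01-01", "2024-01-15", "2024-02-01"]),
    "spend_30d": [100.0, 150.0, 300.0],
})
labels = pd.DataFrame({
    "customer_id": [1],
    "label_date": pd.to_datetime(["2024-01-20"]),
    "churned": [0],
})

# As-of join: for each label row, take the LATEST feature row with
# feature_date <= label_date, never a later one (that would leak the future)
train = pd.merge_asof(
    labels.sort_values("label_date"),
    features.sort_values("feature_date"),
    left_on="label_date",
    right_on="feature_date",
    by="customer_id",
)
```

The 2024-01-20 label correctly picks up the 2024-01-15 feature snapshot (spend_30d = 150.0), not the 2024-02-01 one, which did not exist yet at label time.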
Production Gotchas Summary
| Gotcha | Symptom | Fix |
|---|---|---|
| VACUUM too aggressive | Structured Streaming job fails on restart: "version not found" | Set retention ≥ 7 days; use RETAIN 168 HOURS |
| No partition pruning in MERGE | MERGE scans entire target table for every batch | Add partition column to merge condition |
| Iceberg identity partition on timestamp | One partition per unique timestamp → millions of partitions | Use days(ts) or hours(ts) transform |
| Missing precombine field (Hudi) | Silent data loss when batch has duplicate keys | Always set hoodie.datasource.write.precombine.field |
| Rename without column mapping (Delta) | Rename works in metadata but old Parquet files return null for renamed column | Enable column mapping mode before renaming |
| Future data leakage in point-in-time join | Model performs great in training, fails in production | Strict as-of join: f.feature_date = e.label_date or < e.label_date |
| Small files after streaming | Slow queries; thousands of file opens per task | Enable autoCompact or schedule nightly OPTIMIZE |
| Catalog pointer update race (Iceberg) | Concurrent writers: CommitFailedException | Expected behavior; add retry logic; use higher-level frameworks (Flink's exactly-once) |
| DML on non-partitioned Delta table | UPDATE/DELETE rewrites entire table | Always partition tables that receive DML; use deletion vectors (Delta 2.3+) |
| Iceberg snapshot accumulation | Metadata files grow without bound; expire_snapshots never called | Schedule expire_snapshots + remove_orphan_files daily |
A useful mental model across all three formats: the catalog's current-metadata reference works like the .git directory pointer — it tells you where HEAD is. This analogy holds surprisingly well and helps reason through questions about concurrent writes, time travel, and metadata management.
17. Local Development Setup (macOS)
You don't need a cloud account or Databricks to experiment with lakehouse table formats. Everything below runs on a MacBook with Docker.
Delta Lake with PySpark (Simplest)
Install PySpark + delta-spark. Tables are just directories on your local filesystem.
# Create a venv and install
python3 -m venv lakehouse-lab && source lakehouse-lab/bin/activate
pip install pyspark==3.5.* delta-spark==3.2.* jupyterlab pandas
# Start Jupyter (optional)
jupyter lab
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
# Builder pattern — delta-spark auto-configures the Spark session
builder = (
SparkSession.builder
.master("local[*]")
.appName("delta-local")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
# Keep Spark UI off for local dev
.config("spark.ui.enabled", "false")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# Create a Delta table on local filesystem
data = spark.createDataFrame([
(1, "alice", "2024-01-15", 120.50),
(2, "bob", "2024-01-16", 89.99),
(3, "carol", "2024-02-01", 210.00),
], ["order_id", "customer", "order_date", "amount"])
data.write.format("delta").mode("overwrite").save("/tmp/delta-lab/orders")
# Read it back
orders = spark.read.format("delta").load("/tmp/delta-lab/orders")
orders.show()
# Time travel — check the log
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/tmp/delta-lab/orders")
dt.history().select("version", "timestamp", "operation").show(truncate=False)
Run ls /tmp/delta-lab/orders/_delta_log/ — you'll see JSON commit files (00000000000000000000.json, etc.) and eventually checkpoint Parquet files. Reading these helps build intuition for how Delta tracks changes.
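That inspection can be scripted. A small helper (pure Python, no Spark); point it at whatever _delta_log directory your table uses:

```python
import glob
import json
import os

def summarize_delta_log(log_dir: str) -> list[tuple[str, list[str]]]:
    """For each commit JSON in a _delta_log directory, list its action
    types (commitInfo, metaData, protocol, add, remove)."""
    summary = []
    for path in sorted(glob.glob(os.path.join(log_dir, "*.json"))):
        with open(path) as f:
            actions = [json.loads(line) for line in f if line.strip()]
        # each action object has exactly one top-level key naming its type
        kinds = [next(iter(a)) for a in actions]
        summary.append((os.path.basename(path), kinds))
    return summary

# summarize_delta_log("/tmp/delta-lab/orders/_delta_log")
```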
Delta Lake with DuckDB (No Spark, Fastest)
DuckDB reads Delta tables natively — perfect for quick local queries without waiting for Spark to start up.
pip install duckdb deltalake pandas
import duckdb
con = duckdb.connect()
# The DuckDB delta extension is read-only — load it to query Delta tables
con.sql("INSTALL delta; LOAD delta;")
# A regular DuckDB table from CSV (plain DuckDB storage, not Delta —
# Delta writes go through the deltalake package below)
con.sql("CREATE TABLE local_orders AS SELECT * FROM read_csv_auto('orders.csv')")
# Or read existing Delta tables written by Spark
con.sql("""
SELECT customer, SUM(amount) as total
FROM delta_scan('/tmp/delta-lab/orders')
GROUP BY customer
ORDER BY total DESC
""").show()
# deltalake Python package — write Delta without Spark at all
from deltalake import DeltaTable, write_deltalake
import pandas as pd
df = pd.DataFrame({
"order_id": [4, 5],
"customer": ["dave", "eve"],
"order_date": ["2024-02-10", "2024-02-11"],
"amount": [55.00, 340.00]
})
# Write (creates or appends)
write_deltalake("/tmp/delta-lab/orders", df, mode="append")
# Read back
dt = DeltaTable("/tmp/delta-lab/orders")
dt.to_pandas()
Iceberg with PySpark + Local Catalog
pip install pyspark==3.5.*
# Iceberg JARs are pulled automatically by Spark package config below
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.master("local[*]")
.appName("iceberg-local")
.config("spark.jars.packages",
"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1")
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
# Hadoop catalog — stores metadata in local filesystem (simplest for dev)
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.local.type", "hadoop")
.config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-lab/warehouse")
.config("spark.ui.enabled", "false")
.getOrCreate()
)
# Create a namespaced table
spark.sql("CREATE NAMESPACE IF NOT EXISTS local.lab")
spark.sql("""
CREATE TABLE IF NOT EXISTS local.lab.orders (
order_id INT,
customer STRING,
order_date DATE,
amount DOUBLE
)
USING iceberg
PARTITIONED BY (days(order_date))
""")
# Insert data
spark.sql("""
INSERT INTO local.lab.orders VALUES
(1, 'alice', DATE '2024-01-15', 120.50),
(2, 'bob', DATE '2024-01-16', 89.99),
(3, 'carol', DATE '2024-02-01', 210.00)
""")
# Snapshots
spark.sql("SELECT * FROM local.lab.orders.snapshots").show(truncate=False)
# Partition evolution — no rewrite!
spark.sql("ALTER TABLE local.lab.orders REPLACE PARTITION FIELD days(order_date) WITH months(order_date)")
Iceberg with PyIceberg + DuckDB (No Spark)
pip install "pyiceberg[sql-sqlite,pyarrow]" duckdb pandas
from pyiceberg.catalog.sql import SqlCatalog
import pyarrow as pa
# SQLite-backed catalog — zero infrastructure
catalog = SqlCatalog(
"local",
**{
"uri": "sqlite:////tmp/iceberg-lab/catalog.db",
"warehouse": "file:///tmp/iceberg-lab/warehouse",
}
)
catalog.create_namespace_if_not_exists("lab")
# Create table from PyArrow schema
schema = pa.schema([
pa.field("order_id", pa.int32()),
pa.field("customer", pa.string()),
pa.field("order_date", pa.date32()),
pa.field("amount", pa.float64()),
])
table = catalog.create_table_if_not_exists("lab.orders", schema=schema)
# Append data. Build the batch against the table schema so the Arrow
# types (int32, date32) match the Iceberg schema exactly
from datetime import date
batch = pa.table({
"order_id": [1, 2, 3],
"customer": ["alice", "bob", "carol"],
"order_date": [date(2024, 1, 15), date(2024, 1, 16), date(2024, 2, 1)],
"amount": [120.50, 89.99, 210.00],
}, schema=schema)
table.append(batch)
# Query with DuckDB
import duckdb
arrow_table = table.scan().to_arrow()
duckdb.sql("SELECT customer, amount FROM arrow_table ORDER BY amount DESC").show()
Full Local Stack with Docker Compose
This gives you an S3-compatible store (MinIO), a REST catalog (Nessie for Iceberg), and Spark — closer to what production looks like.
# docker-compose.yml
services:
minio:
image: minio/minio:latest
ports:
- "9000:9000"
- "9001:9001" # Console
environment:
MINIO_ROOT_USER: admin
MINIO_ROOT_PASSWORD: password
command: server /data --console-address ":9001"
volumes:
- minio-data:/data
minio-init:
image: minio/mc:latest
depends_on: [minio]
entrypoint: >
/bin/sh -c "
mc alias set local http://minio:9000 admin password;
mc mb local/lakehouse --ignore-existing;
exit 0;
"
nessie:
image: ghcr.io/projectnessie/nessie:latest
ports:
- "19120:19120"
spark:
image: bitnami/spark:3.5
ports:
- "4040:4040"
- "8888:8888"
environment:
- SPARK_MODE=master
volumes:
- ./notebooks:/home/notebooks
depends_on: [minio, nessie]
volumes:
minio-data:
# Start the stack
docker compose up -d
# Create a MinIO bucket (if minio-init didn't run)
docker compose exec minio mc alias set local http://localhost:9000 admin password
docker compose exec minio mc mb local/lakehouse
# Connect Spark to MinIO + Nessie (inside the Spark container or local PySpark)
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.master("local[*]")
.config("spark.jars.packages", ",".join([
"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1",
"org.apache.hadoop:hadoop-aws:3.3.4",
"org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.95.0",
]))
# S3/MinIO
.config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
.config("spark.hadoop.fs.s3a.access.key", "admin")
.config("spark.hadoop.fs.s3a.secret.key", "password")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
# Nessie catalog for Iceberg
.config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.nessie.catalog-impl",
"org.apache.iceberg.nessie.NessieCatalog")
.config("spark.sql.catalog.nessie.uri", "http://localhost:19120/api/v2")
.config("spark.sql.catalog.nessie.warehouse", "s3a://lakehouse/iceberg")
.config("spark.sql.catalog.nessie.ref", "main")
.config("spark.sql.extensions", ",".join([
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"org.projectnessie.spark.extensions.NessieSparkSessionExtensions",
]))
.getOrCreate()
)
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.lab")
spark.sql("""
CREATE TABLE nessie.lab.events (
event_id BIGINT,
event_type STRING,
event_ts TIMESTAMP,
payload STRING
) USING iceberg
PARTITIONED BY (days(event_ts))
""")
# Now you have Iceberg tables stored in MinIO with Nessie catalog — same
# architecture as production but all on your laptop
Which Local Setup to Use?
| Goal | Setup | Time to Start |
|---|---|---|
| Quick Delta read/write, no Spark overhead | DuckDB + deltalake Python | 30 seconds |
| Quick Iceberg read/write, no Spark overhead | PyIceberg + SQLite catalog | 30 seconds |
| Delta with full Spark API (MERGE, Z-ORDER) | PySpark + delta-spark | 2 minutes |
| Iceberg with full Spark SQL | PySpark + iceberg JARs | 2 minutes |
| Realistic prod simulation (S3 + catalog + Spark) | Docker Compose stack | 5 minutes |
PySpark needs a JVM: brew install openjdk@17 and ensure JAVA_HOME is set. DuckDB and PyIceberg do not require Java.
18. Production Setup
Production lakehouse deployments center around three decisions: where data lives (storage), how tables are discovered (catalog), and what runs the queries (compute). Delta and Iceberg have different natural fits depending on whether you use Databricks or build on AWS services directly.
Architecture Comparison
| Component | Databricks (Managed) | AWS-Native (No Databricks) |
|---|---|---|
| Storage | S3 / ADLS / GCS (you own the buckets) | S3 (you own the buckets) |
| Table Format | Delta (native) — Iceberg readable via UniForm | Iceberg (best native support) or Delta |
| Catalog | Unity Catalog | AWS Glue Data Catalog (most common) Nessie / Polaris (Iceberg-specific) |
| Compute — ETL | Databricks Jobs (Spark clusters) | EMR Spark / Glue ETL / Flink on EMR |
| Compute — SQL BI | Databricks SQL Warehouses | Athena (serverless) / Trino on EMR / Redshift Spectrum |
| Orchestration | Databricks Workflows | Airflow (MWAA) / Step Functions |
| Governance | Unity Catalog (RBAC, lineage, masking) | Lake Formation + Glue Catalog (IAM-based) |
| Cost Model | DBU (Databricks Units) + cloud infra | Per-query (Athena) / per-instance (EMR) + S3 |
| Complexity | Lower — single platform, managed | Higher — more services to wire together |
Databricks + Delta Lake
This is the simplest production path. Databricks is Delta's primary maintainer, so Delta features land in Databricks first (liquid clustering, deletion vectors, UniForm). Unity Catalog handles governance end-to-end.
# Typical Databricks production pipeline (runs on a Databricks Job cluster)
# ── Bronze: ingest raw events from Kafka to Delta ──
raw_events = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "user-events")
.option("startingOffsets", "earliest")
.load()
)
(raw_events
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "s3://prod-lakehouse/checkpoints/bronze_events")
.trigger(availableNow=True) # Process all available, then stop (cost-effective)
.toTable("prod_catalog.bronze.user_events"))
# ── Silver: clean + deduplicate ──
spark.sql("""
CREATE OR REPLACE TABLE prod_catalog.silver.user_events AS
SELECT
from_json(value, 'user_id STRING, event_type STRING, properties MAP<STRING,STRING>, ts TIMESTAMP') AS parsed,
timestamp AS kafka_ts
FROM prod_catalog.bronze.user_events
QUALIFY ROW_NUMBER() OVER (
PARTITION BY parsed.user_id, parsed.event_type, parsed.ts
ORDER BY kafka_ts DESC
) = 1
""")
# ── Gold: aggregate for BI ──
spark.sql("""
CREATE OR REPLACE TABLE prod_catalog.gold.daily_active_users AS
SELECT
DATE(parsed.ts) AS activity_date,
COUNT(DISTINCT parsed.user_id) AS dau,
COUNT(*) AS total_events
FROM prod_catalog.silver.user_events
GROUP BY DATE(parsed.ts)
""")
Databricks Unity Catalog setup
-- 1. Hierarchy: metastore > catalog > schema > table
CREATE CATALOG IF NOT EXISTS prod_catalog;
CREATE SCHEMA IF NOT EXISTS prod_catalog.bronze;
CREATE SCHEMA IF NOT EXISTS prod_catalog.silver;
CREATE SCHEMA IF NOT EXISTS prod_catalog.gold;
-- 2. External location (maps S3 path to a storage credential)
CREATE EXTERNAL LOCATION IF NOT EXISTS prod_lakehouse
URL 's3://prod-lakehouse/'
WITH (STORAGE CREDENTIAL aws_prod_credential);
-- 3. Managed tables (Databricks controls the file layout)
CREATE TABLE prod_catalog.silver.orders (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(12,2),
order_date DATE
)
USING DELTA
CLUSTER BY (customer_id); -- Liquid clustering (Delta 3.0+), replaces PARTITIONED BY + Z-ORDER
-- 4. Access control
GRANT USE CATALOG ON CATALOG prod_catalog TO `data-team`;
GRANT USE SCHEMA ON SCHEMA prod_catalog.gold TO `analysts`;
GRANT SELECT ON TABLE prod_catalog.gold.daily_active_users TO `analysts`;
-- 5. Enable Iceberg reads via UniForm (so Trino/Athena can read this Delta table)
ALTER TABLE prod_catalog.silver.orders
SET TBLPROPERTIES ('delta.universalFormat.enabledFormats' = 'iceberg');
Databricks table maintenance (scheduled via Workflows)
-- Run nightly on high-write tables
OPTIMIZE prod_catalog.silver.orders;
-- Clean up old files (default retention = 7 days)
VACUUM prod_catalog.silver.orders;
-- Check table health
DESCRIBE DETAIL prod_catalog.silver.orders;
-- Shows: numFiles, sizeInBytes, numPartitions, clusteringColumns
Note: Databricks is not Delta-only; recent runtimes also let you create tables with USING iceberg, and Unity Catalog can manage Iceberg tables natively. However, some Delta-specific features (liquid clustering, deletion vectors) are only available on Delta tables.
AWS-Native + Iceberg (No Databricks)
Iceberg has the broadest native support across AWS services. Athena, EMR, Glue ETL, and Redshift Spectrum all read/write Iceberg tables registered in the Glue Data Catalog.
Architecture diagram
┌─────────────────────────────────────────────────────────────────────┐
│ AWS Account │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ Airflow │───▶│ EMR Spark │───▶│ S3 (Iceberg data) │ │
│ │ (MWAA) │ │ (ETL jobs) │ │ s3://prod/tables/ │ │
│ └──────────────┘ └──────────────┘ └──────────────────────┘ │
│ │ │ ▲ │
│ │ ▼ │ │
│ │ ┌──────────────┐ │ │
│ │ │ Glue Data │ │ │
│ │ │ Catalog │◀───────────────┘ │
│ │ └──────────────┘ (metadata) │
│ │ ▲ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Glue ETL │ │ Athena │ ← Serverless SQL (analysts)│
│ │ (crawlers) │ │ (BI/SQL) │ │
│ └──────────────┘ └──────────────┘ │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ Lake Formation (governance, RBAC) │ │
│ └──────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Step 1 — S3 bucket layout
# Recommended bucket structure
s3://prod-lakehouse/
├── tables/
│ ├── bronze/
│ │ └── user_events/ # Iceberg table (data + metadata)
│ ├── silver/
│ │ └── user_events_clean/
│ └── gold/
│ └── daily_active_users/
├── checkpoints/ # Spark structured streaming checkpoints
└── temp/ # Staging area for ETL jobs
Step 2 — Create Iceberg tables via Athena (Glue Catalog)
-- Athena uses Glue Data Catalog by default — no extra catalog config needed
CREATE DATABASE IF NOT EXISTS bronze;
CREATE DATABASE IF NOT EXISTS silver;
CREATE DATABASE IF NOT EXISTS gold;
-- Iceberg table in Athena (V3 engine)
CREATE TABLE bronze.user_events (
event_id BIGINT,
user_id STRING,
event_type STRING,
properties MAP<STRING, STRING>,
event_ts TIMESTAMP
)
PARTITIONED BY (days(event_ts))
LOCATION 's3://prod-lakehouse/tables/bronze/user_events/'
TBLPROPERTIES (
'table_type' = 'ICEBERG',
'format' = 'PARQUET',
'write_compression' = 'zstd'
);
-- Insert data directly from Athena
INSERT INTO bronze.user_events
SELECT * FROM raw_events_staging;
-- Time travel query
SELECT * FROM bronze.user_events FOR TIMESTAMP AS OF
TIMESTAMP '2024-02-01 00:00:00';
Step 3 — EMR Spark for heavy ETL
# EMR Spark job — reads/writes Iceberg via Glue Catalog
# Submit with: aws emr add-steps --cluster-id j-XXXXX --steps file://step.json
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("silver-etl")
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
# Glue Catalog as the Iceberg catalog
.config("spark.sql.catalog.glue", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.glue.catalog-impl",
"org.apache.iceberg.aws.glue.GlueCatalog")
.config("spark.sql.catalog.glue.warehouse",
"s3://prod-lakehouse/tables/")
.config("spark.sql.catalog.glue.io-impl",
"org.apache.iceberg.aws.s3.S3FileIO")
.getOrCreate()
)
# Bronze → Silver: clean and deduplicate
# Bronze → Silver: clean and deduplicate
# (open-source Spark has no QUALIFY clause; dedupe via a ranked subquery)
spark.sql("""
CREATE OR REPLACE TABLE glue.silver.user_events_clean
USING iceberg
PARTITIONED BY (days(event_ts))
AS
SELECT event_id, user_id, event_type, properties, event_ts
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_ts) AS rn
FROM glue.bronze.user_events
WHERE user_id IS NOT NULL
AND event_ts >= DATE '2024-01-01'
)
WHERE rn = 1
""")
# MERGE for incremental upsert (Iceberg supports full MERGE)
spark.sql("""
MERGE INTO glue.silver.user_events_clean AS target
USING (SELECT * FROM glue.bronze.user_events WHERE event_ts >= current_date - 1) AS source
ON target.event_id = source.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Table maintenance
spark.sql("CALL glue.system.rewrite_data_files(table => 'silver.user_events_clean')")
spark.sql("CALL glue.system.expire_snapshots(table => 'silver.user_events_clean', older_than => TIMESTAMP '2024-02-01 00:00:00')")
spark.sql("CALL glue.system.remove_orphan_files(table => 'silver.user_events_clean')")
Step 4 — Glue ETL (serverless Spark, no cluster management)
# Glue 4.0+ supports Iceberg natively
# glue_job.py — deployed via aws glue create-job
import sys
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Glue auto-configures the Glue Catalog — tables registered in Glue are available directly
# Set Iceberg configs for write support
spark.conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.catalog-impl",
"org.apache.iceberg.aws.glue.GlueCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.warehouse",
"s3://prod-lakehouse/tables/")
# Gold layer aggregation
spark.sql("""
CREATE OR REPLACE TABLE glue_catalog.gold.daily_active_users
USING iceberg
AS
SELECT
DATE(event_ts) AS activity_date,
COUNT(DISTINCT user_id) AS dau,
COUNT(*) AS total_events
FROM glue_catalog.silver.user_events_clean
GROUP BY DATE(event_ts)
""")
Step 5 — Lake Formation governance
# Grant analysts read access to gold tables via AWS CLI
aws lakeformation grant-permissions \
--principal '{"DataLakePrincipalIdentifier": "arn:aws:iam::123456789:role/analysts"}' \
--resource '{"Table": {"DatabaseName": "gold", "Name": "daily_active_users"}}' \
--permissions '["SELECT"]'
# Column-level security — hide PII columns from analysts
aws lakeformation grant-permissions \
--principal '{"DataLakePrincipalIdentifier": "arn:aws:iam::123456789:role/analysts"}' \
--resource '{
"TableWithColumns": {
"DatabaseName": "silver",
"Name": "user_events_clean",
"ColumnNames": ["event_id", "event_type", "event_ts"]
}
}' \
--permissions '["SELECT"]'
# user_id and properties columns are NOT listed → analysts cannot see them
AWS-Native + Delta Lake (No Databricks)
Delta works on AWS without Databricks, but with less native integration than Iceberg. You provide the Delta JARs yourself on EMR/Glue.
# EMR Spark with Delta (no Databricks)
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
spark = (
SparkSession.builder
.appName("delta-emr")
.config("spark.sql.extensions",
"io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
# Glue Catalog as Hive Metastore replacement
.config("spark.hadoop.hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
.enableHiveSupport()
.getOrCreate()
)
# Delta tables registered in Glue Catalog
spark.sql("""
CREATE TABLE IF NOT EXISTS bronze.orders (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(12,2),
order_date DATE
)
USING DELTA
PARTITIONED BY (order_date)
LOCATION 's3://prod-lakehouse/tables/bronze/orders/'
""")
# MERGE, OPTIMIZE, VACUUM all work
spark.sql("""
OPTIMIZE bronze.orders WHERE order_date >= current_date - 7
ZORDER BY (customer_id)
""")
spark.sql("VACUUM bronze.orders RETAIN 168 HOURS")
Athena gotcha: EMR 6.x+ Spark reads Delta tables natively, but Athena historically could read them only via symlink manifests (INPUTFORMAT = 'SymlinkTextInputFormat') — you must run GENERATE symlink_format_manifest FOR TABLE ... after each write to keep Athena in sync (Athena engine v3 added native read-only Delta support, which removes this step). Either way, this is notable operational overhead compared to Iceberg, where Athena reads and writes natively without manifest generation.
-- Generate manifest so Athena can read the Delta table
-- Must be re-run after every write!
GENERATE symlink_format_manifest FOR TABLE bronze.orders;
-- Then in Athena, create a table pointing to the manifest
CREATE EXTERNAL TABLE bronze.orders_athena (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(12,2)
)
PARTITIONED BY (order_date DATE)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://prod-lakehouse/tables/bronze/orders/_symlink_format_manifest/';
Catalog Decision Guide
| Scenario | Recommended Catalog | Why |
|---|---|---|
| Databricks shop, Delta tables | Unity Catalog | Tightest integration, RBAC + lineage built-in, liquid clustering support |
| AWS-native, Iceberg tables | AWS Glue Data Catalog | Zero infrastructure, Athena/EMR/Glue/Redshift all read it natively |
| Multi-cloud or vendor-neutral, Iceberg | Nessie or Apache Polaris | Open-source REST catalog, git-like branching (Nessie), works with Spark/Trino/Flink |
| AWS-native, Delta tables | Glue Catalog (Hive mode) | Works but limited — no native Athena read (need manifests), no Delta-specific governance |
| Need both Delta and Iceberg | Unity Catalog + UniForm | Write Delta (for Databricks features), expose as Iceberg for external engines |
| Self-hosted, budget-conscious | Hive Metastore (HMS) | Battle-tested, runs on a single Postgres/MySQL instance, broad engine support |
Compute Decision Guide
| Workload | Databricks | AWS-Native |
|---|---|---|
| Heavy ETL (TBs, complex joins) | Databricks Jobs (auto-scaling Spark) | EMR Spark (EC2 or EKS) |
| Ad-hoc SQL / BI dashboards | Databricks SQL Warehouse | Athena (serverless, pay per query) |
| Light ETL (GBs, simple transforms) | Databricks Jobs (small cluster) | Glue ETL (serverless Spark) |
| Streaming (Kafka → lakehouse) | Databricks Structured Streaming | EMR Flink or EMR Spark Streaming |
| Interactive exploration | Databricks Notebooks | EMR Studio / SageMaker Notebooks |
| Low-latency serving (sub-second) | Databricks SQL + caching | Trino on EMR (with Hive connector cache) |
Cost Comparison (Rough Ballpark)
| Component | Databricks | AWS-Native |
|---|---|---|
| Compute (ETL) | ~$3–5K/mo (Jobs DBUs) | ~$2–4K/mo (EMR instances) |
| Compute (SQL/BI) | ~$2–4K/mo (SQL Warehouse DBUs) | ~$500–1.5K/mo (Athena per-query) |
| Storage (S3) | ~$230/mo (same S3) | ~$230/mo |
| Catalog | Included in Databricks | Glue Catalog: ~$1/mo per 100K objects |
| Orchestration | Included (Workflows) | ~$400–800/mo (MWAA) |
| Total estimate | ~$6–10K/mo | ~$3.5–7K/mo |
| Operational overhead | Lower (managed platform) | Higher (more knobs, more services) |
AWS-native, cost-sensitive, multi-engine? → Iceberg + Glue Catalog + Athena/EMR. Iceberg has first-class AWS support and avoids vendor lock-in. You'll spend more time on infrastructure but less on licensing.