Table of Contents

0. Setup & Environment

No local Spark installation required. Docker gives you a fully working PySpark or Scala shell in under a minute.

Prerequisites

Install Docker Desktop (Mac):

brew install --cask docker   # or download from docker.com
docker --version             # verify: Docker version 27.x.x

Start PySpark Shell

One command — no local Python or JVM setup needed:

docker run -it --rm \
  -p 4040:4040 \
  apache/spark-py:latest \
  /opt/spark/bin/pyspark
Tip: --rm auto-removes the container on exit, keeping your Docker container list clean. Drop it if you want to inspect container state post-run.

Start Spark Shell (Scala)

docker run -it --rm \
  -p 4040:4040 \
  apache/spark:latest \
  /opt/spark/bin/spark-shell

Working with Local Files

Mount a host directory into the container with -v:

docker run -it --rm \
  -p 4040:4040 \
  -v $(pwd)/data:/data \
  apache/spark-py:latest \
  /opt/spark/bin/pyspark

Then inside the PySpark shell:

df = spark.read.csv("/data/myfile.csv", header=True)
df.show(5)

Docker Compose — Standalone Cluster (Optional)

For multi-worker practice, create docker-compose.yml:

services:
  spark-master:
    image: apache/spark:latest
    command: /opt/spark/bin/spark-class org.apache.spark.deploy.master.Master
    ports:
      - "8080:8080"   # Spark master UI
      - "7077:7077"   # cluster connect port
    environment:
      - SPARK_MODE=master

  spark-worker:
    image: apache/spark:latest
    command: /opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077

docker compose up -d    # start cluster in background
docker compose down     # tear it down
Note: The master UI at http://localhost:8080 shows connected workers. Submit jobs with spark-submit --master spark://localhost:7077 ....

Quick Verify

Inside the PySpark shell, run a trivial job to confirm everything is wired up:

df = spark.range(1000)
df.filter(df.id % 2 == 0).count()
# Expected output: 500

Then open http://localhost:4040 — you should see one completed job in the Spark UI.

Architecture

Spark is a unified engine for large-scale data analytics. It runs as a master/worker cluster: one Driver orchestrates work; many Executors do the actual computation in parallel.

Core Components

| Component | Role | Lives on |
|---|---|---|
| Driver Program | Hosts SparkContext / SparkSession; builds logical plan; schedules stages onto executors | Your machine or cluster head node |
| SparkContext | Entry point for Spark 1.x / RDD API; manages cluster connection | Driver JVM |
| SparkSession | Unified entry point (Spark 2+); wraps SparkContext; adds DataFrame/SQL API | Driver JVM |
| Executor | JVM process on each worker node; runs tasks; caches data | Worker nodes |
| Task | Smallest unit of work — processes one partition | Executor thread |
| Stage | Set of tasks with no shuffle boundary between them | Logical grouping |
| Job | Triggered by an action; broken into stages | Logical grouping |
| Cluster Manager | Allocates executor resources to Spark apps | External (YARN/K8s/Standalone) |

Cluster Managers

| Manager | When to use | Key flag |
|---|---|---|
| Standalone | Simple clusters, dev/test, full Spark control | --master spark://host:7077 |
| YARN | Existing Hadoop clusters; shares resources with MapReduce/Hive | --master yarn |
| Kubernetes | Cloud-native; containerized workloads; GKE/EKS/AKS | --master k8s://https://host:port |
| Mesos | Removed in Spark 3.4. Was a general-purpose resource manager, replaced by K8s | N/A |
| Local | Single machine / unit tests | --master local[*] |

DAG Scheduler & Lazy Evaluation

Spark builds a Directed Acyclic Graph (DAG) of transformations but does nothing until an action is called. This is lazy evaluation:

Why lazy evaluation matters
Spark can see the full transformation graph before executing, allowing the optimizer to reorder, push down predicates, and eliminate unnecessary work — things a row-at-a-time system cannot do.
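The laziness contract is easy to mimic with plain Python generators. The sketch below is purely conceptual (no Spark involved, all names are hypothetical): "transformations" compose a pipeline, and nothing executes until an "action" consumes it.

```python
# Conceptual sketch of lazy evaluation using Python generators (not Spark code).
# Each "transformation" wraps the previous iterator; nothing runs until an "action".
log = []

def source(n):
    for i in range(n):
        log.append(("read", i))   # record every record actually read
        yield i

def mapped(it, f):                # analogous to rdd.map: lazy
    return (f(x) for x in it)

def filtered(it, pred):           # analogous to rdd.filter: lazy
    return (x for x in it if pred(x))

pipeline = filtered(mapped(source(5), lambda x: x * 2), lambda x: x > 4)
assert log == []                  # no work done yet; only the graph was composed

result = list(pipeline)           # the "action" triggers the whole chain
assert result == [6, 8]
assert len(log) == 5              # source was scanned exactly once, on demand
```

Because Spark sees the whole graph before running it, the optimizer gets opportunities (predicate pushdown, pruning) that this toy version does not attempt.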
# PySpark — SparkSession entry point
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("MyApp")
    .master("local[*]")                        # local mode, all cores
    .config("spark.sql.shuffle.partitions", 8) # tune for local
    .getOrCreate())

sc = spark.sparkContext  # access underlying SparkContext
print(sc.defaultParallelism)  # number of cores available
// Scala — SparkSession entry point
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MyApp")
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "8")
  .getOrCreate()

val sc = spark.sparkContext
println(sc.defaultParallelism)
Submit a job to a cluster
spark-submit --master yarn --deploy-mode cluster --executor-memory 4g --num-executors 10 my_job.py

Core Concepts — RDDs

RDD (Resilient Distributed Dataset) is Spark's foundational data structure — an immutable, fault-tolerant, distributed collection of records. DataFrames are built on top of RDDs but add schema and optimization. Use RDDs directly when you need fine-grained control or work with unstructured data.

Creating RDDs

# From a Python collection
rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=4)

# From a text file (each line = one record)
rdd = sc.textFile("hdfs:///data/logs/*.txt")

# From a sequence file
rdd = sc.sequenceFile("hdfs:///data/kv")

# From another RDD (transformation)
rdd2 = rdd.map(lambda x: x * 2)
// From a collection
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), numSlices = 4)

// From a text file
val rdd = sc.textFile("hdfs:///data/logs/*.txt")

// From another RDD
val rdd2 = rdd.map(_ * 2)

Narrow vs. Wide Transformations

This distinction determines whether a shuffle occurs:

| Type | Shuffle? | Stage boundary? | Examples |
|---|---|---|---|
| Narrow | No | No — pipelined in one stage | map, filter, flatMap, mapPartitions, union |
| Wide | Yes | Yes — new stage after shuffle | groupByKey, reduceByKey, join, distinct, repartition, sortBy |
Avoid groupByKey — prefer reduceByKey
groupByKey shuffles ALL values across the network before aggregating. reduceByKey does a map-side combine (partial aggregation) first, dramatically reducing shuffle data.
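The effect of the map-side combine can be shown with a plain-Python sketch (conceptual only, not Spark internals): count how many records would cross the network with and without per-partition pre-aggregation.

```python
# Conceptual sketch: why reduceByKey ships less data than groupByKey.
# Two "partitions" of (key, value) pairs; count records crossing the "network".
partitions = [
    [("a", 1), ("a", 1), ("b", 1), ("a", 1)],
    [("b", 1), ("a", 1), ("b", 1), ("b", 1)],
]

# groupByKey-style: every record is shuffled as-is
shuffled_group = [pair for part in partitions for pair in part]

# reduceByKey-style: combine within each partition first (map-side combine)
def combine(part):
    acc = {}
    for k, v in part:
        acc[k] = acc.get(k, 0) + v
    return list(acc.items())

shuffled_reduce = [pair for part in partitions for pair in combine(part)]

assert len(shuffled_group) == 8    # all 8 records cross the wire
assert len(shuffled_reduce) == 4   # at most one record per key per partition

# The final result is identical either way
final = {}
for k, v in shuffled_reduce:
    final[k] = final.get(k, 0) + v
assert final == {"a": 4, "b": 4}
```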

Common RDD Operations

# --- Transformations (lazy) ---
rdd.map(lambda x: x + 1)          # apply function to each element
rdd.flatMap(lambda x: x.split())  # map then flatten
rdd.filter(lambda x: x > 0)       # keep elements matching predicate
rdd.mapPartitions(lambda it: process(it))  # one call per partition (efficient)
rdd.distinct()                     # remove duplicates (shuffle)
rdd.sample(withReplacement=False, fraction=0.1, seed=42)

# Pair RDD transformations
pairs = rdd.map(lambda x: (x % 3, x))  # (key, value) pairs
pairs.reduceByKey(lambda a, b: a + b)   # aggregate by key (map-side combine)
pairs.groupByKey()                       # dangerous on large datasets — avoid
pairs.combineByKey(createCombiner, mergeValue, mergeCombiner)
pairs.sortByKey(ascending=True)
pairs.mapValues(lambda v: v * 2)

# Set operations
rdd1.union(rdd2)     # concatenate (no dedup)
rdd1.intersection(rdd2)  # common elements (shuffle)
rdd1.subtract(rdd2)  # elements in rdd1 not in rdd2 (shuffle)

# --- Actions (trigger execution) ---
rdd.collect()          # bring all data to driver — dangerous on large data!
rdd.count()            # number of elements
rdd.first()            # first element
rdd.take(10)           # first N elements
rdd.top(10)            # largest N elements, descending order
rdd.reduce(lambda a, b: a + b)  # aggregate all elements
rdd.foreach(print)     # apply function on each element (on executors)
rdd.saveAsTextFile("hdfs:///output/")

Lineage Graph & Fault Tolerance

Spark tracks the lineage (sequence of transformations) for each RDD. If a partition is lost due to executor failure, Spark re-computes it from the parent RDD using the lineage — no data replication needed by default.

rdd.toDebugString()  # print lineage as ASCII tree
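The recovery mechanism can be sketched in plain Python (a conceptual model, not Spark's implementation): an "RDD" is just source data plus the chain of functions that produced it, so a lost partition is rebuilt by replaying that chain.

```python
# Conceptual sketch: fault tolerance via lineage, not replication.
source = {0: [1, 2, 3], 1: [4, 5, 6]}          # partition_id -> records
lineage = [lambda x: x * 10, lambda x: x + 1]  # map(x*10) then map(x+1)

def compute(partition_id):
    records = source[partition_id]
    for fn in lineage:                          # replay the transformation chain
        records = [fn(r) for r in records]
    return records

cached = {pid: compute(pid) for pid in source}
assert cached[1] == [41, 51, 61]

# Simulate losing partition 1 (executor died): rebuild it from lineage alone
del cached[1]
cached[1] = compute(1)
assert cached[1] == [41, 51, 61]
```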

Partitions

Each RDD/DataFrame is divided into partitions — the unit of parallelism. One task processes one partition on one executor thread.

rdd.getNumPartitions()          # how many partitions?
rdd.repartition(100)            # reshuffle to exactly 100 partitions
rdd.coalesce(50)                # reduce partitions without full shuffle

# Custom partitioner (pair RDDs)
rdd.partitionBy(8, lambda key: hash(key) % 8)

# mapPartitions — avoids per-row overhead (e.g., DB connections)
def process_partition(iterator):
    conn = open_connection()
    for row in iterator:
        conn.write(row)
    conn.close()
    yield "done"

rdd.mapPartitions(process_partition)

DataFrames & Datasets

A DataFrame is a distributed table with named columns and a schema. It is Spark's primary API for structured data and the target of the Catalyst optimizer. A Dataset (Scala/Java only) adds compile-time type safety on top of DataFrames.

Schema Definition

from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType,
    LongType, DoubleType,
    BooleanType, TimestampType,
    ArrayType, MapType
)

schema = StructType([
    StructField("id",         LongType(),      nullable=False),
    StructField("name",       StringType(),    nullable=True),
    StructField("age",        IntegerType(),   nullable=True),
    StructField("score",      DoubleType(),    nullable=True),
    StructField("active",     BooleanType(),   nullable=True),
    StructField("created_at", TimestampType(), nullable=True),
    StructField("tags",       ArrayType(StringType()), nullable=True),
    StructField("metadata",   MapType(StringType(), StringType()), nullable=True),
])
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id",         LongType,      nullable = false),
  StructField("name",       StringType,    nullable = true),
  StructField("age",        IntegerType,   nullable = true),
  StructField("score",      DoubleType,    nullable = true),
  StructField("active",     BooleanType,   nullable = true),
  StructField("created_at", TimestampType, nullable = true),
  StructField("tags",       ArrayType(StringType), nullable = true),
))

Creating DataFrames

# From a list of tuples
data = [(1, "Alice", 30), (2, "Bob", 25)]
df = spark.createDataFrame(data, ["id", "name", "age"])

# From list of dicts
data = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
df = spark.createDataFrame(data)

# From RDD + explicit schema
rdd = sc.parallelize([(1, "Alice"), (2, "Bob")])
df = spark.createDataFrame(rdd, schema)

# From CSV
df = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("nullValue", "NA")
    .csv("hdfs:///data/employees.csv"))

# From CSV with explicit schema (faster — skips inference pass)
df = spark.read.schema(schema).csv("hdfs:///data/employees.csv")

# From JSON (newline-delimited)
df = spark.read.json("hdfs:///data/events/")

# From Parquet (schema embedded — best format for Spark)
df = spark.read.parquet("hdfs:///data/warehouse/users/")

# From Delta Lake
df = spark.read.format("delta").load("s3://bucket/delta/users/")

# From JDBC
df = (spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://host/db")
    .option("dbtable", "public.orders")
    .option("user", "user")
    .option("password", "pass")
    .option("numPartitions", 10)
    .option("partitionColumn", "id")
    .option("lowerBound", 1)
    .option("upperBound", 10000000)
    .load())
// From a Seq
val data = Seq((1, "Alice", 30), (2, "Bob", 25))
val df = data.toDF("id", "name", "age")

// From CSV
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs:///data/employees.csv")

// With explicit schema
val df = spark.read
  .schema(schema)
  .csv("hdfs:///data/employees.csv")

// From Parquet
val df = spark.read.parquet("hdfs:///data/warehouse/users/")

// From JDBC
val df = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://host/db")
  .option("dbtable", "public.orders")
  .option("numPartitions", "10")
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "10000000")
  .load()

DataFrame vs. Dataset

| Feature | DataFrame | Dataset[T] |
|---|---|---|
| API | Untyped — columns resolved at runtime | Typed — compile-time type safety |
| Language | Python, Scala, Java, R | Scala and Java only |
| Performance | Best — Tungsten + Catalyst optimization | Slightly lower — encoder overhead for lambdas |
| Error detection | Runtime | Compile-time |
| Use when | SQL-like transformations, ML pipelines | Domain objects, type-safe functional pipelines |
// Dataset — Scala only
case class Employee(id: Long, name: String, age: Int)

val ds: Dataset[Employee] = df.as[Employee]  // convert DataFrame to Dataset

// Type-safe operations
val seniors: Dataset[Employee] = ds.filter(_.age > 40)
val names: Dataset[String] = ds.map(_.name)

// Note: ds.filter(_.age > 40) uses JVM serialization for the lambda
// df.filter(col("age") > 40) stays in Tungsten — usually faster

Inspecting DataFrames

df.printSchema()          # tree view of schema with nullable flags
df.show(n=20, truncate=True)  # tabular preview (default 20 rows)
df.show(5, truncate=False)    # full column content
df.dtypes                  # list of (column_name, type_string) tuples
df.columns                 # list of column names
df.count()                 # number of rows (triggers a job)
df.describe("age", "score").show()  # count, mean, stddev, min, max

# Explain the physical plan
df.explain()               # physical plan only
df.explain(mode="extended") # logical + optimized logical + physical
df.explain(mode="formatted")  # structured plan output (Spark 3+)

Spark SQL

Spark SQL lets you run ANSI SQL against DataFrames. Under the hood it uses the same Catalyst optimizer as the DataFrame API — results are identical.

Temp Views & SQL Queries

# Register as temp view (session-scoped)
df.createOrReplaceTempView("employees")

# Register as global temp view (cross-session)
df.createOrReplaceGlobalTempView("employees_global")
# query with: spark.sql("SELECT * FROM global_temp.employees_global")

# Run SQL
result = spark.sql("""
    SELECT department,
           COUNT(*)           AS headcount,
           AVG(salary)        AS avg_salary,
           MAX(salary)        AS max_salary
    FROM employees
    WHERE active = true
    GROUP BY department
    HAVING COUNT(*) > 5
    ORDER BY avg_salary DESC
""")
result.show()

# SQL with subqueries
spark.sql("""
    SELECT e.name, e.salary, d.budget
    FROM employees e
    JOIN departments d ON e.department_id = d.id
    WHERE e.salary > (SELECT AVG(salary) FROM employees)
""").show()
df.createOrReplaceTempView("employees")

val result = spark.sql("""
  SELECT department,
         COUNT(*)      AS headcount,
         AVG(salary)   AS avg_salary
  FROM employees
  WHERE active = true
  GROUP BY department
  ORDER BY avg_salary DESC
""")
result.show()

Catalyst Optimizer

Catalyst is Spark SQL's query optimizer. It transforms your logical plan through four phases:

  1. Analysis — resolve column names and types against the catalog
  2. Logical Optimization — predicate pushdown, constant folding, column pruning, join reordering
  3. Physical Planning — select physical operators (broadcast vs. sort-merge join, etc.)
  4. Code Generation — whole-stage codegen via Janino compiles query to JVM bytecode
Predicate Pushdown
Filters applied on a DataFrame before a join or after reading from Parquet are pushed down to the data source level — only matching rows are loaded. This is automatic when reading Parquet/ORC/Delta. Explicit: df.filter("age > 30").join(...) — put filters before joins.
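The payoff is easy to see with a plain-Python sketch (hypothetical `scan` function, not Spark's API): a source that accepts the predicate materializes only matching rows, instead of loading everything and filtering afterwards.

```python
# Conceptual sketch of predicate pushdown.
rows = [{"age": a} for a in range(100)]

def scan(source, pushed_filter=None):
    """Simulated data source; returns (rows_materialized, rows_returned)."""
    out = [r for r in source if pushed_filter is None or pushed_filter(r)]
    return len(out), out

# No pushdown: the source materializes all 100 rows; the engine filters after
n_scanned, all_rows = scan(rows)
no_push = [r for r in all_rows if r["age"] > 90]
assert n_scanned == 100

# Pushdown: the filter runs inside the source; only matches are materialized
n_scanned, pushed = scan(rows, pushed_filter=lambda r: r["age"] > 90)
assert n_scanned == 9 and pushed == no_push   # 9 rows (ages 91..99) instead of 100
```

In real Spark you can verify pushdown by looking for `PushedFilters` in `df.explain()` output when reading Parquet/ORC.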

UDFs — User-Defined Functions

UDF Performance Warning
Python UDFs serialize rows to Python, execute, and serialize back to JVM. This breaks Catalyst optimization and Tungsten codegen. Use built-in functions (pyspark.sql.functions) whenever possible. If you must use a Python UDF, consider Pandas UDFs (vectorized) instead.
from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import StringType, LongType
import pandas as pd

# Regular UDF (slow — row-at-a-time Python serialization)
@udf(returnType=StringType())
def clean_name(name: str) -> str:
    if name is None:
        return None
    return name.strip().title()

df.withColumn("clean_name", clean_name("name")).show()

# Register for SQL use
spark.udf.register("clean_name_sql", clean_name)
spark.sql("SELECT clean_name_sql(name) FROM employees").show()

# Pandas UDF (vectorized — much faster for Python)
@pandas_udf(LongType())
def fast_hash(series: pd.Series) -> pd.Series:
    return series.apply(hash).abs() % (2**31)

df.withColumn("hash_id", fast_hash("name")).show()

# Grouped map with applyInPandas (plain function, NOT @pandas_udf)
def normalize_salary_by_dept(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf["salary_norm"] = (pdf["salary"] - pdf["salary"].mean()) / pdf["salary"].std()
    return pdf

result_schema = "department string, name string, salary double, salary_norm double"
df.groupby("department").applyInPandas(normalize_salary_by_dept, schema=result_schema)
import org.apache.spark.sql.functions.{col, udf}

// Scala UDF — stays in JVM, minimal overhead
val cleanName = udf((name: String) =>
  Option(name).map(_.trim.toLowerCase.capitalize).orNull
)

df.withColumn("clean_name", cleanName(col("name"))).show()

// Register for SQL
spark.udf.register("clean_name_sql", cleanName)
spark.sql("SELECT clean_name_sql(name) FROM employees").show()

Broadcast Variables

# Broadcast a lookup table to all executors (avoids shipping it per task)
country_map = {"US": "United States", "GB": "Great Britain", "DE": "Germany"}
bc_map = sc.broadcast(country_map)

# Use in RDD transformation
result = rdd.map(lambda code: bc_map.value.get(code, "Unknown"))

# In DataFrame context, use broadcast join (covered in Joins section)
from pyspark.sql.functions import broadcast
df.join(broadcast(small_df), "country_code")

# Clean up when done
bc_map.unpersist()
bc_map.destroy()  # removes from executor memory entirely

Transformations

Select, Filter, WithColumn

from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, when, coalesce

# Select columns
df.select("id", "name", "salary")
df.select(col("id"), col("name"), (col("salary") * 1.1).alias("new_salary"))
df.select("*", F.upper("name").alias("name_upper"))

# Filter / where (identical)
df.filter(col("age") > 30)
df.where("salary > 50000 AND active = true")
df.filter((col("department") == "Engineering") & col("active"))

# withColumn — add or replace a column
df = df.withColumn("bonus", col("salary") * 0.1)
df = df.withColumn("level",
    when(col("salary") > 200000, "L7")
    .when(col("salary") > 150000, "L6")
    .when(col("salary") > 100000, "L5")
    .otherwise("L4"))

# withColumnRenamed
df = df.withColumnRenamed("old_col", "new_col")

# drop columns
df = df.drop("temp_col", "another_col")

# coalesce — first non-null value
df = df.withColumn("display_name", coalesce(col("nickname"), col("name"), lit("Unknown")))

# Cast types
df = df.withColumn("age", col("age").cast("integer"))
df = df.withColumn("created_at", col("created_at").cast("timestamp"))
import org.apache.spark.sql.functions._

// Select
df.select("id", "name", "salary")
df.select($"id", $"name", ($"salary" * 1.1).as("new_salary"))

// Filter
df.filter($"age" > 30)
df.where("salary > 50000 AND active = true")

// withColumn
val df2 = df
  .withColumn("bonus", $"salary" * 0.1)
  .withColumn("level",
    when($"salary" > 200000, "L7")
    .when($"salary" > 150000, "L6")
    .otherwise("L4"))

// Drop, rename, cast
df.drop("temp_col")
  .withColumnRenamed("old_col", "new_col")
  .withColumn("age", $"age".cast("integer"))

GroupBy & Aggregation

from pyspark.sql import functions as F

# Single groupBy
df.groupBy("department").agg(
    F.count("*").alias("headcount"),
    F.avg("salary").alias("avg_salary"),
    F.max("salary").alias("max_salary"),
    F.min("salary").alias("min_salary"),
    F.sum("salary").alias("total_payroll"),
    F.stddev("salary").alias("salary_stddev"),
    F.collect_list("name").alias("names"),   # order not guaranteed
    F.collect_set("skill").alias("skills"),  # distinct values
    F.countDistinct("manager_id").alias("unique_managers"),
)

# Multi-column groupBy
df.groupBy("department", "level").count()

# Rollup and Cube for subtotals
df.rollup("department", "level").agg(F.count("*")).show()
df.cube("department", "level").agg(F.avg("salary")).show()

# groupBy + filter (HAVING equivalent)
df.groupBy("department") \
  .agg(F.count("*").alias("cnt")) \
  .filter(col("cnt") > 10) \
  .show()
df.groupBy("department").agg(
  count("*").as("headcount"),
  avg("salary").as("avg_salary"),
  max("salary").as("max_salary"),
  sum("salary").as("total_payroll"),
  collect_list("name").as("names"),
  countDistinct("manager_id").as("unique_managers")
)

// Rollup
df.rollup("department", "level")
  .agg(count("*"))
  .show()

Window Functions

Window functions compute values across a sliding window of rows relative to the current row — without collapsing rows like groupBy.

from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Define window spec
w_dept = (Window
    .partitionBy("department")
    .orderBy(F.desc("salary")))

# Ranking functions
df = df.withColumn("rank",        F.rank().over(w_dept))
df = df.withColumn("dense_rank",  F.dense_rank().over(w_dept))
df = df.withColumn("row_number",  F.row_number().over(w_dept))
df = df.withColumn("percent_rank", F.percent_rank().over(w_dept))
df = df.withColumn("ntile_4",     F.ntile(4).over(w_dept))

# Lag / Lead — access previous/next row values
df = df.withColumn("prev_salary", F.lag("salary", 1).over(w_dept))
df = df.withColumn("next_salary", F.lead("salary", 1).over(w_dept))
df = df.withColumn("salary_diff", col("salary") - F.lag("salary", 1).over(w_dept))

# Running totals with frame specification
w_running = (Window
    .partitionBy("department")
    .orderBy("hire_date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow))

df = df.withColumn("running_total", F.sum("salary").over(w_running))

# Moving average (3-row centered window)
w_moving = (Window
    .partitionBy("department")
    .orderBy("month")
    .rowsBetween(-1, 1))  # one before, current, one after

df = df.withColumn("moving_avg", F.avg("salary").over(w_moving))

# First / last value in window
w_all = Window.partitionBy("department").orderBy("hire_date") \
              .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df = df.withColumn("dept_first_hire", F.first("name").over(w_all))
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val wDept = Window
  .partitionBy("department")
  .orderBy(desc("salary"))

val df2 = df
  .withColumn("rank",       rank().over(wDept))
  .withColumn("dense_rank", dense_rank().over(wDept))
  .withColumn("row_number", row_number().over(wDept))
  .withColumn("prev_salary", lag("salary", 1).over(wDept))

// Running total
val wRunning = Window
  .partitionBy("department")
  .orderBy("hire_date")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

val df3 = df2.withColumn("running_total", sum("salary").over(wRunning))

Pivot & Unpivot

# Pivot — rows to columns
pivot_df = (df
    .groupBy("employee_id")
    .pivot("quarter", ["Q1", "Q2", "Q3", "Q4"])  # explicit values = faster
    .agg(F.sum("revenue")))

# Without explicit values (Spark scans data first to find distinct values — slower)
pivot_df = df.groupBy("employee_id").pivot("quarter").agg(F.sum("revenue"))

# Unpivot (stack) — columns to rows — Spark 3.4+ native, else use selectExpr
from pyspark.sql.functions import expr

# Native stack function
unpivot_df = df.select(
    "employee_id",
    F.expr("stack(4, 'Q1', Q1, 'Q2', Q2, 'Q3', Q3, 'Q4', Q4) AS (quarter, revenue)")
)

# Spark 3.4+ unpivot
unpivot_df = df.unpivot(["employee_id"], ["Q1", "Q2", "Q3", "Q4"], "quarter", "revenue")

Actions & Writing Data

Common Actions

df.count()                # triggers full scan — can be expensive
df.collect()              # returns list of Row objects to driver — OOM risk!
df.first()                # first row
df.head(5)                # first 5 rows (same as take(5))
df.take(10)               # list of first 10 Row objects
df.show(20, truncate=50)  # print tabular (for interactive exploration)
df.toPandas()             # converts to Pandas DataFrame — driver memory!

# Safer alternatives to collect()
df.limit(1000).collect()  # cap before pulling to driver
df.sample(fraction=0.01).toPandas()  # sample first

# Aggregation actions (compute one value)
df.agg(F.sum("salary")).collect()[0][0]
Never collect() on large DataFrames
collect() and toPandas() bring ALL data to the driver. On a 100 GB dataset this causes an OOM that kills the driver. Use limit() or write() to files instead.

Writing DataFrames

# Save modes: append | overwrite | error (default) | ignore
df.write.mode("overwrite").parquet("hdfs:///output/users/")

# Parquet (recommended default — columnar, compressed, schema embedded)
df.write \
  .mode("overwrite") \
  .option("compression", "snappy") \
  .parquet("s3://bucket/users/")

# CSV
df.write \
  .mode("append") \
  .option("header", "true") \
  .option("delimiter", ",") \
  .csv("hdfs:///output/report/")

# JSON (newline-delimited)
df.write.mode("overwrite").json("hdfs:///output/events/")

# Delta Lake (ACID, time-travel, schema enforcement)
df.write.format("delta").mode("overwrite").save("s3://bucket/delta/users/")

# Partition by a column (creates directory structure: year=2024/month=01/...)
df.write \
  .partitionBy("year", "month") \
  .mode("overwrite") \
  .parquet("hdfs:///output/events/")

# Bucket for joins (write once, join many times efficiently)
df.write \
  .bucketBy(64, "user_id") \
  .sortBy("user_id") \
  .mode("overwrite") \
  .saveAsTable("users_bucketed")

# JDBC write
df.write \
  .format("jdbc") \
  .option("url", "jdbc:postgresql://host/db") \
  .option("dbtable", "output_table") \
  .option("user", "user") \
  .option("password", "pass") \
  .mode("append") \
  .save()
// Parquet
df.write
  .mode("overwrite")
  .option("compression", "snappy")
  .parquet("s3://bucket/users/")

// Partitioned write
df.write
  .partitionBy("year", "month")
  .mode("overwrite")
  .parquet("hdfs:///output/events/")

// Delta
df.write
  .format("delta")
  .mode("overwrite")
  .save("s3://bucket/delta/users/")

// Bucketed table
df.write
  .bucketBy(64, "user_id")
  .sortBy("user_id")
  .saveAsTable("users_bucketed")

Save Modes

| Mode | If data exists | Use when |
|---|---|---|
| error (default) | Throws exception | Safe default, prevents accidental overwrites |
| overwrite | Deletes existing, writes new | Full refresh of a table/path |
| append | Adds new data alongside existing | Incremental loads, streaming sinks |
| ignore | Silently skips write | Idempotent writes — skip if already done |
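The four modes boil down to a simple decision on "path already exists". A plain-Python sketch of the semantics (a toy `write` against a dict, not Spark's writer):

```python
# Conceptual sketch of save-mode semantics against a dict "storage".
storage = {}

def write(path, data, mode="error"):
    if path in storage:
        if mode == "error":
            raise FileExistsError(path)
        if mode == "ignore":
            return                      # silently skip the write
        if mode == "append":
            storage[path] = storage[path] + list(data)
            return
    storage[path] = list(data)          # overwrite, or first write

write("/out", [1, 2])                   # first write succeeds in any mode
write("/out", [3], mode="append")
assert storage["/out"] == [1, 2, 3]

write("/out", [9], mode="ignore")       # no-op: path already exists
assert storage["/out"] == [1, 2, 3]

write("/out", [9], mode="overwrite")
assert storage["/out"] == [9]

try:
    write("/out", [0])                  # default mode raises
    raise AssertionError("should have raised")
except FileExistsError:
    pass
```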

Joins

Join Types

# Basic join syntax
df_orders.join(df_users, on="user_id", how="inner")

# Explicit condition (handles column name collision)
df_orders.join(df_users,
    df_orders["user_id"] == df_users["id"],
    how="left")

# Multiple join keys
df.join(other, on=["dept_id", "year"], how="inner")

# Join types
how = "inner"    # rows in both (default)
how = "left"     # all rows from left, nulls where no match in right
how = "right"    # all rows from right
how = "full"     # all rows from both (outer)
how = "cross"    # cartesian product — DANGEROUS on large tables
how = "semi"     # rows from left that HAVE a match in right (no right cols)
how = "anti"     # rows from left that DO NOT match right

# Semi-join example — "which orders have a matching product?"
orders_with_product = df_orders.join(df_products, "product_id", "semi")

# Anti-join example — "which users have never placed an order?"
users_no_orders = df_users.join(df_orders, "user_id", "anti")
// Basic joins
dfOrders.join(dfUsers, "user_id")             // inner
dfOrders.join(dfUsers, "user_id", "left")     // left outer
dfOrders.join(dfUsers, "user_id", "semi")     // left semi
dfOrders.join(dfUsers, "user_id", "anti")     // left anti

// Explicit condition
dfOrders.join(dfUsers,
  dfOrders("user_id") === dfUsers("id"),
  "inner")

// Cross join (must be explicit in Spark 3.x)
df1.crossJoin(df2)

Join Strategies

| Strategy | When Spark uses it | Shuffle? | Best for |
|---|---|---|---|
| Broadcast Hash Join | Small table < broadcast threshold (10 MB default) | No | Large table × small lookup table |
| Shuffle Hash Join | Medium-sized tables, hash fits in memory | Yes | Non-sortable keys, unequal sizes |
| Sort-Merge Join | Large + large table joins (default for most joins) | Yes | Both sides large, equi-joins |
| Cartesian / Nested Loop | No join key (cross join or non-equi join) | Yes | Avoid — O(n*m) complexity |
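The broadcast hash join has a simple shape that a plain-Python sketch makes concrete (conceptual only): hash the small side once, then stream the large side through it with no shuffle at all.

```python
# Conceptual sketch of a broadcast hash join (not Spark internals).
small = [("US", "United States"), ("DE", "Germany")]          # (key, name)
large = [("US", 100), ("DE", 200), ("US", 300), ("FR", 400)]  # (key, amount)

# Build phase: hash the small table (this is what gets "broadcast" to executors)
lookup = {k: name for k, name in small}

# Probe phase: stream the large table; inner-join semantics drop non-matches
joined = [(k, amount, lookup[k]) for k, amount in large if k in lookup]

assert joined == [("US", 100, "United States"),
                  ("DE", 200, "Germany"),
                  ("US", 300, "United States")]
```

Because every executor holds the full small-side hash map, each large-side partition joins locally, which is why broadcast joins avoid the shuffle entirely.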

Broadcast Join

from pyspark.sql.functions import broadcast

# Force broadcast on the small side
result = df_large.join(broadcast(df_small), "key_col")

# Tune threshold (bytes) — default 10MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # 50 MB

# Disable auto-broadcast (force sort-merge for reproducibility)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# SQL hint
spark.sql("""
    SELECT /*+ BROADCAST(d) */ e.*, d.name AS dept_name
    FROM employees e
    JOIN departments d ON e.dept_id = d.id
""")

Data Skew in Joins

Data Skew
Skew happens when some keys have many more rows than others (e.g., a "NULL" key or a single massive customer). One executor gets overloaded while others sit idle. You'll see one task in a stage taking 100x longer.
# Diagnose: check value distribution
df.groupBy("join_key").count().orderBy(F.desc("count")).show(20)

# Solution 1: Salting — break hot keys into sub-keys
SALT_FACTOR = 50

# Salt the left (large) side
df_large_salted = df_large.withColumn(
    "salted_key",
    F.concat(col("join_key"), F.lit("_"), (F.rand() * SALT_FACTOR).cast("int"))
)

# Explode the right (small/dimension) side
from pyspark.sql.functions import array, explode

df_small_exploded = df_small.withColumn("salt", F.array([F.lit(i) for i in range(SALT_FACTOR)])) \
    .withColumn("salt", F.explode("salt")) \
    .withColumn("salted_key", F.concat(col("join_key"), F.lit("_"), col("salt")))

result = df_large_salted.join(df_small_exploded, "salted_key").drop("salted_key", "salt")

# Solution 2: Skew Join Hint (Spark 3.0+ AQE)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")

# Solution 3: Broadcast the skewed dimension (if it fits)
result = df_large.join(broadcast(df_small), "join_key")

Shuffles & Partitioning

Repartition vs. Coalesce

| | repartition(n) | coalesce(n) |
|---|---|---|
| Shuffle | Yes — full shuffle | No (usually) — moves data locally where possible |
| Direction | Up or down | Down only (reduces partitions) |
| Data distribution | Even distribution | Can create uneven partitions |
| Use when | Need even partitions for compute; before wide transformations | Reduce small files before writing |
df.repartition(200)                          # by count, round-robin
df.repartition(200, col("department"))       # by count + column (hash partitioned)
df.repartition(col("year"), col("month"))    # by columns only

df.coalesce(10)   # reduce to 10 partitions, minimal shuffle

# Check partition count
df.rdd.getNumPartitions()

# Inspect partition sizes
df.rdd.mapPartitionsWithIndex(
    lambda idx, it: [(idx, sum(1 for _ in it))]
).toDF(["partition_id", "count"]).orderBy("partition_id").show()

# Default parallelism — controls initial partition count for RDD operations
print(sc.defaultParallelism)                    # usually = total executor cores
# spark.default.parallelism is read when the SparkContext starts; set it via
# spark-submit / SparkConf, not spark.conf.set. The SQL shuffle config, by
# contrast, can be changed at runtime:
spark.conf.set("spark.sql.shuffle.partitions", 200)  # for DataFrame shuffles

Partition Pruning on Reads

# Data written with partitionBy creates directory structure:
# s3://bucket/events/year=2024/month=01/day=15/*.parquet

# When reading, Spark prunes directories that don't match the filter
df = spark.read.parquet("s3://bucket/events/")
df_jan = df.filter("year = 2024 AND month = 1")  # only reads year=2024/month=01/ dirs

# Partition column values are inferred from directory names
# They do NOT appear in the actual Parquet files — just the directory path
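The directory-name inference can be sketched in a few lines of Python — a toy `partition_values` helper (hypothetical name, not a Spark API) that mimics what Spark does when it scans the path:

```python
def partition_values(path):
    """Toy sketch of Hive-style partition inference from a directory path."""
    parts = {}
    for seg in path.split("/"):
        if "=" in seg:
            key, val = seg.split("=", 1)
            # Spark also infers types; here we only handle integers
            parts[key] = int(val) if val.isdigit() else val
    return parts

print(partition_values("s3://bucket/events/year=2024/month=01/day=15/part-0.parquet"))
# {'year': 2024, 'month': 1, 'day': 15}
```

Note how `month=01` comes back as the integer 1 — which is why the filter `month = 1` (not `month = '01'`) matches the pruned directories.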

Bucketing

# Write bucketed table (requires Hive metastore / spark catalog)
df.write \
  .bucketBy(64, "user_id") \
  .sortBy("user_id") \
  .saveAsTable("events_bucketed")

# Bucketing eliminates shuffle when joining two bucketed tables
# on the same key with the same number of buckets
df_users.write.bucketBy(64, "user_id").saveAsTable("users_bucketed")
df_events.write.bucketBy(64, "user_id").saveAsTable("events_bucketed")

# This join has NO shuffle — data is pre-sorted and co-located
result = spark.table("users_bucketed").join(spark.table("events_bucketed"), "user_id")

Adaptive Query Execution (AQE)

# AQE (Spark 3.0+) — dynamically optimizes query plans at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")            # on by default in 3.2+

# AQE features:
# 1. Coalesce shuffle partitions: reduces 200 -> fewer if data is small
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")

# 2. Convert sort-merge join to broadcast join at runtime when one side turns
#    out to be small; the local shuffle reader then avoids the network fetch
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

# 3. Skew join handling (see Joins section)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

Caching & Persistence

Storage Levels

| Level               | Memory   | Disk     | Serialized       | Use when                                                 |
|---------------------|----------|----------|------------------|----------------------------------------------------------|
| MEMORY_ONLY         | Yes      | No       | No (JVM objects) | Small dataset, fast JVM; recompute if evicted            |
| MEMORY_AND_DISK     | Yes      | Overflow | No/Yes           | Prefer memory, spill to disk if needed — good default    |
| MEMORY_ONLY_SER     | Yes      | No       | Yes (Kryo)       | More space-efficient, slower to read                     |
| MEMORY_AND_DISK_SER | Yes      | Overflow | Yes              | Large datasets, worth serialization overhead             |
| DISK_ONLY           | No       | Yes      | Yes              | Very large data, recompute is expensive                  |
| OFF_HEAP            | Off-heap | No       | Yes              | Avoid GC pressure; requires spark.memory.offHeap.enabled |
from pyspark import StorageLevel

# cache() = MEMORY_AND_DISK for DataFrames/Datasets; MEMORY_ONLY for RDDs
df_hot.cache()

# persist() with explicit level
df_hot.persist(StorageLevel.MEMORY_AND_DISK)
df_hot.persist(StorageLevel.MEMORY_ONLY_SER)
df_hot.persist(StorageLevel.DISK_ONLY)

# Cache is lazy — call an action to materialize
df_hot.count()  # triggers actual caching

# Check if cached
print(df_hot.is_cached)   # DataFrame
print(rdd.is_cached)      # RDD

# Release cache
df_hot.unpersist()               # non-blocking by default
df_hot.unpersist(blocking=True)  # wait until all blocks are removed
import org.apache.spark.storage.StorageLevel

df.cache()   // MEMORY_AND_DISK

df.persist(StorageLevel.MEMORY_AND_DISK)
df.persist(StorageLevel.MEMORY_ONLY_SER)

// Materialize
df.count()

// Release
df.unpersist()
df.unpersist(blocking = true)

When to Cache

Cache a DataFrame only when it is reused across multiple actions — iterative algorithms, interactive exploration, or pipelines that fan out to several outputs. A dataset that is read and used exactly once gains nothing from caching.

Cache Pitfalls
Caching unused DataFrames wastes executor memory and can cause evictions of other cached data. Always unpersist() when done. Avoid caching very wide DataFrames — cache at the final column selection.

Checkpoint

# Checkpoint — materializes to disk AND breaks the lineage graph
# Useful when lineage grows too long (iterative algorithms, streaming)
sc.setCheckpointDir("hdfs:///checkpoints/")

df_processed = df.filter(...).join(...).groupBy(...)
df_processed = df_processed.checkpoint()  # must capture return value! Truncates lineage.

# Eager vs lazy checkpoint
df_processed = df_processed.checkpoint(eager=True)   # triggers action immediately (default)
df_processed.checkpoint(eager=False)  # lazy — materializes on next action

# Local checkpoint — faster but less reliable (stored on executor disks, no HDFS replication)
df_processed = df_processed.localCheckpoint()   # also returns a new DataFrame

Structured Streaming

Structured Streaming treats a live data stream as an unbounded table that grows over time. You write the same DataFrame API queries, and Spark runs them incrementally.

Reading a Stream

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType

# From Kafka
kafka_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")   # or "latest"
    .option("maxOffsetsPerTrigger", 100000)  # rate limit
    .load())

# Kafka gives: key, value, topic, partition, offset, timestamp, timestampType
# Decode the value as JSON
schema = StructType([
    StructField("user_id",    LongType(),      True),
    StructField("event_type", StringType(),    True),
    StructField("ts",         TimestampType(), True),
])

events_df = kafka_df.select(
    F.from_json(F.col("value").cast("string"), schema).alias("data"),
    F.col("timestamp").alias("kafka_ts")
).select("data.*", "kafka_ts")

# From files (continuous directory scan)
file_df = (spark.readStream
    .format("parquet")
    .schema(schema)
    .option("path", "s3://bucket/landing/")
    .load())

# From socket (dev/testing only)
socket_df = (spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load())

Writing a Stream

# Output modes:
# append   — only new rows (for stateless queries or with watermark)
# complete — entire result table (for aggregations without watermark)
# update   — only changed rows

# Write to console (dev)
query = (events_df.writeStream
    .format("console")
    .outputMode("append")
    .trigger(processingTime="10 seconds")  # micro-batch every 10s
    .start())

# Write to Parquet (files)
query = (events_df.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "s3://bucket/events-processed/")
    .option("checkpointLocation", "s3://bucket/checkpoints/events/")
    .partitionBy("year", "month", "day")
    .trigger(processingTime="1 minute")
    .start())

# Write to Kafka
query = (result_df.selectExpr("CAST(user_id AS STRING) AS key", "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("topic", "output-events")
    .option("checkpointLocation", "s3://bucket/checkpoints/output/")
    .outputMode("append")
    .start())

# Trigger modes
.trigger(processingTime="30 seconds")  # fixed interval micro-batch
.trigger(once=True)                    # one batch of all available data, then stop (deprecated in 3.4+)
.trigger(availableNow=True)            # all available data in rate-limited batches, then stop
.trigger(continuous="1 second")        # experimental — truly continuous (low latency)

# Wait for termination
query.awaitTermination()
query.stop()

# Monitor
print(query.status)
print(query.lastProgress)
query.explain()

Watermarks & Late Data

# Watermark tells Spark how late data can arrive before it's dropped
# Required for stateful aggregations with append output mode

from pyspark.sql import functions as F

windowed = (events_df
    .withWatermark("ts", "10 minutes")       # tolerate 10 min late data
    .groupBy(
        F.window("ts", "5 minutes"),          # 5-min tumbling window
        F.col("event_type")
    )
    .count())

# Sliding windows
sliding = (events_df
    .withWatermark("ts", "10 minutes")
    .groupBy(
        F.window("ts", "10 minutes", "5 minutes"),  # 10-min window, 5-min slide
        F.col("event_type")
    )
    .count())

# Session windows (Spark 3.2+)
session = (events_df
    .withWatermark("ts", "10 minutes")
    .groupBy(
        F.session_window("ts", "5 minutes"),  # gap timeout = 5 min
        F.col("user_id")
    )
    .count())
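The watermark rule itself is simple: watermark = max event time seen so far minus the allowed delay, and any record older than the watermark is dropped. A minimal stdlib sketch (illustrative helper, not a Spark API):

```python
from datetime import datetime, timedelta

def keep_record(event_time, max_event_time_seen, delay=timedelta(minutes=10)):
    """Sketch of the watermark test: keep a record only if it is no older
    than (max event time seen - allowed delay)."""
    watermark = max_event_time_seen - delay
    return event_time >= watermark

now = datetime(2024, 1, 1, 12, 0)
print(keep_record(now - timedelta(minutes=5),  now))   # True  — 5 min late, within tolerance
print(keep_record(now - timedelta(minutes=15), now))   # False — beyond the watermark, dropped
```

This is also why Spark can finalize a window in append mode: once the watermark passes the window's end, no admissible record can still arrive for it.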

foreachBatch — Custom Sink Logic

from pyspark.sql.functions import col

def process_batch(batch_df, batch_id):
    """Called for each micro-batch. batch_df is a regular DataFrame."""
    # Deduplicate
    batch_df = batch_df.dropDuplicates(["event_id"])

    # Write to multiple destinations
    batch_df.write.mode("append").parquet("s3://bucket/raw/")
    batch_df.filter(col("priority") == "high") \
            .write.format("delta").mode("append").save("s3://bucket/high-priority/")

    # Call external API, write to DB, etc.
    # ('engine' here is a pre-created SQLAlchemy engine)
    batch_df.toPandas().to_sql("events", engine, if_exists="append")

query = (events_df.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "s3://bucket/checkpoints/")
    .trigger(processingTime="30 seconds")
    .start())

Performance Tuning

General Principles

Executor Sizing

Executor Sizing Rule of Thumb
5 cores per executor is a common sweet spot — enough parallelism, manageable HDFS client threads. Leave 1 core per node for OS/Hadoop daemons. Memory: executor_memory = (node_memory - OS overhead) / executors_per_node. Reserve ~10% overhead for Spark's off-heap.
# YARN cluster: 10 nodes, each 64 GB RAM + 16 cores
# Leave 1 core + 4 GB per node for OS/daemons → 15 cores, 60 GB usable
# 3 executors per node × 5 cores = 15 cores/node → 30 executors total
# Memory per executor: 60 / 3 = 20 GB → allocate 18 GB heap + 2 GB overhead
# (in practice, budget roughly one executor's resources for the YARN ApplicationMaster)

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 30 \
  --executor-cores 5 \
  --executor-memory 18g \
  --driver-memory 4g \
  --conf spark.yarn.executor.memoryOverhead=2g \
  my_job.py
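The arithmetic above generalizes. Here's a small calculator (illustrative helper, not a Spark utility — names and defaults are this sketch's own) that reproduces the worked example:

```python
import math

def size_executors(nodes, node_cores, node_mem_gb,
                   cores_per_exec=5, os_cores=1, os_mem_gb=4,
                   overhead_frac=0.10):
    """Rule-of-thumb executor sizing mirroring the worked example above."""
    execs_per_node = (node_cores - os_cores) // cores_per_exec
    mem_per_exec = (node_mem_gb - os_mem_gb) / execs_per_node
    overhead = math.ceil(mem_per_exec * overhead_frac)   # ~10% off-heap overhead
    return {
        "num_executors": nodes * execs_per_node,
        "executor_cores": cores_per_exec,
        "executor_memory_gb": int(mem_per_exec) - overhead,
        "memory_overhead_gb": overhead,
    }

# 10 nodes × (16 cores, 64 GB) → 30 executors, 5 cores, 18g heap + 2g overhead
print(size_executors(nodes=10, node_cores=16, node_mem_gb=64))
```

The output maps directly onto --num-executors, --executor-cores, --executor-memory, and the memoryOverhead conf in the spark-submit above.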

Serialization — Kryo

# Default Java serialization is slow. Kryo is 2-10x faster.
# Important for RDD operations and UDFs, less so for DataFrames (Tungsten handles it)
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.kryo.registrationRequired", "false")  # log warnings, not errors

# In Scala — register your classes for best Kryo performance
// Register case classes with Kryo
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(
  classOf[Employee],
  classOf[Order]
))

Data Skew Solutions Summary

| Technique          | When to use                   | Trade-off                                   |
|--------------------|-------------------------------|---------------------------------------------|
| Broadcast join     | One side < 50–100 MB          | Driver OOM risk for very large "small" side |
| Salting            | Skewed join key (hot keys)    | Must explode dimension side; extra disk/CPU |
| AQE skew join      | Spark 3+ with AQE enabled     | Auto-detected; may not catch all skew       |
| Repartition        | Uneven input partitions       | Causes a full shuffle                       |
| Filter skewed keys | Hot keys are NULLs or garbage | Data loss if not careful                    |

Small File Problem

# Problem: streaming jobs or many small partitioned writes create thousands of tiny files
# Reading 10,000 × 1 MB files is much slower than reading 100 × 100 MB files
# (HDFS NameNode pressure, S3 LIST API calls, per-file overhead)

# Solution 1: coalesce before write
df.coalesce(50).write.parquet("s3://bucket/output/")

# Solution 2: repartition by partition columns to get one file per partition value
df.repartition("year", "month").write.partitionBy("year", "month").parquet("output/")

# Solution 3: Spark SQL maxRecordsPerFile
df.write \
  .option("maxRecordsPerFile", 1000000) \
  .parquet("output/")

# Solution 4: Delta OPTIMIZE command (Delta Lake)
spark.sql("OPTIMIZE delta.`s3://bucket/delta/events/` ZORDER BY (user_id, ts)")

# Solution 5: Auto-compaction in streaming
query = df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "...") \
    .option("delta.autoOptimize.autoCompact", "true") \
    .start("s3://bucket/delta/events/")
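Picking the coalesce target is simple division: total data size over desired file size. A sketch (hypothetical helper name, not a Spark API):

```python
import math

def coalesce_target(total_mb, target_file_mb=128):
    """How many partitions to coalesce to for ~target-sized output files."""
    return max(1, math.ceil(total_mb / target_file_mb))

print(coalesce_target(10_000))   # 10,000 MB of data → 79 files of ~128 MB
print(coalesce_target(50))       # tiny dataset → a single file
```

Then `df.coalesce(coalesce_target(total_mb)).write.parquet(...)`. Estimating `total_mb` usually means checking the input size in the storage layer or the Spark UI.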

AQE — Adaptive Query Execution

# AQE is the most impactful Spark 3.x performance feature
# It re-optimizes physical plans at runtime based on actual shuffle statistics

spark.conf.set("spark.sql.adaptive.enabled", "true")  # default true in Spark 3.2+

# 1. Dynamic partition coalescing: merges small shuffle partitions after the shuffle
#    Avoids 200 tiny tasks when shuffle output is small
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1m")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")

# 2. Dynamic join strategy switching: if statistics show one side is small,
#    AQE downgrades sort-merge to broadcast join
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

# 3. Skew join optimization
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")

Production Deployment

Deployment Platforms Comparison

| Platform       | Cluster Manager | Best For                                                 | Trade-off                                        |
|----------------|-----------------|----------------------------------------------------------|--------------------------------------------------|
| EMR            | YARN            | AWS-native data teams, S3-heavy pipelines                | Vendor lock-in, cluster spin-up time (~5–10 min) |
| Databricks     | Proprietary     | Fastest setup, notebooks + Unity Catalog, Delta Lake     | Most expensive, less infra control               |
| Kubernetes     | K8s native      | Multi-cloud, existing K8s infra, containerized workloads | More complex ops, tuning pod resources           |
| Dataproc       | YARN            | GCP-native, GCS integration                              | GCP lock-in                                      |
| YARN (on-prem) | YARN            | Existing Hadoop clusters                                 | Hardware management overhead                     |
| Standalone     | Built-in        | Dev/test, simple single-team clusters                    | No resource sharing, limited scheduling          |

Complete Example: S3 Aggregation Job

A dead-simple end-to-end Spark app: read orders from S3, compute revenue per product, write results back to S3 as Parquet.

# etl_job.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
    .appName("S3 Revenue Aggregation")
    .getOrCreate())

# Read — CSV with header from S3
orders = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("s3a://my-bucket/raw/orders/"))

# Transform — revenue per product
revenue = (orders
    .groupBy("product_id")
    .agg(
        F.sum("amount").alias("total_revenue"),
        F.count("*").alias("order_count"),
        F.avg("amount").alias("avg_order_value"),
    ))

# Write — Parquet back to S3
(revenue.write
    .mode("overwrite")
    .parquet("s3a://my-bucket/output/revenue_by_product/"))

spark.stop()

# Submit:
# spark-submit --master yarn --deploy-mode cluster etl_job.py
// src/main/scala/com/example/EtlJob.scala
package com.example

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object EtlJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("S3 Revenue Aggregation")
      .getOrCreate()

    // Read — CSV with header from S3
    val orders = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("s3a://my-bucket/raw/orders/")

    // Transform — revenue per product
    val revenue = orders
      .groupBy("product_id")
      .agg(
        sum("amount").as("total_revenue"),
        count("*").as("order_count"),
        avg("amount").as("avg_order_value")
      )

    // Write — Parquet back to S3
    revenue.write
      .mode("overwrite")
      .parquet("s3a://my-bucket/output/revenue_by_product/")

    spark.stop()
  }
}

// build.sbt:
// name := "spark-etl"
// version := "1.0"
// scalaVersion := "2.12.18"
// libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided"
//
// Build:  sbt assembly
// Submit: spark-submit --class com.example.EtlJob target/scala-2.12/spark-etl-assembly-1.0.jar
S3 Access
Use s3a:// (not s3:// or s3n://) — it's the modern, production connector. Credentials come from IAM roles (EMR/Glue/Databricks), environment variables (AWS_ACCESS_KEY_ID), or spark.hadoop.fs.s3a.* configs. Never hardcode keys in your code.

Packaging a Spark Application

Packaging Rule
Your application JAR (or Python zip) must include your code + third-party deps. Spark's own JARs are already on the cluster — never bundle them (use provided scope).

Python (PySpark)

# Option 1: Single script (simplest)
spark-submit my_job.py

# Option 2: Multiple modules — zip them
cd my_project/
zip -r ../my_project.zip .   # zip the package directory
spark-submit --py-files my_project.zip main.py

# Option 3: Third-party deps — ship a venv or use --packages
pip install -t deps/ requests pandas
cd deps/ && zip -r ../deps.zip . && cd ..
spark-submit --py-files deps.zip,my_project.zip main.py

# Option 4: Conda/venv environment archive (EMR/Databricks)
# Pack the whole environment for reproducibility
conda pack -o pyspark_env.tar.gz
spark-submit \
  --conf spark.pyspark.python=./environment/bin/python \
  --archives pyspark_env.tar.gz#environment \
  main.py

Java / Scala

# Build an uber-jar (fat jar) with all deps except Spark itself
# Maven: use shade plugin    Gradle: use shadow plugin    sbt: use assembly

# Maven pom.xml key snippet:
#   <dependency>
#     <groupId>org.apache.spark</groupId>
#     <artifactId>spark-sql_2.12</artifactId>
#     <version>3.5.0</version>
#     <scope>provided</scope>
#   </dependency>

mvn clean package -DskipTests
# produces target/my-spark-app-1.0-shaded.jar

spark-submit \
  --class com.example.MyApp \
  --master yarn \
  target/my-spark-app-1.0-shaded.jar arg1 arg2

spark-submit Deep Dive

# Full spark-submit anatomy
spark-submit \
  --master yarn \                          # cluster manager: yarn | k8s://... | spark://... | local[*]
  --deploy-mode cluster \                  # cluster = driver on worker; client = driver on submitting machine
  --name "daily-etl" \                     # app name visible in Spark UI / YARN RM
  --queue production \                     # YARN queue (resource isolation)
  --num-executors 20 \                     # fixed number (or use dynamic allocation)
  --executor-cores 5 \
  --executor-memory 16g \
  --driver-memory 4g \
  --driver-cores 2 \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=5 \
  --conf spark.dynamicAllocation.maxExecutors=50 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --packages org.apache.spark:spark-avro_2.12:3.5.0 \  # Maven coords — downloaded at runtime
  --jars /path/to/custom.jar \             # local JARs to ship
  --py-files deps.zip \                    # Python: extra .zip/.py/.egg
  --files config.yaml \                    # files accessible via SparkFiles.get("config.yaml")
  my_job.py   # or my-app.jar --class com.example.Main
deploy-mode: client vs cluster
client — driver runs on the submitting machine. Good for interactive debugging, but if your laptop disconnects the job dies.
cluster — driver runs inside the cluster (YARN ApplicationMaster / K8s pod). Use this for scheduled production jobs. Logs accessible via yarn logs -applicationId <id> or K8s pod logs.

EMR (AWS)

# Create a transient EMR cluster that runs one step then terminates
aws emr create-cluster \
  --name "nightly-etl" \
  --release-label emr-7.0.0 \
  --applications Name=Spark \
  --instance-groups '[
    {"InstanceGroupType":"MASTER","InstanceType":"m5.xlarge","InstanceCount":1},
    {"InstanceGroupType":"CORE","InstanceType":"m5.2xlarge","InstanceCount":5}
  ]' \
  --steps '[{
    "Type":"Spark",
    "Name":"ETL",
    "ActionOnFailure":"TERMINATE_CLUSTER",
    "Args":["--deploy-mode","cluster","--py-files","s3://my-bucket/deps.zip","s3://my-bucket/jobs/etl.py"]
  }]' \
  --auto-terminate \
  --log-uri s3://my-bucket/emr-logs/ \
  --service-role EMR_DefaultRole \
  --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-abc123
# In your PySpark code — read/write directly to S3
df = spark.read.parquet("s3://my-bucket/raw/events/")
df.write.mode("overwrite").parquet("s3://my-bucket/processed/events/")

# EMR pre-configures the S3 committer for fast, safe writes
# No need to set fs.s3a.* — EMRFS handles it
EMR Cost Tips
Use Spot instances for CORE/TASK nodes (60-80% savings). Use transient clusters (create → run → terminate) for batch jobs instead of long-running clusters. Use instance fleets to let EMR pick the cheapest available instance types.

Kubernetes

# Native Spark on K8s (Spark 3.1+) — no Hadoop/YARN needed
spark-submit \
  --master k8s://https://k8s-api-server:6443 \
  --deploy-mode cluster \
  --name spark-etl \
  --conf spark.kubernetes.container.image=my-registry/spark:3.5.0 \
  --conf spark.kubernetes.namespace=spark-jobs \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.driver.request.cores=2 \
  --conf spark.kubernetes.driver.limit.cores=2 \
  --conf spark.kubernetes.executor.request.cores=4 \
  --conf spark.kubernetes.executor.limit.cores=4 \
  --conf spark.executor.instances=10 \
  --conf spark.executor.memory=8g \
  --conf spark.kubernetes.file.upload.path=s3a://my-bucket/spark-uploads/ \
  local:///opt/spark/work-dir/my_job.py    # path INSIDE the container image
# Build a custom Spark image with your code + deps baked in
# Dockerfile
FROM apache/spark:3.5.0
USER root                # base image runs as the non-root 'spark' user
COPY my_project/ /opt/spark/work-dir/
COPY requirements.txt /opt/spark/work-dir/
RUN pip3 install -r /opt/spark/work-dir/requirements.txt
USER spark

# Build and push
docker build -t my-registry/spark-etl:v1 .
docker push my-registry/spark-etl:v1
Spark Operator (recommended for K8s)
The Kubeflow Spark Operator provides a SparkApplication CRD — define jobs declaratively in YAML, get automatic retries, monitoring, and lifecycle management instead of raw spark-submit.
# SparkApplication CRD example
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: daily-etl
  namespace: spark-jobs
spec:
  type: Python
  mode: cluster
  image: my-registry/spark-etl:v1
  mainApplicationFile: local:///opt/spark/work-dir/etl.py
  sparkVersion: "3.5.0"
  driver:
    cores: 2
    memory: "4g"
    serviceAccount: spark
  executor:
    cores: 4
    instances: 10
    memory: "8g"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3

Databricks

# Databricks CLI — create and run a job
# Install: pip install databricks-cli

# Configure
databricks configure --token   # enter host + personal access token

# Deploy a wheel/jar to DBFS
databricks fs cp my_project-0.1-py3-none-any.whl dbfs:/libs/

# Create a job definition (JSON)
databricks jobs create --json '{
  "name": "daily-etl",
  "new_cluster": {
    "spark_version": "14.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "num_workers": 5,
    "aws_attributes": {"availability": "SPOT_WITH_FALLBACK"}
  },
  "libraries": [{"whl": "dbfs:/libs/my_project-0.1-py3-none-any.whl"}],
  "spark_python_task": {
    "python_file": "dbfs:/jobs/etl.py",
    "parameters": ["--date", "2024-01-01"]
  }
}'

# Trigger a run
databricks jobs run-now --job-id 12345
Databricks Workflows vs Jobs
Jobs = single task. Workflows = DAG of tasks with dependencies — Databricks' built-in orchestrator. For complex pipelines, consider Workflows or an external scheduler (Airflow, Prefect) that triggers Databricks jobs via the REST API.

Orchestrating Spark Jobs

# Airflow — the most common Spark job scheduler
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator

# Option 1: SparkSubmitOperator (for YARN / K8s / standalone)
etl_task = SparkSubmitOperator(
    task_id="run_etl",
    application="/opt/jobs/etl.py",
    conn_id="spark_default",
    conf={"spark.executor.memory": "8g", "spark.executor.cores": "4"},
    num_executors=10,
    py_files="/opt/jobs/deps.zip",
)

# Option 2: EMR step via Airflow
emr_step = EmrAddStepsOperator(
    task_id="add_etl_step",
    job_flow_id="{{ var.value.emr_cluster_id }}",
    steps=[{
        "Name": "ETL",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "--deploy-mode", "cluster",
                     "s3://bucket/jobs/etl.py"]
        }
    }],
)

# Option 3: Databricks via Airflow
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
db_task = DatabricksRunNowOperator(
    task_id="run_databricks_etl",
    job_id=12345,
    notebook_params={"date": "{{ ds }}"},
)

CI/CD for Spark Jobs

# GitHub Actions example: test → build → deploy
name: spark-etl-pipeline
on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.11" }
      - run: |
          pip install pyspark==3.5.0 pytest
          pip install -r requirements.txt
          pytest tests/ -v           # unit tests with local Spark

  deploy:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: |
          # Package
          cd my_project && zip -r ../my_project.zip . && cd ..
          pip install -t deps/ -r requirements.txt
          cd deps && zip -r ../deps.zip . && cd ..
          # Upload to S3
          aws s3 cp my_project.zip s3://my-bucket/jobs/
          aws s3 cp deps.zip s3://my-bucket/jobs/
          aws s3 cp main.py s3://my-bucket/jobs/

Production Readiness Checklist

| Category           | Check                                                                                                       |
|--------------------|-------------------------------------------------------------------------------------------------------------|
| Idempotency        | Job can re-run safely — use mode("overwrite") with partitioned output or Delta Lake MERGE                   |
| Logging            | Log to stdout (captured by YARN/K8s). Use log4j config to suppress noisy Spark logs                         |
| Monitoring         | Spark UI (port 4040), History Server for completed jobs, Ganglia/CloudWatch/Prometheus for cluster metrics  |
| Retries            | Orchestrator handles retries (Airflow retries=2). Spark retries failed tasks internally (spark.task.maxFailures=4) |
| Data validation    | Assert row counts, null percentages, and schema after writes — Great Expectations or custom checks          |
| Dependency pinning | Pin Spark version + all library versions in requirements.txt / pom.xml. Test upgrades in staging first      |
| Secrets            | Never hardcode credentials. Use IAM roles (EMR), K8s secrets, or Databricks secret scopes                   |
| Cluster sizing     | Start small, profile with Spark UI, then scale. Enable dynamic allocation for variable workloads            |

Configuration Reference

Key Configuration Properties

| Property | Default | Notes |
|----------|---------|-------|
| spark.executor.memory | 1g | Executor JVM heap. Typically 4–18g in prod. |
| spark.executor.cores | 1 (local: all) | Cores per executor. Usually 4–5 in prod. |
| spark.executor.instances | 2 | Number of executors. Use dynamic allocation in prod. |
| spark.driver.memory | 1g | Driver JVM heap. Increase if collecting large results. |
| spark.driver.maxResultSize | 1g | Max size of results returned to driver. Set 0 for unlimited. |
| spark.sql.shuffle.partitions | 200 | Partitions after a shuffle (DataFrame API). Tune to ~128–256 MB / partition. |
| spark.default.parallelism | Total executor cores | Default partitions for RDD operations. |
| spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Tables smaller than this are auto-broadcast. Set -1 to disable. |
| spark.sql.adaptive.enabled | true (3.2+) | Enable Adaptive Query Execution. |
| spark.serializer | JavaSerializer | Set to KryoSerializer for RDD-heavy workloads. |
| spark.memory.fraction | 0.6 | Fraction of heap for execution + storage. Rest is user memory. |
| spark.memory.storageFraction | 0.5 | Fraction of spark.memory.fraction reserved for cache storage. |
| spark.network.timeout | 120s | Increase if executors frequently time out on slow operations. |
| spark.sql.broadcastTimeout | 300s | Timeout for broadcast joins. Increase for large broadcasts. |
| spark.sql.files.maxPartitionBytes | 128m | Max bytes per partition when reading files. |
| spark.executor.memoryOverhead | executorMemory * 0.1, min 384m | Non-heap memory (Python process, off-heap). Increase for PySpark. Replaces the deprecated spark.yarn.executor.memoryOverhead. |
| spark.dynamicAllocation.enabled | false | Auto-scale executors based on workload. |
| spark.dynamicAllocation.minExecutors | 0 | Min executors when idle. |
| spark.dynamicAllocation.maxExecutors | infinity | Cap executor count. |
| spark.sql.parquet.compression.codec | snappy | Options: snappy, gzip, lz4, zstd. zstd becomes the default in Spark 4.0. |
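The interplay of spark.memory.fraction and spark.memory.storageFraction can be sketched numerically — a toy calculator of the unified memory regions (approximation: ~300 MB reserved memory off the top, fractions at their defaults):

```python
def memory_regions(heap_gb, memory_fraction=0.6, storage_fraction=0.5):
    """Sketch of Spark's unified memory model with default fractions.
    ~300 MB is reserved before the fractions apply."""
    reserved = 0.3
    unified = (heap_gb - reserved) * memory_fraction   # execution + storage
    storage = unified * storage_fraction               # eviction-protected cache share
    user    = heap_gb - reserved - unified             # user data structures, UDF objects
    return {k: round(v, 2) for k, v in
            {"unified": unified, "storage": storage, "user": user}.items()}

print(memory_regions(8))   # 8 GB executor heap
```

For an 8 GB heap this yields roughly 4.6 GB of unified memory (half of it cache-protected) and 3.1 GB of user memory — a reminder that executor heap is far from fully available for caching.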

Setting Configuration

# At SparkSession creation (preferred)
spark = (SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.executor.memory", "8g")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.maxExecutors", "50")
    .getOrCreate())

# At runtime (some properties are immutable after session start)
spark.conf.set("spark.sql.shuffle.partitions", "100")

# Read a config value
print(spark.conf.get("spark.sql.shuffle.partitions"))

# Via spark-submit
# spark-submit --conf spark.executor.memory=8g --conf spark.sql.shuffle.partitions=200 job.py

# Via spark-defaults.conf (cluster-wide defaults)
# $SPARK_HOME/conf/spark-defaults.conf:
# spark.executor.memory    8g
# spark.sql.shuffle.partitions    200

Dynamic Allocation

# Dynamic allocation automatically adds/removes executors based on pending tasks
# Requires shuffle service (for YARN/Standalone) or K8s

spark = SparkSession.builder \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.initialExecutors", "5") \
    .config("spark.dynamicAllocation.maxExecutors", "100") \
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s") \
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s") \
    .config("spark.shuffle.service.enabled", "true") \
    .getOrCreate()

Common Pitfalls

1. Driver OOM from collect() on Large Data

Most Common Beginner Mistake
df.collect() and df.toPandas() pull ALL data to the driver. A 50 GB DataFrame kills a 4 GB driver.
# BAD
all_rows = df.collect()  # 50 GB DataFrame → driver OOM

# GOOD — write to storage and process offline
df.write.parquet("s3://bucket/output/")

# GOOD — sample for exploration
sample = df.sample(0.001).toPandas()

# GOOD — aggregate first, then collect the summary
summary = df.groupBy("dept").agg(F.sum("salary")).collect()  # tiny result

2. Data Skew — One Task Takes Forever

# Symptom: Stage has 199 tasks completing in 10s, 1 task running for 10 minutes

# Diagnosis
df.groupBy("join_key").count().orderBy(F.desc("count")).show(10)

# Fix: see Joins section — broadcast, salting, or AQE skew join

3. Too Many Small Files

# Symptom: Reading job is slow even though data is small
# Cause: Streaming or partitioned writes produced millions of tiny files

# Diagnosis
# Check number of files in a path:
# aws s3 ls --recursive s3://bucket/path/ | wc -l

# Fix: coalesce before write, or OPTIMIZE (Delta Lake)
df.coalesce(20).write.parquet("output/")

4. UDF Performance

import pandas as pd
from pyspark.sql.functions import col, pandas_udf, udf

# BAD — Python UDF: row-by-row Python serialization overhead
@udf("string")
def format_name(first, last):
    return f"{last}, {first}".upper()

df.withColumn("formatted", format_name("first", "last"))

# GOOD — use built-in functions (no Python overhead)
from pyspark.sql.functions import concat_ws, upper
df.withColumn("formatted", upper(concat_ws(", ", col("last"), col("first"))))

# GOOD — if you must use Python logic, use Pandas UDF (vectorized)
@pandas_udf("string")
def format_name_pandas(first: pd.Series, last: pd.Series) -> pd.Series:
    return (last + ", " + first).str.upper()

df.withColumn("formatted", format_name_pandas("first", "last"))
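The speedup comes from operating on whole Series at once instead of one row at a time. A pure-pandas sketch of the two styles (no Spark needed to see the difference — the data is made up):

```python
import pandas as pd

first = pd.Series(["ada", "alan"])
last  = pd.Series(["lovelace", "turing"])

# Vectorized, whole-batch work — what a Pandas UDF does per Arrow batch
vectorized = (last + ", " + first).str.upper()

# Row-at-a-time work — what a plain Python UDF does per record
row_by_row = [f"{l}, {f}".upper() for f, l in zip(first, last)]

print(list(vectorized))   # ['LOVELACE, ADA', 'TURING, ALAN']
```

Both produce the same strings, but the vectorized path amortizes Python interpreter overhead across the batch and avoids per-row (de)serialization.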

5. Accidental Cartesian Join

# BAD — cross join with 1M × 1M rows = 1 trillion rows
df1.join(df2)   # no join key! Spark 3.x silently does a cartesian product
                # (spark.sql.crossJoin.enabled defaults to true since 3.0)

# Spark 2.x throws AnalysisException for implicit cross joins — make them explicit with .crossJoin()

# Always provide a join key
df1.join(df2, "user_id", "inner")  # explicit key

6. Caching the Wrong Dataset

# BAD — caching a wide DataFrame then dropping most columns
df_wide = spark.read.parquet("huge_table/")
df_wide.cache()  # caches ALL 200 columns
result = df_wide.select("id", "name", "salary")

# GOOD — cache after projection
df_narrow = spark.read.parquet("huge_table/").select("id", "name", "salary")
df_narrow.cache()
df_narrow.count()  # materialize

# BAD — forgetting to unpersist
df_temp.cache()
# ... do work ...
# forgot to call df_temp.unpersist() — memory leak

# GOOD — always unpersist when done
df_temp.cache()
df_temp.count()
try:
    process(df_temp)
finally:
    df_temp.unpersist()

7. Python vs JVM Serialization Overhead

# PySpark DataFrames are fast — processing stays in JVM/Tungsten
# Python overhead only occurs at these boundaries:
#   1. Python UDFs
#   2. .collect() / .toPandas()
#   3. RDD operations with Python lambdas
#   4. foreachBatch with Python code

# Minimize crossings — do as much as possible with DataFrame API
# before touching Python-specific code

# Example: avoid RDD if DataFrame API works
# BAD
df.rdd.map(lambda row: row["salary"] * 1.1)  # crosses JVM/Python boundary

# GOOD
df.withColumn("salary", col("salary") * 1.1)  # stays in JVM Tungsten

8. Wrong shuffle.partitions Setting

# Default: spark.sql.shuffle.partitions = 200
# This is TOO HIGH for small data (creates tiny partitions and overhead)
# and TOO LOW for large data (creates overloaded partitions that spill to disk)

# Rule: target 128–256 MB per partition after shuffle

# Small dataset (dev/test): set low
spark.conf.set("spark.sql.shuffle.partitions", "8")

# Large dataset (1 TB): partition count = total data / target size
# 1TB / 200MB = 5000 partitions
spark.conf.set("spark.sql.shuffle.partitions", "5000")

# OR — enable AQE to auto-coalesce
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# AQE will merge small partitions automatically
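The sizing rule is plain division; here is a sketch (hypothetical helper, not a Spark API) that turns shuffle size into a partition count:

```python
import math

def shuffle_partitions(total_shuffle_mb, target_mb=200):
    """Partition count so each shuffle partition lands near the target size."""
    return max(1, math.ceil(total_shuffle_mb / target_mb))

print(shuffle_partitions(1_000_000))   # ~1 TB of shuffle data → 5000
print(shuffle_partitions(800))         # small job → 4, far below the default 200
```

The tricky part in practice is estimating the shuffle size — the Spark UI's stage detail page (shuffle read/write metrics) is the usual source.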

9. Mistaking Lazy Evaluation for Execution

# BAD — assuming transformations execute immediately
df = spark.read.parquet("huge_file.parquet")
df = df.filter(col("year") == 2024)   # nothing executes here
df = df.groupBy("dept").count()        # still nothing
# ... application exits before any action is called!

# GOOD — an action must be called to trigger execution
result = df.filter(col("year") == 2024).groupBy("dept").count()
result.show()   # triggers the actual job
result.write.parquet("output/")  # also triggers execution

# Common mistake in scripts: forgetting to call an action on the final result
# If your script seems to do nothing, check that you're calling an action.
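Python generators are a handy mental model for lazy evaluation — building the pipeline does nothing until something iterates it, just as transformations do nothing until an action runs:

```python
# Side-effect log so we can observe exactly when work happens
log = []

def tag(x):
    log.append(x)
    return x * 10

data = range(6)
filtered = (x for x in data if x % 2 == 0)   # "transformation" — nothing runs
mapped   = (tag(x) for x in filtered)        # still nothing; log is empty

assert log == []            # no work has been done yet
result = list(mapped)       # the "action" — forces the whole pipeline
print(result)               # [0, 20, 40]
```

The analogy is loose (Spark also optimizes the plan before running it), but the execution trigger is the same idea.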

10. groupByKey vs reduceByKey — Prefer Map-Side Combine

# groupByKey collects ALL values for a key to one partition
# On high-cardinality string keys this can cause OOM
pairs = df.rdd.map(lambda row: (row["user_id"], row["event"]))
pairs.groupByKey()   # dangerous — shuffles all events per user to one node

# reduceByKey does partial aggregation (combine) before shuffle
pairs = df.rdd.map(lambda row: (row["user_id"], 1))
pairs.reduceByKey(lambda a, b: a + b)  # safer — count events per user

# Even better — use DataFrame API (Catalyst optimizes it fully)
df.groupBy("user_id").count()
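The shuffle-volume difference can be counted in plain Python — a sketch with made-up data showing how map-side combine shrinks what crosses the network:

```python
from collections import Counter

pairs = [("u1", 1)] * 6 + [("u2", 1)] * 4      # (user_id, 1) records
partitions = [pairs[:5], pairs[5:]]            # pretend two input partitions

# groupByKey: every record crosses the shuffle boundary
group_by_key_shuffle = sum(len(p) for p in partitions)   # 10 records shuffled

# reduceByKey: each partition pre-combines locally (map-side combine), then
# only one (key, partial_sum) per key per partition is shuffled.
# Values are all 1, so counting keys equals summing values here.
partials = [Counter(k for k, _ in p) for p in partitions]
reduce_by_key_shuffle = sum(len(c) for c in partials)    # 3 records shuffled

final = sum(partials, Counter())                         # merge after the shuffle
print(group_by_key_shuffle, reduce_by_key_shuffle, dict(final))
```

Same final counts, a fraction of the shuffle traffic — and the gap widens with more records per key, which is exactly the skewed case where groupByKey OOMs.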