Apache Spark Refresher
PySpark + Scala — distributed data processing quick reference
0. Setup & Environment
No local Spark installation required. Docker gives you a fully working PySpark or Scala shell in under a minute.
Prerequisites
Install Docker Desktop (Mac):
brew install --cask docker # or download from docker.com
docker --version # verify: Docker version 27.x.x
Start PySpark Shell
One command — no local Python or JVM setup needed:
docker run -it --rm \
-p 4040:4040 \
apache/spark-py:latest \
/opt/spark/bin/pyspark
- Ephemeral container — nothing persists after exit.
- Spark UI available at http://localhost:4040 while the shell is running.
--rm auto-removes the container on exit, keeping your container list clean. Drop it if you want to inspect container state post-run.
Start Spark Shell (Scala)
docker run -it --rm \
-p 4040:4040 \
apache/spark:latest \
/opt/spark/bin/spark-shell
Working with Local Files
Mount a host directory into the container with -v:
docker run -it --rm \
-p 4040:4040 \
-v $(pwd)/data:/data \
apache/spark-py:latest \
/opt/spark/bin/pyspark
Then inside the PySpark shell:
df = spark.read.csv("/data/myfile.csv", header=True)
df.show(5)
Docker Compose — Standalone Cluster (Optional)
For multi-worker practice, create docker-compose.yml:
services:
  spark-master:
    image: apache/spark:latest
    command: /opt/spark/bin/spark-class org.apache.spark.deploy.master.Master
    ports:
      - "8080:8080"   # Spark master UI
      - "7077:7077"   # cluster connect port
    environment:
      - SPARK_MODE=master
  spark-worker:
    image: apache/spark:latest
    command: /opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
docker compose up -d # start cluster in background
docker compose down # tear it down
spark-submit --master spark://localhost:7077 ...   # submit a job against the compose cluster
Quick Verify
Inside the PySpark shell, run a trivial job to confirm everything is wired up:
df = spark.range(1000)
df.filter(df.id % 2 == 0).count()
# Expected output: 500
Then open http://localhost:4040 — you should see one completed job in the Spark UI.
Architecture
Spark is a unified engine for large-scale data analytics. It follows a driver/executor model: one Driver orchestrates work; many Executors do the actual computation in parallel.
Core Components
| Component | Role | Lives on |
|---|---|---|
| Driver Program | Hosts SparkContext / SparkSession; builds logical plan; schedules stages onto executors | Your machine or cluster head node |
| SparkContext | Entry point for Spark 1.x / RDD API; manages cluster connection | Driver JVM |
| SparkSession | Unified entry point (Spark 2+); wraps SparkContext; adds DataFrame/SQL API | Driver JVM |
| Executor | JVM process on each worker node; runs tasks; caches data | Worker nodes |
| Task | Smallest unit of work — processes one partition | Executor thread |
| Stage | Set of tasks with no shuffle boundary between them | Logical grouping |
| Job | Triggered by an action; broken into stages | Logical grouping |
| Cluster Manager | Allocates executor resources to Spark apps | External (YARN/K8s/Standalone) |
Cluster Managers
| Manager | When to use | Key flag |
|---|---|---|
| Standalone | Simple clusters, dev/test, full Spark control | --master spark://host:7077 |
| YARN | Existing Hadoop clusters; shares resources with MapReduce/Hive | --master yarn |
| Kubernetes | Cloud-native; containerized workloads; GKE/EKS/AKS | --master k8s://https://host:port |
| Mesos | Deprecated since Spark 3.2. Was a general-purpose resource manager, superseded by K8s | N/A |
| Local | Single machine / unit tests | --master local[*] |
DAG Scheduler & Lazy Evaluation
Spark builds a Directed Acyclic Graph (DAG) of transformations but does nothing until an action is called. This is lazy evaluation:
- Each transformation returns a new DataFrame/RDD — no data moves.
- When an action fires, the DAG is submitted to the DAG Scheduler, which splits it into stages at shuffle boundaries.
- The Task Scheduler sends tasks (one per partition) to executor threads.
- The Catalyst optimizer rewrites the logical plan before execution (for DataFrames/SQL).
# PySpark — SparkSession entry point
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("MyApp")
.master("local[*]") # local mode, all cores
.config("spark.sql.shuffle.partitions", 8) # tune for local
.getOrCreate())
sc = spark.sparkContext # access underlying SparkContext
print(sc.defaultParallelism) # number of cores available
// Scala — SparkSession entry point
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("MyApp")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "8")
.getOrCreate()
val sc = spark.sparkContext
println(sc.defaultParallelism)
spark-submit --master yarn --deploy-mode cluster --executor-memory 4g --num-executors 10 my_job.py
Core Concepts — RDDs
RDD (Resilient Distributed Dataset) is Spark's foundational data structure — an immutable, fault-tolerant, distributed collection of records. DataFrames are built on top of RDDs but add schema and optimization. Use RDDs directly when you need fine-grained control or work with unstructured data.
Creating RDDs
# From a Python collection
rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=4)
# From a text file (each line = one record)
rdd = sc.textFile("hdfs:///data/logs/*.txt")
# From a sequence file
rdd = sc.sequenceFile("hdfs:///data/kv")
# From another RDD (transformation)
rdd2 = rdd.map(lambda x: x * 2)
// From a collection
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), numSlices = 4)
// From a text file
val rdd = sc.textFile("hdfs:///data/logs/*.txt")
// From another RDD
val rdd2 = rdd.map(_ * 2)
Narrow vs. Wide Transformations
This distinction determines whether a shuffle occurs:
| Type | Shuffle? | Stage boundary? | Examples |
|---|---|---|---|
| Narrow | No | No — pipelined in one stage | map, filter, flatMap, mapPartitions, union |
| Wide | Yes | Yes — new stage after shuffle | groupByKey, reduceByKey, join, distinct, repartition, sortBy |
groupByKey shuffles ALL values across the network before aggregating. reduceByKey does a map-side combine (partial aggregation) first, dramatically reducing shuffle data.
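The difference is easy to see in a toy Python simulation (illustration only, not Spark's API): with a map-side combine, each partition pre-aggregates before records cross the "network", so far fewer records are shuffled.

```python
from collections import defaultdict

# Toy partitions of (key, value) pairs — stand-ins for a pair RDD.
partitions = [
    [("a", 1), ("b", 2), ("a", 3)],
    [("a", 4), ("b", 5), ("b", 6)],
]

def shuffle_groupByKey(parts):
    # Every record crosses the "network" before any aggregation happens.
    shuffled = [kv for p in parts for kv in p]
    out = defaultdict(int)
    for k, v in shuffled:
        out[k] += v
    return dict(out), len(shuffled)

def shuffle_reduceByKey(parts):
    # Map-side combine: pre-aggregate within each partition first.
    combined = []
    for p in parts:
        local = defaultdict(int)
        for k, v in p:
            local[k] += v
        combined.extend(local.items())
    out = defaultdict(int)
    for k, v in combined:
        out[k] += v
    return dict(out), len(combined)

g_result, g_shuffled = shuffle_groupByKey(partitions)
r_result, r_shuffled = shuffle_reduceByKey(partitions)
assert g_result == r_result == {"a": 8, "b": 13}
assert (g_shuffled, r_shuffled) == (6, 4)  # same answer, 4 records shuffled instead of 6
```

On real data with few distinct keys and many rows, the shuffle reduction is far more dramatic than this 6-vs-4 toy.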
Common RDD Operations
# --- Transformations (lazy) ---
rdd.map(lambda x: x + 1) # apply function to each element
rdd.flatMap(lambda x: x.split()) # map then flatten
rdd.filter(lambda x: x > 0) # keep elements matching predicate
rdd.mapPartitions(lambda it: process(it)) # one call per partition (efficient)
rdd.distinct() # remove duplicates (shuffle)
rdd.sample(withReplacement=False, fraction=0.1, seed=42)
# Pair RDD transformations
pairs = rdd.map(lambda x: (x % 3, x)) # (key, value) pairs
pairs.reduceByKey(lambda a, b: a + b) # aggregate by key (map-side combine)
pairs.groupByKey() # dangerous on large datasets — avoid
pairs.combineByKey(createCombiner, mergeValue, mergeCombiner)
pairs.sortByKey(ascending=True)
pairs.mapValues(lambda v: v * 2)
# Set operations
rdd1.union(rdd2) # concatenate (no dedup)
rdd1.intersection(rdd2) # common elements (shuffle)
rdd1.subtract(rdd2) # elements in rdd1 not in rdd2 (shuffle)
# --- Actions (trigger execution) ---
rdd.collect() # bring all data to driver — dangerous on large data!
rdd.count() # number of elements
rdd.first() # first element
rdd.take(10) # first N elements
rdd.top(10) # top N by natural order
rdd.reduce(lambda a, b: a + b) # aggregate all elements
rdd.foreach(print) # apply function on each element (on executors)
rdd.saveAsTextFile("hdfs:///output/")
Lineage Graph & Fault Tolerance
Spark tracks the lineage (sequence of transformations) for each RDD. If a partition is lost due to executor failure, Spark re-computes it from the parent RDD using the lineage — no data replication needed by default.
rdd.toDebugString() # print lineage as ASCII tree
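A toy model of lineage-based recovery (illustration only, not Spark internals): each RDD remembers its parent and the transformation that produced it, so any lost partition can be rebuilt by replaying the chain from the source data.

```python
class ToyRDD:
    """Minimal lineage sketch: derived RDDs hold no data, only a
    parent pointer and the function that produced them."""
    def __init__(self, partitions, parent=None, fn=None):
        self._partitions = partitions  # only the source RDD holds data
        self.parent, self.fn = parent, fn

    def map(self, fn):
        return ToyRDD(None, parent=self, fn=lambda part: [fn(x) for x in part])

    def compute(self, i):
        if self.parent is None:
            return self._partitions[i]          # source data
        return self.fn(self.parent.compute(i))  # replay lineage

source = ToyRDD([[1, 2], [3, 4]])
doubled = source.map(lambda x: x * 2).map(lambda x: x + 1)

# "Lose" partition 1 — rebuild it purely from lineage, no replication:
assert doubled.compute(1) == [7, 9]
```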
Partitions
Each RDD/DataFrame is divided into partitions — the unit of parallelism. One task processes one partition on one executor thread.
rdd.getNumPartitions() # how many partitions?
rdd.repartition(100) # reshuffle to exactly 100 partitions
rdd.coalesce(50) # reduce partitions without full shuffle
# Custom partitioner (pair RDDs)
rdd.partitionBy(8, lambda key: hash(key) % 8)
# mapPartitions — avoids per-row overhead (e.g., DB connections)
def process_partition(iterator):
    conn = open_connection()
    for row in iterator:
        conn.write(row)
    conn.close()
    yield "done"
rdd.mapPartitions(process_partition)
DataFrames & Datasets
A DataFrame is a distributed table with named columns and a schema. It is Spark's primary API for structured data and the target of the Catalyst optimizer. A Dataset (Scala/Java only) adds compile-time type safety on top of DataFrames.
Schema Definition
from pyspark.sql.types import (
StructType, StructField,
StringType, IntegerType,
LongType, DoubleType,
BooleanType, TimestampType,
ArrayType, MapType
)
schema = StructType([
StructField("id", LongType(), nullable=False),
StructField("name", StringType(), nullable=True),
StructField("age", IntegerType(), nullable=True),
StructField("score", DoubleType(), nullable=True),
StructField("active", BooleanType(), nullable=True),
StructField("created_at", TimestampType(), nullable=True),
StructField("tags", ArrayType(StringType()), nullable=True),
StructField("metadata", MapType(StringType(), StringType()), nullable=True),
])
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("id", LongType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("score", DoubleType, nullable = true),
StructField("active", BooleanType, nullable = true),
StructField("created_at", TimestampType, nullable = true),
StructField("tags", ArrayType(StringType), nullable = true),
))
Creating DataFrames
# From a list of tuples
data = [(1, "Alice", 30), (2, "Bob", 25)]
df = spark.createDataFrame(data, ["id", "name", "age"])
# From list of dicts
data = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
df = spark.createDataFrame(data)
# From RDD + explicit schema
rdd = sc.parallelize([(1, "Alice"), (2, "Bob")])
df = spark.createDataFrame(rdd, schema)
# From CSV
df = (spark.read
.option("header", "true")
.option("inferSchema", "true")
.option("nullValue", "NA")
.csv("hdfs:///data/employees.csv"))
# From CSV with explicit schema (faster — skips inference pass)
df = spark.read.schema(schema).csv("hdfs:///data/employees.csv")
# From JSON (newline-delimited)
df = spark.read.json("hdfs:///data/events/")
# From Parquet (schema embedded — best format for Spark)
df = spark.read.parquet("hdfs:///data/warehouse/users/")
# From Delta Lake
df = spark.read.format("delta").load("s3://bucket/delta/users/")
# From JDBC
df = (spark.read.format("jdbc")
.option("url", "jdbc:postgresql://host/db")
.option("dbtable", "public.orders")
.option("user", "user")
.option("password", "pass")
.option("numPartitions", 10)
.option("partitionColumn", "id")
.option("lowerBound", 1)
.option("upperBound", 10000000)
.load())
// From a Seq
val data = Seq((1, "Alice", 30), (2, "Bob", 25))
val df = data.toDF("id", "name", "age")
// From CSV
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("hdfs:///data/employees.csv")
// With explicit schema
val df = spark.read
.schema(schema)
.csv("hdfs:///data/employees.csv")
// From Parquet
val df = spark.read.parquet("hdfs:///data/warehouse/users/")
// From JDBC
val df = spark.read.format("jdbc")
.option("url", "jdbc:postgresql://host/db")
.option("dbtable", "public.orders")
.option("numPartitions", "10")
.option("partitionColumn", "id")
.option("lowerBound", "1")
.option("upperBound", "10000000")
.load()
DataFrame vs. Dataset
| Feature | DataFrame | Dataset[T] |
|---|---|---|
| API | Untyped — columns resolved at runtime | Typed — compile-time type safety |
| Language | Python, Scala, Java, R | Scala and Java only |
| Performance | Best — Tungsten + Catalyst optimization | Slightly lower — encoder overhead for lambdas |
| Error detection | Runtime | Compile-time |
| Use when | SQL-like transformations, ML pipelines | Domain objects, type-safe functional pipelines |
// Dataset — Scala only
case class Employee(id: Long, name: String, age: Int)
val ds: Dataset[Employee] = df.as[Employee] // convert DataFrame to Dataset
// Type-safe operations
val seniors: Dataset[Employee] = ds.filter(_.age > 40)
val names: Dataset[String] = ds.map(_.name)
// Note: ds.filter(_.age > 40) uses JVM serialization for the lambda
// df.filter(col("age") > 40) stays in Tungsten — usually faster
Inspecting DataFrames
df.printSchema() # tree view of schema with nullable flags
df.show(n=20, truncate=True) # tabular preview (default 20 rows)
df.show(5, truncate=False) # full column content
df.dtypes # list of (column_name, type_string) tuples
df.columns # list of column names
df.count() # number of rows (triggers a job)
df.describe("age", "score").show() # count, mean, stddev, min, max
# Explain the physical plan
df.explain() # physical plan only
df.explain(mode="extended") # logical + optimized logical + physical
df.explain(mode="formatted") # structured plan output (Spark 3+)
Spark SQL
Spark SQL lets you run ANSI SQL against DataFrames. Under the hood it uses the same Catalyst optimizer as the DataFrame API — results are identical.
Temp Views & SQL Queries
# Register as temp view (session-scoped)
df.createOrReplaceTempView("employees")
# Register as global temp view (cross-session)
df.createOrReplaceGlobalTempView("employees_global")
# query with: spark.sql("SELECT * FROM global_temp.employees_global")
# Run SQL
result = spark.sql("""
SELECT department,
COUNT(*) AS headcount,
AVG(salary) AS avg_salary,
MAX(salary) AS max_salary
FROM employees
WHERE active = true
GROUP BY department
HAVING COUNT(*) > 5
ORDER BY avg_salary DESC
""")
result.show()
# SQL with subqueries
spark.sql("""
SELECT e.name, e.salary, d.budget
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE e.salary > (SELECT AVG(salary) FROM employees)
""").show()
df.createOrReplaceTempView("employees")
val result = spark.sql("""
SELECT department,
COUNT(*) AS headcount,
AVG(salary) AS avg_salary
FROM employees
WHERE active = true
GROUP BY department
ORDER BY avg_salary DESC
""")
result.show()
Catalyst Optimizer
Catalyst is Spark SQL's query optimizer. It transforms your logical plan through four phases:
- Analysis — resolve column names and types against the catalog
- Logical Optimization — predicate pushdown, constant folding, column pruning, join reordering
- Physical Planning — select physical operators (broadcast vs. sort-merge join, etc.)
- Code Generation — whole-stage codegen via Janino compiles query to JVM bytecode
Rule of thumb: filter early — e.g. df.filter("age > 30").join(...) — so less data reaches the join. Catalyst's predicate pushdown usually does this for you, but explicit early filters keep plans predictable and readable.
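One Catalyst rewrite, constant folding, can be sketched on a tiny expression tree in plain Python (a toy illustration, not Spark's optimizer classes):

```python
def fold(expr):
    """Recursively collapse constant sub-expressions, as a logical
    optimizer would before physical planning. Leaves are column names
    (strings) or literals (numbers); nodes are (op, left, right)."""
    if not isinstance(expr, tuple):
        return expr                      # leaf: column name or literal
    op, l, r = expr
    l, r = fold(l), fold(r)
    if isinstance(l, (int, float)) and isinstance(r, (int, float)):
        return {"+": l + r, "*": l * r}[op]   # both sides constant: fold now
    return (op, l, r)

# salary * (1 + 0.1)  is rewritten to  salary * 1.1 before execution
plan = ("*", "salary", ("+", 1, 0.1))
assert fold(plan) == ("*", "salary", 1.1)
```

Real Catalyst applies dozens of such rules (pushdown, pruning, reordering) to a full tree of logical operators, but the shape of the transformation is the same.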
UDFs — User-Defined Functions
Prefer built-in functions (pyspark.sql.functions) whenever possible — UDFs are opaque to Catalyst, and plain Python UDFs pay per-row serialization costs. If you must use a Python UDF, consider Pandas UDFs (vectorized) instead.
from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import StringType, LongType
import pandas as pd
# Regular UDF (slow — row-at-a-time Python serialization)
@udf(returnType=StringType())
def clean_name(name: str) -> str:
    if name is None:
        return None
    return name.strip().title()
df.withColumn("clean_name", clean_name("name")).show()
# Register for SQL use
spark.udf.register("clean_name_sql", clean_name)
spark.sql("SELECT clean_name_sql(name) FROM employees").show()
# Pandas UDF (vectorized — much faster for Python)
@pandas_udf(LongType())
def fast_hash(series: pd.Series) -> pd.Series:
    return series.apply(hash).abs() % (2**31)
df.withColumn("hash_id", fast_hash("name")).show()
# Grouped map with applyInPandas (plain function, NOT @pandas_udf)
def normalize_salary_by_dept(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf["salary_norm"] = (pdf["salary"] - pdf["salary"].mean()) / pdf["salary"].std()
    return pdf
result_schema = "department string, name string, salary double, salary_norm double"
df.groupby("department").applyInPandas(normalize_salary_by_dept, schema=result_schema)
import org.apache.spark.sql.functions.{udf, col}
// Scala UDF — stays in JVM, minimal overhead
val cleanName = udf((name: String) =>
Option(name).map(_.trim.toLowerCase.capitalize).orNull
)
df.withColumn("clean_name", cleanName(col("name"))).show()
// Register for SQL
spark.udf.register("clean_name_sql", cleanName)
spark.sql("SELECT clean_name_sql(name) FROM employees").show()
Broadcast Variables
# Broadcast a lookup table to all executors (avoids shipping it per task)
country_map = {"US": "United States", "GB": "Great Britain", "DE": "Germany"}
bc_map = sc.broadcast(country_map)
# Use in RDD transformation
result = rdd.map(lambda code: bc_map.value.get(code, "Unknown"))
# In DataFrame context, use broadcast join (covered in Joins section)
from pyspark.sql.functions import broadcast
df.join(broadcast(small_df), "country_code")
# Clean up when done
bc_map.unpersist()
bc_map.destroy() # removes from executor memory entirely
Transformations
Select, Filter, WithColumn
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, when, coalesce
# Select columns
df.select("id", "name", "salary")
df.select(col("id"), col("name"), (col("salary") * 1.1).alias("new_salary"))
df.select("*", F.upper("name").alias("name_upper"))
# Filter / where (identical)
df.filter(col("age") > 30)
df.where("salary > 50000 AND active = true")
df.filter((col("department") == "Engineering") & col("active"))
# withColumn — add or replace a column
df = df.withColumn("bonus", col("salary") * 0.1)
df = df.withColumn("level",
when(col("salary") > 200000, "L7")
.when(col("salary") > 150000, "L6")
.when(col("salary") > 100000, "L5")
.otherwise("L4"))
# withColumnRenamed
df = df.withColumnRenamed("old_col", "new_col")
# drop columns
df = df.drop("temp_col", "another_col")
# coalesce — first non-null value
df = df.withColumn("display_name", coalesce(col("nickname"), col("name"), lit("Unknown")))
# Cast types
df = df.withColumn("age", col("age").cast("integer"))
df = df.withColumn("created_at", col("created_at").cast("timestamp"))
import org.apache.spark.sql.functions._
// Select
df.select("id", "name", "salary")
df.select($"id", $"name", ($"salary" * 1.1).as("new_salary"))
// Filter
df.filter($"age" > 30)
df.where("salary > 50000 AND active = true")
// withColumn
val df2 = df
.withColumn("bonus", $"salary" * 0.1)
.withColumn("level",
when($"salary" > 200000, "L7")
.when($"salary" > 150000, "L6")
.otherwise("L4"))
// Drop, rename, cast
df.drop("temp_col")
.withColumnRenamed("old_col", "new_col")
.withColumn("age", $"age".cast("integer"))
GroupBy & Aggregation
from pyspark.sql import functions as F
# Single groupBy
df.groupBy("department").agg(
F.count("*").alias("headcount"),
F.avg("salary").alias("avg_salary"),
F.max("salary").alias("max_salary"),
F.min("salary").alias("min_salary"),
F.sum("salary").alias("total_payroll"),
F.stddev("salary").alias("salary_stddev"),
F.collect_list("name").alias("names"), # order not guaranteed
F.collect_set("skill").alias("skills"), # distinct values
F.countDistinct("manager_id").alias("unique_managers"),
)
# Multi-column groupBy
df.groupBy("department", "level").count()
# Rollup and Cube for subtotals
df.rollup("department", "level").agg(F.count("*")).show()
df.cube("department", "level").agg(F.avg("salary")).show()
# groupBy + filter (HAVING equivalent)
df.groupBy("department") \
.agg(F.count("*").alias("cnt")) \
.filter(col("cnt") > 10) \
.show()
df.groupBy("department").agg(
count("*").as("headcount"),
avg("salary").as("avg_salary"),
max("salary").as("max_salary"),
sum("salary").as("total_payroll"),
collect_list("name").as("names"),
countDistinct("manager_id").as("unique_managers")
)
// Rollup
df.rollup("department", "level")
.agg(count("*"))
.show()
Window Functions
Window functions compute values across a sliding window of rows relative to the current row — without collapsing rows like groupBy.
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# Define window spec
w_dept = (Window
.partitionBy("department")
.orderBy(F.desc("salary")))
# Ranking functions
df = df.withColumn("rank", F.rank().over(w_dept))
df = df.withColumn("dense_rank", F.dense_rank().over(w_dept))
df = df.withColumn("row_number", F.row_number().over(w_dept))
df = df.withColumn("percent_rank", F.percent_rank().over(w_dept))
df = df.withColumn("ntile_4", F.ntile(4).over(w_dept))
# Lag / Lead — access previous/next row values
df = df.withColumn("prev_salary", F.lag("salary", 1).over(w_dept))
df = df.withColumn("next_salary", F.lead("salary", 1).over(w_dept))
df = df.withColumn("salary_diff", col("salary") - F.lag("salary", 1).over(w_dept))
# Running totals with frame specification
w_running = (Window
.partitionBy("department")
.orderBy("hire_date")
.rowsBetween(Window.unboundedPreceding, Window.currentRow))
df = df.withColumn("running_total", F.sum("salary").over(w_running))
# Moving average (3-row centered window)
w_moving = (Window
.partitionBy("department")
.orderBy("month")
.rowsBetween(-1, 1)) # one before, current, one after
df = df.withColumn("moving_avg", F.avg("salary").over(w_moving))
# First / last value in window
w_all = Window.partitionBy("department").orderBy("hire_date") \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df = df.withColumn("dept_first_hire", F.first("name").over(w_all))
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val wDept = Window
.partitionBy("department")
.orderBy(desc("salary"))
val df2 = df
.withColumn("rank", rank().over(wDept))
.withColumn("dense_rank", dense_rank().over(wDept))
.withColumn("row_number", row_number().over(wDept))
.withColumn("prev_salary", lag("salary", 1).over(wDept))
// Running total
val wRunning = Window
.partitionBy("department")
.orderBy("hire_date")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
val df3 = df2.withColumn("running_total", sum("salary").over(wRunning))
Pivot & Unpivot
# Pivot — rows to columns
pivot_df = (df
.groupBy("employee_id")
.pivot("quarter", ["Q1", "Q2", "Q3", "Q4"]) # explicit values = faster
.agg(F.sum("revenue")))
# Without explicit values (Spark scans data first to find distinct values — slower)
pivot_df = df.groupBy("employee_id").pivot("quarter").agg(F.sum("revenue"))
# Unpivot (stack) — columns to rows — Spark 3.4+ native, else use selectExpr
from pyspark.sql.functions import expr
# Native stack function
unpivot_df = df.select(
"employee_id",
F.expr("stack(4, 'Q1', Q1, 'Q2', Q2, 'Q3', Q3, 'Q4', Q4) AS (quarter, revenue)")
)
# Spark 3.4+ unpivot
unpivot_df = df.unpivot(["employee_id"], ["Q1", "Q2", "Q3", "Q4"], "quarter", "revenue")
Actions & Writing Data
Common Actions
df.count() # triggers full scan — can be expensive
df.collect() # returns list of Row objects to driver — OOM risk!
df.first() # first row
df.head(5) # first 5 rows (same as take(5))
df.take(10) # list of first 10 Row objects
df.show(20, truncate=50) # print tabular (for interactive exploration)
df.toPandas() # converts to Pandas DataFrame — driver memory!
# Safer alternatives to collect()
df.limit(1000).collect() # cap before pulling to driver
df.sample(fraction=0.01).toPandas() # sample first
# Aggregation actions (compute one value)
df.agg(F.sum("salary")).collect()[0][0]
collect() and toPandas() bring ALL data to the driver. On a 100 GB dataset this causes OOM and kills the driver. Use limit() or write() to files instead.
Writing DataFrames
# Save modes: append | overwrite | error (default) | ignore
df.write.mode("overwrite").parquet("hdfs:///output/users/")
# Parquet (recommended default — columnar, compressed, schema embedded)
df.write \
.mode("overwrite") \
.option("compression", "snappy") \
.parquet("s3://bucket/users/")
# CSV
df.write \
.mode("append") \
.option("header", "true") \
.option("delimiter", ",") \
.csv("hdfs:///output/report/")
# JSON (newline-delimited)
df.write.mode("overwrite").json("hdfs:///output/events/")
# Delta Lake (ACID, time-travel, schema enforcement)
df.write.format("delta").mode("overwrite").save("s3://bucket/delta/users/")
# Partition by a column (creates directory structure: year=2024/month=01/...)
df.write \
.partitionBy("year", "month") \
.mode("overwrite") \
.parquet("hdfs:///output/events/")
# Bucket for joins (write once, join many times efficiently)
df.write \
.bucketBy(64, "user_id") \
.sortBy("user_id") \
.mode("overwrite") \
.saveAsTable("users_bucketed")
# JDBC write
df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://host/db") \
.option("dbtable", "output_table") \
.option("user", "user") \
.option("password", "pass") \
.mode("append") \
.save()
// Parquet
df.write
.mode("overwrite")
.option("compression", "snappy")
.parquet("s3://bucket/users/")
// Partitioned write
df.write
.partitionBy("year", "month")
.mode("overwrite")
.parquet("hdfs:///output/events/")
// Delta
df.write
.format("delta")
.mode("overwrite")
.save("s3://bucket/delta/users/")
// Bucketed table
df.write
.bucketBy(64, "user_id")
.sortBy("user_id")
.saveAsTable("users_bucketed")
Save Modes
| Mode | If data exists | Use when |
|---|---|---|
| error (default) | Throws exception | Safe default, prevents accidental overwrites |
| overwrite | Deletes existing, writes new | Full refresh of a table/path |
| append | Adds new data alongside existing | Incremental loads, streaming sinks |
| ignore | Silently skips write | Idempotent writes — skip if already done |
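The four semantics can be sketched against an in-memory "path" in plain Python (a behavioral illustration only, not Spark's writer implementation):

```python
def write(storage, path, rows, mode="error"):
    """Mimic Spark save-mode semantics against a dict acting as storage."""
    exists = path in storage
    if mode == "error" and exists:
        raise FileExistsError(path)      # default: refuse to clobber
    if mode == "ignore" and exists:
        return                           # silently skip
    if mode == "append" and exists:
        storage[path] = storage[path] + list(rows)
        return
    storage[path] = list(rows)           # overwrite, or first write in any mode

store = {}
write(store, "/out", [1, 2])               # first write always succeeds
write(store, "/out", [3], mode="append")
write(store, "/out", [9], mode="ignore")   # skipped: data already exists
assert store["/out"] == [1, 2, 3]
try:
    write(store, "/out", [0])              # default mode="error" raises
    assert False
except FileExistsError:
    pass
```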
Joins
Join Types
# Basic join syntax
df_orders.join(df_users, on="user_id", how="inner")
# Explicit condition (handles column name collision)
df_orders.join(df_users,
df_orders["user_id"] == df_users["id"],
how="left")
# Multiple join keys
df.join(other, on=["dept_id", "year"], how="inner")
# Join types
how = "inner" # rows in both (default)
how = "left" # all rows from left, nulls where no match in right
how = "right" # all rows from right
how = "full" # all rows from both (outer)
how = "cross" # cartesian product — DANGEROUS on large tables
how = "semi" # rows from left that HAVE a match in right (no right cols)
how = "anti" # rows from left that DO NOT match right
# Semi-join example — "which orders have a matching product?"
orders_with_product = df_orders.join(df_products, "product_id", "semi")
# Anti-join example — "which users have never placed an order?"
users_no_orders = df_users.join(df_orders, "user_id", "anti")
// Basic joins
dfOrders.join(dfUsers, "user_id") // inner
dfOrders.join(dfUsers, "user_id", "left") // left outer
dfOrders.join(dfUsers, "user_id", "semi") // left semi
dfOrders.join(dfUsers, "user_id", "anti") // left anti
// Explicit condition
dfOrders.join(dfUsers,
dfOrders("user_id") === dfUsers("id"),
"inner")
// Cross join (must be explicit in Spark 3.x)
df1.crossJoin(df2)
Join Strategies
| Strategy | When Spark uses it | Shuffle? | Best for |
|---|---|---|---|
| Broadcast Hash Join | Small table < broadcast threshold (10 MB default) | No | Large table × small lookup table |
| Shuffle Hash Join | Medium-sized tables, hash fits in memory | Yes | Non-sortable keys, unequal sizes |
| Sort-Merge Join | Large + large table joins (default for most joins) | Yes | Both sides large, equi-joins |
| Cartesian / Nested Loop | No join key (cross join or non-equi join) | Yes | Avoid — O(n*m) complexity |
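The size-based part of this choice can be sketched as a toy planner (deliberately simplified — the real planner also weighs hints, sortability, statistics, and AQE runtime feedback):

```python
BROADCAST_THRESHOLD = 10 * 1024 * 1024  # 10 MB, mirroring the default

def pick_join_strategy(left_bytes, right_bytes, has_equi_key=True):
    """Toy sketch of Spark's join-strategy selection."""
    if not has_equi_key:
        return "cartesian/nested-loop"   # no key to hash or sort on
    if min(left_bytes, right_bytes) <= BROADCAST_THRESHOLD:
        return "broadcast-hash"          # ship the small side to every executor
    return "sort-merge"                  # shuffle + sort both sides

assert pick_join_strategy(100 * 2**30, 2 * 2**20) == "broadcast-hash"
assert pick_join_strategy(100 * 2**30, 50 * 2**30) == "sort-merge"
assert pick_join_strategy(2**30, 2**30, has_equi_key=False) == "cartesian/nested-loop"
```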
Broadcast Join
from pyspark.sql.functions import broadcast
# Force broadcast on the small side
result = df_large.join(broadcast(df_small), "key_col")
# Tune threshold (bytes) — default 10MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024) # 50 MB
# Disable auto-broadcast (force sort-merge for reproducibility)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# SQL hint
spark.sql("""
SELECT /*+ BROADCAST(d) */ e.*, d.name AS dept_name
FROM employees e
JOIN departments d ON e.dept_id = d.id
""")
Data Skew in Joins
# Diagnose: check value distribution
df.groupBy("join_key").count().orderBy(F.desc("count")).show(20)
# Solution 1: Salting — break hot keys into sub-keys
import random
SALT_FACTOR = 50
# Salt the left (large) side
df_large_salted = df_large.withColumn(
"salted_key",
F.concat(col("join_key"), F.lit("_"), (F.rand() * SALT_FACTOR).cast("int"))
)
# Explode the right (small/dimension) side
from pyspark.sql.functions import array, explode
df_small_exploded = df_small.withColumn("salt", F.array([F.lit(i) for i in range(SALT_FACTOR)])) \
.withColumn("salt", F.explode("salt")) \
.withColumn("salted_key", F.concat(col("join_key"), F.lit("_"), col("salt")))
result = df_large_salted.join(df_small_exploded, "salted_key").drop("salted_key", "salt")
# Solution 2: Skew Join Hint (Spark 3.0+ AQE)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
# Solution 3: Broadcast the skewed dimension (if it fits)
result = df_large.join(broadcast(df_small), "join_key")
Shuffles & Partitioning
Repartition vs. Coalesce
| | repartition(n) | coalesce(n) |
|---|---|---|
| Shuffle | Yes — full shuffle | No (usually) — moves data locally where possible |
| Direction | Up or down | Down only (reduces partitions) |
| Data distribution | Even distribution | Can create uneven partitions |
| Use when | Need even partitions for compute; before wide transformations | Reduce small files before writing |
df.repartition(200) # by count, round-robin
df.repartition(200, col("department")) # by count + column (hash partitioned)
df.repartition(col("year"), col("month")) # by columns only
df.coalesce(10) # reduce to 10 partitions, minimal shuffle
# Check partition count
df.rdd.getNumPartitions()
# Inspect partition sizes
df.rdd.mapPartitionsWithIndex(
lambda idx, it: [(idx, sum(1 for _ in it))]
).toDF(["partition_id", "count"]).orderBy("partition_id").show()
# Default parallelism — controls initial partition count for RDD operations
print(sc.defaultParallelism) # usually = total executor cores
# spark.default.parallelism is a static conf — set it via spark-submit or
# SparkConf at session creation; setting it at runtime has no effect
spark.conf.set("spark.sql.shuffle.partitions", 200) # DataFrame shuffles — runtime-settable
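Why coalesce can leave uneven partitions while repartition rebalances can be sketched in plain Python (a toy model — real Spark merges by locality and moves serialized blocks):

```python
def coalesce(parts, n):
    # Merge existing partitions together — no global redistribution of records,
    # so a fat input partition stays fat.
    out = [[] for _ in range(n)]
    for i, p in enumerate(parts):
        out[i % n].extend(p)
    return out

def repartition(parts, n):
    # Full shuffle: deal every record round-robin across n partitions.
    flat = [x for p in parts for x in p]
    out = [[] for _ in range(n)]
    for i, x in enumerate(flat):
        out[i % n].append(x)
    return out

parts = [[1, 2, 3, 4, 5, 6], [7], [8], [9]]        # skewed input
assert [len(p) for p in coalesce(parts, 2)] == [7, 2]    # uneven — skew survives
assert [len(p) for p in repartition(parts, 2)] == [5, 4] # rebalanced
```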
Partition Pruning on Reads
# Data written with partitionBy creates directory structure:
# s3://bucket/events/year=2024/month=01/day=15/*.parquet
# When reading, Spark prunes directories that don't match the filter
df = spark.read.parquet("s3://bucket/events/")
df_jan = df.filter("year = 2024 AND month = 1") # only reads year=2024/month=01/ dirs
# Partition column values are inferred from directory names
# They do NOT appear in the actual Parquet files — just the directory path
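Pruning is purely path-based, which a small Python sketch makes concrete (hypothetical paths; real Spark does this during file listing, before reading any Parquet footer):

```python
paths = [
    "s3://bucket/events/year=2023/month=12/part-0.parquet",
    "s3://bucket/events/year=2024/month=01/part-0.parquet",
    "s3://bucket/events/year=2024/month=02/part-0.parquet",
]

def partition_values(path):
    # Partition column values come from directory names, not file contents.
    return dict(seg.split("=") for seg in path.split("/") if "=" in seg)

# filter("year = 2024 AND month = 1") keeps only matching directories:
pruned = [p for p in paths
          if partition_values(p)["year"] == "2024"
          and partition_values(p)["month"] == "01"]
assert pruned == ["s3://bucket/events/year=2024/month=01/part-0.parquet"]
```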
Bucketing
# Write bucketed table (requires Hive metastore / spark catalog)
df.write \
.bucketBy(64, "user_id") \
.sortBy("user_id") \
.saveAsTable("events_bucketed")
# Bucketing eliminates shuffle when joining two bucketed tables
# on the same key with the same number of buckets
df_users.write.bucketBy(64, "user_id").saveAsTable("users_bucketed")
df_events.write.bucketBy(64, "user_id").saveAsTable("events_bucketed")
# This join has NO shuffle — data is pre-sorted and co-located
result = spark.table("users_bucketed").join(spark.table("events_bucketed"), "user_id")
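Why co-location makes the shuffle unnecessary can be sketched in plain Python (toy hash; Spark uses its own hash function, but the invariant is the same):

```python
NUM_BUCKETS = 4

def bucket(key):
    # Stand-in for Spark's bucketing hash — both tables must use the
    # same function and the same bucket count.
    return hash(key) % NUM_BUCKETS

users  = [101, 102, 103, 104]
events = [104, 101, 104, 102]

users_buckets  = {b: [k for k in users  if bucket(k) == b] for b in range(NUM_BUCKETS)}
events_buckets = {b: [k for k in events if bucket(k) == b] for b in range(NUM_BUCKETS)}

# Invariant: a matching key always lands in the same bucket on both sides,
# so each bucket pair can be joined independently with no data movement.
for b in range(NUM_BUCKETS):
    for k in events_buckets[b]:
        assert bucket(k) == b and k in users_buckets[b]
```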
Adaptive Query Execution (AQE)
# AQE (Spark 3.0+) — dynamically optimizes query plans at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true") # on by default in 3.2+
# AQE features:
# 1. Coalesce shuffle partitions: reduces 200 -> fewer if data is small
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
# 2. Runtime join demotion: AQE converts sort-merge to broadcast join when
#    actual shuffle sizes turn out small; the local shuffle reader then
#    avoids unnecessary network reads after the conversion
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
# 3. Skew join handling (see Joins section)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
Caching & Persistence
Storage Levels
| Level | Memory | Disk | Serialized | Use when |
|---|---|---|---|---|
| MEMORY_ONLY | Yes | No | No (JVM objects) | Small dataset, fast JVM; recompute if evicted |
| MEMORY_AND_DISK | Yes | Overflow | No/Yes | Prefer memory, spill to disk if needed — good default |
| MEMORY_ONLY_SER | Yes | No | Yes (Kryo) | More space-efficient, slower to read |
| MEMORY_AND_DISK_SER | Yes | Overflow | Yes | Large datasets, worth serialization overhead |
| DISK_ONLY | No | Yes | Yes | Very large data, recompute is expensive |
| OFF_HEAP | Off-heap | No | Yes | Avoid GC pressure; requires spark.memory.offHeap.enabled |
from pyspark import StorageLevel
# cache() = MEMORY_AND_DISK for DataFrames/Datasets; MEMORY_ONLY for RDDs
df_hot.cache()
# persist() with explicit level
df_hot.persist(StorageLevel.MEMORY_AND_DISK)
df_hot.persist(StorageLevel.MEMORY_ONLY_SER)
df_hot.persist(StorageLevel.DISK_ONLY)
# Cache is lazy — call an action to materialize
df_hot.count() # triggers actual caching
# Check if cached
print(df_hot.is_cached) # DataFrame
print(rdd.is_cached) # RDD
# Release cache
df_hot.unpersist() # non-blocking by default
df_hot.unpersist(blocking=True) # block until all cached blocks are removed
import org.apache.spark.storage.StorageLevel
df.cache() // MEMORY_AND_DISK
df.persist(StorageLevel.MEMORY_AND_DISK)
df.persist(StorageLevel.MEMORY_ONLY_SER)
// Materialize
df.count()
// Release
df.unpersist()
df.unpersist(blocking = true)
When to Cache
- DataFrames accessed multiple times in the same application (ML feature sets, reference tables)
- Result of an expensive computation (complex joins, aggregations) reused downstream
- Iterative algorithms (ML training) where the same base data is scanned each iteration
Call unpersist() when done. Avoid caching very wide DataFrames — cache after the final column selection.
Checkpoint
# Checkpoint — materializes to disk AND breaks the lineage graph
# Useful when lineage grows too long (iterative algorithms, streaming)
sc.setCheckpointDir("hdfs:///checkpoints/")
df_processed = df.filter(...).join(...).groupBy(...)
df_processed = df_processed.checkpoint() # must capture return value! Truncates lineage.
# Eager vs lazy checkpoint
df_processed = df_processed.checkpoint(eager=True) # triggers action immediately (default)
df_processed = df_processed.checkpoint(eager=False) # lazy — capture the return value; materializes on the next action
# Local checkpoint — faster but less reliable (no HDFS replication)
df_processed.localCheckpoint()
Structured Streaming
Structured Streaming treats a live data stream as an unbounded table that grows over time. You write the same DataFrame API queries, and Spark runs them incrementally.
Reading a Stream
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType
# From Kafka
kafka_df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "events")
.option("startingOffsets", "earliest") # or "latest"
.option("maxOffsetsPerTrigger", 100000) # rate limit
.load())
# Kafka gives: key, value, topic, partition, offset, timestamp, timestampType
# Decode the value as JSON
schema = StructType([
StructField("user_id", LongType(), True),
StructField("event_type", StringType(), True),
StructField("ts", TimestampType(), True),
])
events_df = kafka_df.select(
F.from_json(F.col("value").cast("string"), schema).alias("data"),
F.col("timestamp").alias("kafka_ts")
).select("data.*", "kafka_ts")
# From files (continuous directory scan)
file_df = (spark.readStream
.format("parquet")
.schema(schema)
.option("path", "s3://bucket/landing/")
.load())
# From socket (dev/testing only)
socket_df = (spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load())
Writing a Stream
# Output modes:
# append — only new rows (for stateless queries or with watermark)
# complete — entire result table (for aggregations without watermark)
# update — only changed rows
# Write to console (dev)
query = (events_df.writeStream
.format("console")
.outputMode("append")
.trigger(processingTime="10 seconds") # micro-batch every 10s
.start())
# Write to Parquet (files)
query = (events_df.writeStream
.format("parquet")
.outputMode("append")
.option("path", "s3://bucket/events-processed/")
.option("checkpointLocation", "s3://bucket/checkpoints/events/")
.partitionBy("year", "month", "day")
.trigger(processingTime="1 minute")
.start())
# Write to Kafka
query = (result_df.selectExpr("CAST(user_id AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("topic", "output-events")
.option("checkpointLocation", "s3://bucket/checkpoints/output/")
.outputMode("append")
.start())
# Trigger modes
.trigger(processingTime="30 seconds") # fixed interval micro-batch
.trigger(once=True) # process all available data, then stop
.trigger(availableNow=True) # process all available data in rate-limited batches, then stop (preferred over once)
.trigger(continuous="1 second") # experimental — truly continuous (low latency)
# Wait for termination
query.awaitTermination()
query.stop()
# Monitor
print(query.status)
print(query.lastProgress)
query.explain()
Watermarks & Late Data
# Watermark tells Spark how late data can arrive before it's dropped
# Required for stateful aggregations with append output mode
from pyspark.sql import functions as F
windowed = (events_df
.withWatermark("ts", "10 minutes") # tolerate 10 min late data
.groupBy(
F.window("ts", "5 minutes"), # 5-min tumbling window
F.col("event_type")
)
.count())
# Sliding windows
sliding = (events_df
.withWatermark("ts", "10 minutes")
.groupBy(
F.window("ts", "10 minutes", "5 minutes"), # 10-min window, 5-min slide
F.col("event_type")
)
.count())
# Session windows (Spark 3.2+)
session = (events_df
.withWatermark("ts", "10 minutes")
.groupBy(
F.session_window("ts", "5 minutes"), # gap timeout = 5 min
F.col("user_id")
)
.count())
foreachBatch — Custom Sink Logic
def process_batch(batch_df, batch_id):
    """Called for each micro-batch. batch_df is a regular DataFrame."""
    # Deduplicate
    batch_df = batch_df.dropDuplicates(["event_id"])
    # Write to multiple destinations
    batch_df.write.mode("append").parquet("s3://bucket/raw/")
    batch_df.filter(F.col("priority") == "high") \
        .write.format("delta").mode("append").save("s3://bucket/high-priority/")
    # Call an external API, write to a DB, etc. (engine = a pre-built SQLAlchemy engine)
    batch_df.toPandas().to_sql("events", engine, if_exists="append")
query = (events_df.writeStream
.foreachBatch(process_batch)
.option("checkpointLocation", "s3://bucket/checkpoints/")
.trigger(processingTime="30 seconds")
.start())
Performance Tuning
General Principles
- Reduce shuffle data — the single biggest lever. Filter before joining. Use reduceByKey over groupByKey. Use broadcast joins for small tables.
- Choose the right file format — Parquet/ORC for analytics. Columnar formats allow column pruning and predicate pushdown. Avoid CSV/JSON in production pipelines.
- Right-size partitions — target 128–256 MB per partition. Too few = under-parallelism. Too many = scheduler overhead and small files.
- Use built-in functions over UDFs — stay in the JVM / Tungsten execution engine.
- Profile before optimizing — use the Spark UI (port 4040). Look at stage timelines, shuffle read/write, and GC time.
Executor Sizing
executor_memory = (node_memory - OS overhead) / executors_per_node. Reserve ~10% overhead for Spark's off-heap.
# YARN cluster: 10 nodes, each 64 GB RAM + 16 cores
# Leave 1 core for OS, 4 GB for OS = 15 cores, 60 GB per node
# 3 executors per node × 5 cores = 15 cores/node
# Memory per executor: 60 / 3 = 20 GB → allocate 18 GB heap + 2 GB overhead
# 10 nodes × 3 executors = 30 slots; leave one for the YARN ApplicationMaster → 29
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 29 \
--executor-cores 5 \
--executor-memory 18g \
--driver-memory 4g \
--conf spark.executor.memoryOverhead=2g \
my_job.py
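The same arithmetic as a hypothetical helper (function and parameter names are my own, not any Spark API), with one executor slot reserved for the YARN ApplicationMaster:

```python
def size_executors(nodes, cores_per_node, mem_gb_per_node,
                   cores_per_executor=5, os_cores=1, os_mem_gb=4, overhead_frac=0.10):
    """Rule-of-thumb YARN executor sizing: reserve OS resources, pack 5-core
    executors, split each executor's share into heap + ~10% overhead."""
    usable_cores = cores_per_node - os_cores
    usable_mem = mem_gb_per_node - os_mem_gb
    execs_per_node = usable_cores // cores_per_executor
    mem_per_exec = usable_mem / execs_per_node
    heap = int(mem_per_exec * (1 - overhead_frac))
    return {
        "num_executors": nodes * execs_per_node - 1,  # one slot left for the YARN AM
        "executor_cores": cores_per_executor,
        "executor_memory_gb": heap,
        "memory_overhead_gb": int(mem_per_exec - heap),
    }

print(size_executors(nodes=10, cores_per_node=16, mem_gb_per_node=64))
# {'num_executors': 29, 'executor_cores': 5, 'executor_memory_gb': 18, 'memory_overhead_gb': 2}
```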
Serialization — Kryo
# Default Java serialization is slow. Kryo is 2-10x faster.
# Important for RDD operations and UDFs, less so for DataFrames (Tungsten handles it)
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.kryo.registrationRequired", "false") # false = allow unregistered classes (serialized with full class name — works, but larger)
# In Scala — register your classes for best Kryo performance
// Register case classes with Kryo
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(
classOf[Employee],
classOf[Order]
))
Data Skew Solutions Summary
| Technique | When to use | Trade-off |
|---|---|---|
| Broadcast join | One side < 50–100 MB | Driver OOM risk for very large small side |
| Salting | Skewed join key (hot keys) | Must explode dimension side; extra disk/CPU |
| AQE skew join | Spark 3+ with AQE enabled | Auto-detected; may not catch all skew |
| Repartition | Uneven input partitions | Causes a full shuffle |
| Filter skewed keys | Hot keys are NULLs or garbage | Data loss if not careful |
Small File Problem
# Problem: streaming jobs or many small partitioned writes create thousands of tiny files
# Reading 10,000 × 1 MB files is much slower than reading 100 × 100 MB files
# (HDFS NameNode pressure, S3 LIST API calls, per-file overhead)
# Solution 1: coalesce before write
df.coalesce(50).write.parquet("s3://bucket/output/")
# Solution 2: repartition by partition columns to get one file per partition value
df.repartition("year", "month").write.partitionBy("year", "month").parquet("output/")
# Solution 3: Spark SQL maxRecordsPerFile
df.write \
.option("maxRecordsPerFile", 1000000) \
.parquet("output/")
# Solution 4: Delta OPTIMIZE command (Delta Lake)
spark.sql("OPTIMIZE delta.`s3://bucket/delta/events/` ZORDER BY (user_id, ts)")
# Solution 5: auto-compaction for streaming writes (Delta Lake on Databricks)
# Set the table property delta.autoOptimize.autoCompact = true via ALTER TABLE,
# or enable it session-wide:
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
query = df.writeStream \
.format("delta") \
.option("checkpointLocation", "...") \
.start("s3://bucket/delta/events/")
AQE — Adaptive Query Execution
# AQE is the most impactful Spark 3.x performance feature
# It re-optimizes physical plans at runtime based on actual shuffle statistics
spark.conf.set("spark.sql.adaptive.enabled", "true") # default true in Spark 3.2+
# 1. Dynamic partition coalescing: merges small shuffle partitions after the shuffle
# Avoids 200 tiny tasks when shuffle output is small
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1m")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
# 2. Dynamic join strategy switching: if runtime shuffle statistics show one side
# is small, AQE replans a sort-merge join as a broadcast join; the local shuffle
# reader then avoids the network exchange for the already-shuffled side
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
# 3. Skew join optimization
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
Production Deployment
Deployment Platforms Comparison
| Platform | Cluster Manager | Best For | Trade-off |
|---|---|---|---|
| EMR | YARN | AWS-native data teams, S3-heavy pipelines | Vendor lock-in, cluster spin-up time (~5-10 min) |
| Databricks | Proprietary | Fastest setup, notebooks + Unity Catalog, Delta Lake | Most expensive, less infra control |
| Kubernetes | K8s native | Multi-cloud, existing K8s infra, containerized workloads | More complex ops, tuning pod resources |
| Dataproc | YARN | GCP-native, GCS integration | GCP lock-in |
| YARN (on-prem) | YARN | Existing Hadoop clusters | Hardware management overhead |
| Standalone | Built-in | Dev/test, simple single-team clusters | No resource sharing, limited scheduling |
Complete Example: S3 Aggregation Job
A dead-simple end-to-end Spark app: read orders from S3, compute revenue per product, write results back to S3 as Parquet.
# etl_job.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("S3 Revenue Aggregation")
.getOrCreate())
# Read — CSV with header from S3
orders = (spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("s3a://my-bucket/raw/orders/"))
# Transform — revenue per product
revenue = (orders
.groupBy("product_id")
.agg(
F.sum("amount").alias("total_revenue"),
F.count("*").alias("order_count"),
F.avg("amount").alias("avg_order_value"),
))
# Write — Parquet back to S3
(revenue.write
.mode("overwrite")
.parquet("s3a://my-bucket/output/revenue_by_product/"))
spark.stop()
# Submit:
# spark-submit --master yarn --deploy-mode cluster etl_job.py
// src/main/scala/com/example/EtlJob.scala
package com.example
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object EtlJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("S3 Revenue Aggregation")
      .getOrCreate()
    // Read — CSV with header from S3
    val orders = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("s3a://my-bucket/raw/orders/")
    // Transform — revenue per product
    val revenue = orders
      .groupBy("product_id")
      .agg(
        sum("amount").as("total_revenue"),
        count("*").as("order_count"),
        avg("amount").as("avg_order_value")
      )
    // Write — Parquet back to S3
    revenue.write
      .mode("overwrite")
      .parquet("s3a://my-bucket/output/revenue_by_product/")
    spark.stop()
  }
}
// build.sbt:
// name := "spark-etl"
// version := "1.0"
// scalaVersion := "2.12.18"
// libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided"
//
// Build: sbt assembly
// Submit: spark-submit --class com.example.EtlJob target/scala-2.12/spark-etl-assembly-1.0.jar
Use s3a:// (not the legacy s3:// or s3n://) — it's the modern, production connector. Credentials come from IAM roles (EMR/Glue/Databricks), environment variables (AWS_ACCESS_KEY_ID), or spark.hadoop.fs.s3a.* configs. Never hardcode keys in your code.
Packaging a Spark Application
Ship your application code and its third-party dependencies, but never bundle Spark itself (mark Spark libraries as provided scope — the cluster supplies them).
Python (PySpark)
# Option 1: Single script (simplest)
spark-submit my_job.py
# Option 2: Multiple modules — zip them
cd my_project/
zip -r ../my_project.zip . # zip the package directory
spark-submit --py-files my_project.zip main.py
# Option 3: Third-party deps — ship a venv or use --packages
pip install -t deps/ requests pandas
cd deps/ && zip -r ../deps.zip . && cd ..
spark-submit --py-files deps.zip,my_project.zip main.py
# Option 4: Conda/venv environment archive (EMR/Databricks)
# Pack the whole environment for reproducibility
conda pack -o pyspark_env.tar.gz
spark-submit \
--conf spark.pyspark.python=./environment/bin/python \
--archives pyspark_env.tar.gz#environment \
main.py
Java / Scala
# Build an uber-jar (fat jar) with all deps except Spark itself
# Maven: use shade plugin Gradle: use shadow plugin sbt: use assembly
# Maven pom.xml key snippet:
# <dependency>
#   <groupId>org.apache.spark</groupId>
#   <artifactId>spark-sql_2.12</artifactId>
#   <version>3.5.0</version>
#   <scope>provided</scope>
# </dependency>
mvn clean package -DskipTests
# produces target/my-spark-app-1.0-shaded.jar
spark-submit \
--class com.example.MyApp \
--master yarn \
target/my-spark-app-1.0-shaded.jar arg1 arg2
spark-submit Deep Dive
# Full spark-submit anatomy
spark-submit \
--master yarn \ # cluster manager: yarn | k8s://... | spark://... | local[*]
--deploy-mode cluster \ # cluster = driver on worker; client = driver on submitting machine
--name "daily-etl" \ # app name visible in Spark UI / YARN RM
--queue production \ # YARN queue (resource isolation)
--num-executors 20 \ # fixed number (or use dynamic allocation)
--executor-cores 5 \
--executor-memory 16g \
--driver-memory 4g \
--driver-cores 2 \
--conf spark.sql.adaptive.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=5 \
--conf spark.dynamicAllocation.maxExecutors=50 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--packages org.apache.spark:spark-avro_2.12:3.5.0 \ # Maven coords — downloaded at runtime
--jars /path/to/custom.jar \ # local JARs to ship
--py-files deps.zip \ # Python: extra .zip/.py/.egg
--files config.yaml \ # files accessible via SparkFiles.get("config.yaml")
my_job.py # or my-app.jar --class com.example.Main
Use --deploy-mode cluster for scheduled production jobs — the driver runs inside the cluster (YARN ApplicationMaster / K8s pod). Logs are accessible via yarn logs -applicationId <id> or K8s pod logs.
EMR (AWS)
# Create a transient EMR cluster that runs one step then terminates
aws emr create-cluster \
--name "nightly-etl" \
--release-label emr-7.0.0 \
--applications Name=Spark \
--instance-groups '[
{"InstanceGroupType":"MASTER","InstanceType":"m5.xlarge","InstanceCount":1},
{"InstanceGroupType":"CORE","InstanceType":"m5.2xlarge","InstanceCount":5}
]' \
--steps '[{
"Type":"Spark",
"Name":"ETL",
"ActionOnFailure":"TERMINATE_CLUSTER",
"Args":["--deploy-mode","cluster","--py-files","s3://my-bucket/deps.zip","s3://my-bucket/jobs/etl.py"]
}]' \
--auto-terminate \
--log-uri s3://my-bucket/emr-logs/ \
--service-role EMR_DefaultRole \
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-abc123
# In your PySpark code — read/write directly to S3
df = spark.read.parquet("s3://my-bucket/raw/events/")
df.write.mode("overwrite").parquet("s3://my-bucket/processed/events/")
# EMR pre-configures the S3 committer for fast, safe writes
# No need to set fs.s3a.* — EMRFS handles it
Kubernetes
# Native Spark on K8s (Spark 3.1+) — no Hadoop/YARN needed
spark-submit \
--master k8s://https://k8s-api-server:6443 \
--deploy-mode cluster \
--name spark-etl \
--conf spark.kubernetes.container.image=my-registry/spark:3.5.0 \
--conf spark.kubernetes.namespace=spark-jobs \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.driver.request.cores=2 \
--conf spark.kubernetes.driver.limit.cores=2 \
--conf spark.kubernetes.executor.request.cores=4 \
--conf spark.kubernetes.executor.limit.cores=4 \
--conf spark.executor.instances=10 \
--conf spark.executor.memory=8g \
--conf spark.kubernetes.file.upload.path=s3a://my-bucket/spark-uploads/ \
local:///opt/spark/work-dir/my_job.py # path INSIDE the container image
# Build a custom Spark image with your code + deps baked in
# Dockerfile
FROM apache/spark:3.5.0
COPY my_project/ /opt/spark/work-dir/
COPY requirements.txt /opt/spark/work-dir/
RUN pip install -r /opt/spark/work-dir/requirements.txt
# Build and push
docker build -t my-registry/spark-etl:v1 .
docker push my-registry/spark-etl:v1
Better than raw spark-submit: install the Spark Operator and define jobs declaratively with its SparkApplication CRD — you get automatic retries, monitoring, and lifecycle management.
# SparkApplication CRD example
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: daily-etl
  namespace: spark-jobs
spec:
  type: Python
  mode: cluster
  image: my-registry/spark-etl:v1
  mainApplicationFile: local:///opt/spark/work-dir/etl.py
  sparkVersion: "3.5.0"
  driver:
    cores: 2
    memory: "4g"
    serviceAccount: spark
  executor:
    cores: 4
    instances: 10
    memory: "8g"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
Databricks
# Databricks CLI — create and run a job
# Install: pip install databricks-cli
# Configure
databricks configure --token # enter host + personal access token
# Deploy a wheel/jar to DBFS
databricks fs cp my_project-0.1-py3-none-any.whl dbfs:/libs/
# Create a job definition (JSON)
databricks jobs create --json '{
"name": "daily-etl",
"new_cluster": {
"spark_version": "14.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 5,
"aws_attributes": {"availability": "SPOT_WITH_FALLBACK"}
},
"libraries": [{"whl": "dbfs:/libs/my_project-0.1-py3-none-any.whl"}],
"spark_python_task": {
"python_file": "dbfs:/jobs/etl.py",
"parameters": ["--date", "2024-01-01"]
}
}'
# Trigger a run
databricks jobs run-now --job-id 12345
Orchestrating Spark Jobs
# Airflow — the most common Spark job scheduler
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
# Option 1: SparkSubmitOperator (for YARN / K8s / standalone)
etl_task = SparkSubmitOperator(
task_id="run_etl",
application="/opt/jobs/etl.py",
conn_id="spark_default",
conf={"spark.executor.memory": "8g", "spark.executor.cores": "4"},
num_executors=10,
py_files="/opt/jobs/deps.zip",
)
# Option 2: EMR step via Airflow
emr_step = EmrAddStepsOperator(
task_id="add_etl_step",
job_flow_id="{{ var.value.emr_cluster_id }}",
steps=[{
"Name": "ETL",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": ["spark-submit", "--deploy-mode", "cluster",
"s3://bucket/jobs/etl.py"]
}
}],
)
# Option 3: Databricks via Airflow
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
db_task = DatabricksRunNowOperator(
task_id="run_databricks_etl",
job_id=12345,
notebook_params={"date": "{{ ds }}"},
)
CI/CD for Spark Jobs
# GitHub Actions example: test → build → deploy
name: spark-etl-pipeline
on:
  push:
    branches: [main]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.11" }
      - run: |
          pip install pyspark==3.5.0 pytest
          pip install -r requirements.txt
          pytest tests/ -v   # unit tests with local Spark
  deploy:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: |
          # Package
          cd my_project && zip -r ../my_project.zip . && cd ..
          pip install -t deps/ -r requirements.txt
          cd deps && zip -r ../deps.zip . && cd ..
          # Upload to S3
          aws s3 cp my_project.zip s3://my-bucket/jobs/
          aws s3 cp deps.zip s3://my-bucket/jobs/
          aws s3 cp main.py s3://my-bucket/jobs/
Production Readiness Checklist
| Category | Check |
|---|---|
| Idempotency | Job can re-run safely — use mode("overwrite") with partitioned output or Delta Lake MERGE |
| Logging | Log to stdout (captured by YARN/K8s). Use log4j config to suppress noisy Spark logs |
| Monitoring | Spark UI (port 4040), History Server for completed jobs, Ganglia/CloudWatch/Prometheus for cluster metrics |
| Retries | Orchestrator handles retries (Airflow retries=2). Spark retries failed tasks internally (spark.task.maxFailures=4) |
| Data validation | Assert row counts, null percentages, and schema after writes — Great Expectations or custom checks |
| Dependency pinning | Pin Spark version + all library versions in requirements.txt / pom.xml. Test upgrades in staging first |
| Secrets | Never hardcode credentials. Use IAM roles (EMR), K8s secrets, or Databricks secret scopes |
| Cluster sizing | Start small, profile with Spark UI, then scale. Enable dynamic allocation for variable workloads |
Configuration Reference
Key Configuration Properties
| Property | Default | Notes |
|---|---|---|
| spark.executor.memory | 1g | Executor JVM heap. Typically 4–18g in prod. |
| spark.executor.cores | 1 (local: all) | Cores per executor. Usually 4–5 in prod. |
| spark.executor.instances | 2 | Number of executors. Use dynamic allocation in prod. |
| spark.driver.memory | 1g | Driver JVM heap. Increase if collecting large results. |
| spark.driver.maxResultSize | 1g | Max size of results returned to driver. Set 0 for unlimited. |
| spark.sql.shuffle.partitions | 200 | Partitions after a shuffle (DataFrame API). Tune to ~128–256 MB / partition. |
| spark.default.parallelism | Total executor cores | Default partitions for RDD operations. |
| spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Tables smaller than this are auto-broadcast. Set -1 to disable. |
| spark.sql.adaptive.enabled | true (3.2+) | Enable Adaptive Query Execution. |
| spark.serializer | JavaSerializer | Set to KryoSerializer for RDD-heavy workloads. |
| spark.memory.fraction | 0.6 | Fraction of heap for execution + storage. Rest is user memory. |
| spark.memory.storageFraction | 0.5 | Fraction of spark.memory.fraction reserved for cache storage. |
| spark.network.timeout | 120s | Increase if executors frequently time out on slow operations. |
| spark.sql.broadcastTimeout | 300s | Timeout for broadcast joins. Increase for large broadcasts. |
| spark.sql.files.maxPartitionBytes | 128m | Max bytes per partition when reading files. |
| spark.executor.memoryOverhead | executorMemory * 0.1, min 384m | Non-heap memory (Python process, off-heap). Increase for PySpark. Replaces the deprecated spark.yarn.executor.memoryOverhead. |
| spark.dynamicAllocation.enabled | false | Auto-scale executors based on workload. |
| spark.dynamicAllocation.minExecutors | 0 | Min executors when idle. |
| spark.dynamicAllocation.maxExecutors | infinity | Cap executor count. |
| spark.sql.parquet.compression.codec | zstd (3.4+; snappy before) | Options: snappy, gzip, lz4, zstd. zstd is default since Spark 3.4. |
Setting Configuration
# At SparkSession creation (preferred)
spark = (SparkSession.builder
.appName("MyApp")
.config("spark.sql.shuffle.partitions", "200")
.config("spark.executor.memory", "8g")
.config("spark.dynamicAllocation.enabled", "true")
.config("spark.dynamicAllocation.maxExecutors", "50")
.getOrCreate())
# At runtime (some properties are immutable after session start)
spark.conf.set("spark.sql.shuffle.partitions", "100")
# Read a config value
print(spark.conf.get("spark.sql.shuffle.partitions"))
# Via spark-submit
# spark-submit --conf spark.executor.memory=8g --conf spark.sql.shuffle.partitions=200 job.py
# Via spark-defaults.conf (cluster-wide defaults)
# $SPARK_HOME/conf/spark-defaults.conf:
# spark.executor.memory 8g
# spark.sql.shuffle.partitions 200
Dynamic Allocation
# Dynamic allocation automatically adds/removes executors based on pending tasks
# Requires the external shuffle service on YARN/Standalone; on K8s enable
# spark.dynamicAllocation.shuffleTracking.enabled instead
spark = SparkSession.builder \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.dynamicAllocation.minExecutors", "2") \
.config("spark.dynamicAllocation.initialExecutors", "5") \
.config("spark.dynamicAllocation.maxExecutors", "100") \
.config("spark.dynamicAllocation.executorIdleTimeout", "60s") \
.config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s") \
.config("spark.shuffle.service.enabled", "true") \
.getOrCreate()
Common Pitfalls
1. Driver OOM from collect() on Large Data
df.collect() and df.toPandas() pull ALL data to the driver. A 50 GB DataFrame kills a 4 GB driver.
# BAD
all_rows = df.collect() # 50 GB DataFrame → driver OOM
# GOOD — write to storage and process offline
df.write.parquet("s3://bucket/output/")
# GOOD — sample for exploration
sample = df.sample(0.001).toPandas()
# GOOD — aggregate first, then collect the summary
summary = df.groupBy("dept").agg(F.sum("salary")).collect() # tiny result
2. Data Skew — One Task Takes Forever
# Symptom: Stage has 199 tasks completing in 10s, 1 task running for 10 minutes
# Diagnosis
df.groupBy("join_key").count().orderBy(F.desc("count")).show(10)
# Fix: see Joins section — broadcast, salting, or AQE skew join
3. Too Many Small Files
# Symptom: Reading job is slow even though data is small
# Cause: Streaming or partitioned writes produced millions of tiny files
# Diagnosis
# Check number of files in a path:
# aws s3 ls --recursive s3://bucket/path/ | wc -l
# Fix: coalesce before write, or OPTIMIZE (Delta Lake)
df.coalesce(20).write.parquet("output/")
4. UDF Performance
# BAD — Python UDF: row-by-row Python serialization overhead
from pyspark.sql.functions import udf
@udf("string")
def format_name(first, last):
    return f"{last}, {first}".upper()
df.withColumn("formatted", format_name("first", "last"))
# GOOD — use built-in functions (no Python overhead)
from pyspark.sql.functions import concat_ws, upper
df.withColumn("formatted", upper(concat_ws(", ", col("last"), col("first"))))
# GOOD — if you must use Python logic, use a Pandas UDF (vectorized via Arrow)
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def format_name_pandas(first: pd.Series, last: pd.Series) -> pd.Series:
    return (last + ", " + first).str.upper()
df.withColumn("formatted", format_name_pandas("first", "last"))
5. Accidental Cartesian Join
# BAD — cross join with 1M × 1M rows = 1 trillion rows
df1.join(df2) # no join key! Spark 3.x (spark.sql.crossJoin.enabled defaults to true) silently produces a cartesian product
# Spark 2.x threw an AnalysisException here unless crossJoin.enabled was set
# Make intentional cross joins explicit with .crossJoin(); otherwise always provide a join key
df1.join(df2, "user_id", "inner") # explicit key
6. Caching the Wrong Dataset
# BAD — caching a wide DataFrame then dropping most columns
df_wide = spark.read.parquet("huge_table/")
df_wide.cache() # caches ALL 200 columns
result = df_wide.select("id", "name", "salary")
# GOOD — cache after projection
df_narrow = spark.read.parquet("huge_table/").select("id", "name", "salary")
df_narrow.cache()
df_narrow.count() # materialize
# BAD — forgetting to unpersist
df_temp.cache()
# ... do work ...
# forgot to call df_temp.unpersist() — memory leak
# GOOD — always unpersist when done
df_temp.cache()
df_temp.count()
try:
process(df_temp)
finally:
df_temp.unpersist()
7. Python vs JVM Serialization Overhead
# PySpark DataFrames are fast — processing stays in JVM/Tungsten
# Python overhead only occurs at these boundaries:
# 1. Python UDFs
# 2. .collect() / .toPandas()
# 3. RDD operations with Python lambdas
# 4. foreachBatch with Python code
# Minimize crossings — do as much as possible with DataFrame API
# before touching Python-specific code
# Example: avoid RDD if DataFrame API works
# BAD
df.rdd.map(lambda row: row["salary"] * 1.1) # crosses JVM/Python boundary
# GOOD
df.withColumn("salary", col("salary") * 1.1) # stays in JVM Tungsten
8. Wrong shuffle.partitions Setting
# Default: spark.sql.shuffle.partitions = 200
# This is TOO HIGH for small data (creates tiny partitions and overhead)
# and TOO LOW for large data (creates overloaded partitions that spill to disk)
# Rule: target 128–256 MB per partition after shuffle
# Small dataset (dev/test): set low
spark.conf.set("spark.sql.shuffle.partitions", "8")
# Large dataset (1 TB): partition count = total data / target size
# 1TB / 200MB = 5000 partitions
spark.conf.set("spark.sql.shuffle.partitions", "5000")
# OR — enable AQE to auto-coalesce
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# AQE will merge small partitions automatically
9. Mistaking Lazy Evaluation for Execution
# BAD — assuming transformations execute immediately
df = spark.read.parquet("huge_file.parquet")
df = df.filter(col("year") == 2024) # nothing executes here
df = df.groupBy("dept").count() # still nothing
# ... application exits before any action is called!
# GOOD — an action must be called to trigger execution
result = df.filter(col("year") == 2024).groupBy("dept").count()
result.show() # triggers the actual job
result.write.parquet("output/") # also triggers execution
# Common mistake in scripts: forgetting to call an action on the final result
# If your script seems to do nothing, check that you're calling an action.
10. groupByKey vs reduceByKey on High-Cardinality Keys
# groupByKey collects ALL values for a key to one partition
# On high-cardinality string keys this can cause OOM
pairs = df.rdd.map(lambda row: (row["user_id"], row["event"]))
pairs.groupByKey() # dangerous — shuffles all events per user to one node
# reduceByKey does partial aggregation (combine) before shuffle
pairs = df.rdd.map(lambda row: (row["user_id"], 1))
pairs.reduceByKey(lambda a, b: a + b) # safer — count events per user
# Even better — use DataFrame API (Catalyst optimizes it fully)
df.groupBy("user_id").count()