Setup & Environment

Learn by experimenting
Distributed systems are best understood through experimentation. Docker lets you simulate network partitions, node failures, and replication lag locally without any cloud account. The setups below give you hands-on playgrounds for the concepts in every section.

3-Node etcd Cluster (Consensus Playground)

etcd is the backing store for Kubernetes and a canonical Raft implementation. Spin up three nodes locally to watch leader election and log replication in action.

# docker-compose.yml  — 3-node etcd cluster
version: "3.8"

services:
  etcd0:
    image: quay.io/coreos/etcd:v3.5.9
    environment:
      ETCD_NAME: etcd0
      ETCD_DATA_DIR: /etcd-data
      ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:2379
      ETCD_ADVERTISE_CLIENT_URLS: http://etcd0:2379
      ETCD_LISTEN_PEER_URLS: http://0.0.0.0:2380
      ETCD_INITIAL_ADVERTISE_PEER_URLS: http://etcd0:2380
      ETCD_INITIAL_CLUSTER: etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380
      ETCD_INITIAL_CLUSTER_STATE: new
    ports: ["2379:2379"]

  etcd1:
    image: quay.io/coreos/etcd:v3.5.9
    environment:
      ETCD_NAME: etcd1
      ETCD_DATA_DIR: /etcd-data
      ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:2379
      ETCD_ADVERTISE_CLIENT_URLS: http://etcd1:2379
      ETCD_LISTEN_PEER_URLS: http://0.0.0.0:2380
      ETCD_INITIAL_ADVERTISE_PEER_URLS: http://etcd1:2380
      ETCD_INITIAL_CLUSTER: etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380
      ETCD_INITIAL_CLUSTER_STATE: new
    ports: ["22379:2379"]   # publish etcd1's client port so the host can still reach the cluster if etcd0 dies

  etcd2:
    image: quay.io/coreos/etcd:v3.5.9
    environment:
      ETCD_NAME: etcd2
      ETCD_DATA_DIR: /etcd-data
      ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:2379
      ETCD_ADVERTISE_CLIENT_URLS: http://etcd2:2379
      ETCD_LISTEN_PEER_URLS: http://0.0.0.0:2380
      ETCD_INITIAL_ADVERTISE_PEER_URLS: http://etcd2:2380
      ETCD_INITIAL_CLUSTER: etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380
      ETCD_INITIAL_CLUSTER_STATE: new
    ports: ["32379:2379"]   # publish etcd2's client port for the same reason
# Start the cluster
docker compose up -d

# Watch leader election
etcdctl --endpoints=http://localhost:2379 endpoint status --write-out=table

# Put/get a key — observe that any node serves reads
etcdctl --endpoints=http://localhost:2379 put /config/feature_flag "enabled"
etcdctl --endpoints=http://localhost:2379 get /config/feature_flag

# Simulate leader failure — kill the leader container and watch re-election
docker compose stop etcd0
# Query a surviving node (its client port must also be published, e.g. 22379:2379)
etcdctl --endpoints=http://localhost:22379 endpoint status --write-out=table

Tooling

# CLI tools for hands-on practice
brew install etcd          # etcdctl for consensus experiments
brew install redis         # redis-cli for replication / cache demos
brew install kafka         # kafka-topics, kafka-console-producer/consumer

# Python clients
pip install grpcio redis kafka-python requests

# Chaos engineering (simulate partitions)
# Toxiproxy — inject latency/drops between containers
docker pull shopify/toxiproxy

Why Distributed Systems

Single-Machine Limits

Every computer has a ceiling. At some point you hit one of these walls: CPU saturation, memory exhaustion, disk or network I/O limits, or simply the price of the next-bigger machine.

Horizontal vs Vertical Scaling

| Dimension | Vertical (Scale Up) | Horizontal (Scale Out) |
| --- | --- | --- |
| Approach | Bigger machine (more RAM, cores) | More machines |
| Cost | Superlinear — doubling specs costs 4-8x | Linear — commodity hardware |
| Downtime | Usually requires reboot | Rolling updates, no downtime |
| Ceiling | Hard limit of largest available instance | Effectively unlimited |
| Complexity | Low — single process | High — distributed coordination needed |
| When to use | Stateful, hard-to-partition workloads | Stateless services, large data volumes |
Practical rule of thumb
Always scale vertically first. Distributed systems add enormous operational complexity. Only reach for horizontal scaling when you have a concrete, measured reason — not because you anticipate future growth.

The 8 Fallacies of Distributed Computing

L. Peter Deutsch and colleagues at Sun Microsystems (1994) identified these assumptions developers incorrectly make about networks; James Gosling added the eighth in 1997. Every one of them will eventually bite you in production:

  1. The network is reliable. Packets are dropped. NICs fail. Cables are unplugged.
  2. Latency is zero. A local function call takes nanoseconds. A network call takes milliseconds — 6 orders of magnitude slower.
  3. Bandwidth is infinite. Serializing a 1 GB object and sending it over a 1 Gbps link takes 8 seconds.
  4. The network is secure. Data traverses shared infrastructure. TLS, mTLS, and zero-trust are non-optional.
  5. Topology doesn't change. Load balancers reconfigure, containers restart, IPs change. Hard-coding addresses breaks things.
  6. There is one administrator. Multiple teams own different services. Coordination and contracts (APIs) matter.
  7. Transport cost is zero. Serialization (JSON, Protobuf), compression, and network I/O have real CPU and time costs.
  8. The network is homogeneous. Different services may run on different OS versions, hardware, and network segments with different MTUs.

CAP Theorem & PACELC

CAP

Eric Brewer's CAP theorem (2000, proved by Gilbert & Lynch 2002) states that a distributed system can guarantee at most two of the three following properties simultaneously: Consistency (every read sees the most recent write), Availability (every request receives a non-error response), and Partition tolerance (the system keeps operating when messages between nodes are lost).

Partition tolerance is not optional
Networks do partition — this is a fact of production engineering, not a hypothetical. So in practice the real choice is: when a partition occurs, do you sacrifice Consistency (return possibly stale data) or Availability (return an error)?
# ASCII: the CAP trade-off during a network partition
#
#   Node A  ----X----  Node B   (partition: messages dropped)
#
#   CP choice: refuse to serve reads/writes until partition heals
#              (correct data, but some requests fail)
#
#   AP choice: serve reads from local state, accept writes locally
#              (system stays up, but nodes may diverge)

Real-World Examples

| System | Default behavior | Rationale |
| --- | --- | --- |
| etcd / ZooKeeper | CP | Leader-based; refuses writes without quorum. Used for config/coordination where correctness is paramount. |
| Postgres (single primary) | CP | Synchronous standbys block on partition; async standbys can diverge (AP-like). |
| Cassandra / DynamoDB | AP (tunable) | Leaderless; always accepts writes, reconciles later. Tunable consistency per operation. |
| Redis Sentinel | CP (weak) | Promotes new primary on failure but may lose recent writes during promotion window. |
| Kafka (producer acks=all) | CP | Waits for ISR to acknowledge before confirming write. |

PACELC Extension

Daniel Abadi (2010) extended CAP with PACELC: even when there is no partition, you still must trade off Latency (L) against Consistency (C). Stronger consistency requires coordination — coordination takes time.

Spanner achieves PC/EC globally via GPS + atomic clocks (TrueTime) — consistency without sacrificing much latency, but only by spending heavily on infrastructure.

Consistency Models

Consistency models define the contract between a distributed system and its clients: what can a client observe after a write? Ordered from strongest to weakest:

Strong Consistency (Linearizability)

Linearizability is the gold standard. Every operation appears to take effect instantaneously at some single point in time, and all clients observe a single, consistent global order. Once a write completes, all subsequent reads — from any client, any node — see that write.

Sequential Consistency

Weaker than linearizability. Operations appear in a consistent order across all processes, but that order does not have to match real wall-clock time. All clients see the same sequence, but it may lag reality.

Causal Consistency

If operation A causally precedes operation B (A happened before B, and B observed A's effects), then all nodes see A before B. Operations with no causal relationship may be seen in any order.

Eventual Consistency

If no new writes are issued, all replicas will eventually converge to the same value. No guarantee on when, no guarantee on ordering.

Session Guarantees

Practical consistency properties for a single client session:

  1. Read-your-writes: after a client writes, its own later reads see that write.
  2. Monotonic reads: a client never observes state older than what it has already seen.
  3. Monotonic writes: a client's writes take effect in the order it issued them.
  4. Writes-follow-reads: a write issued after a read is ordered after the state that read observed.

Linearizability vs Serializability

These are often confused in interviews:

| Property | Linearizability | Serializability |
| --- | --- | --- |
| Domain | Single operations (reads/writes) | Transactions (multiple ops) |
| Guarantee | Each op appears atomic at a point in time | Transaction outcome = some serial order |
| Respects real time? | Yes | No (serial order may not match wall clock) |

Both together: called "strict serializability" — the strongest practical guarantee. Used by Spanner, FoundationDB.
What does your user see after a write?
Ask this question when choosing a consistency model. For most web apps: read-your-writes + eventual consistency is sufficient and far cheaper than linearizability. For financial transactions: you want serializable (or strict serializable) isolation.

Replication

Replication keeps copies of data on multiple nodes for fault tolerance (survive node failures) and read scalability (spread read load). Three main topologies:

Replication Topologies

# Single-leader (most common)
#
#   Client Writes
#        |
#    [Leader] ──replication──> [Follower 1]
#        |                     [Follower 2]
#        └──────────────────>  [Follower 3]
#
#   All writes go to the leader.
#   Followers replicate the leader's write-ahead log.
#   Reads can be served by followers (with possible staleness).

# Multi-leader (active-active)
#
#   [DC East: Leader A] <──────────> [DC West: Leader B]
#       |                                  |
#   [Follower]                         [Follower]
#
#   Each DC accepts writes. Conflicts resolved on merge.
#   Used for: multi-region writes, offline clients.

# Leaderless (Dynamo-style)
#
#   Client ──W──> [Node 1]  (W = write quorum, e.g., 2 of 3)
#          ──W──> [Node 2]
#          ──W──> [Node 3]
#
#   No designated leader. Client writes to W nodes, reads from R nodes.
#   Consistency: W + R > N  (N = total replicas)
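
A toy model makes the overlap argument concrete. This is a sketch only: the class name and random replica choice are invented for illustration, and real Dynamo-style stores add read repair and hinted handoff on top.

```python
import random

class QuorumStore:
    """Toy leaderless store: write to W of N replicas, read from R of N.
    If W + R > N, every read quorum overlaps every write quorum, so a
    read always sees at least one copy of the latest acknowledged write."""
    def __init__(self, n=3, w=2, r=2):
        self.n, self.w, self.r = n, w, r
        self.replicas = [{} for _ in range(n)]  # each replica: key -> (version, value)

    def write(self, key, value, version):
        # The write lands on W randomly chosen replicas (the others missed it)
        for i in random.sample(range(self.n), self.w):
            self.replicas[i][key] = (version, value)

    def read(self, key):
        # Read R replicas; return the value with the highest version seen
        answers = [self.replicas[i].get(key, (0, None))
                   for i in random.sample(range(self.n), self.r)]
        return max(answers)[1]

store = QuorumStore(n=3, w=2, r=2)
for v in range(1, 50):
    store.write("x", f"value-{v}", version=v)
    assert store.read("x") == f"value-{v}"  # overlap guarantees the latest value
```

With w=1, r=1 the assertion would fail intermittently: a read could sample only the replica the write missed, which is exactly the staleness eventual-consistency systems accept.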

Synchronous vs Asynchronous Replication

|  | Synchronous | Asynchronous |
| --- | --- | --- |
| Write latency | High — waits for all replicas to ack | Low — returns after leader writes |
| Durability | No data loss on leader failure | Possible data loss (replication lag) |
| Availability | Lower — one slow replica blocks writes | Higher — replica failures don't block |
| Use when | Financial data, audit logs | User activity feeds, analytics events |

Replication Lag

Async replication means followers can be seconds (or minutes) behind the leader. Effects: a user may not see a write they just made (a read-your-writes violation), consecutive reads may jump backward in time (a monotonic-reads violation), and causally related writes may appear out of order on different replicas.

Solutions: sticky sessions (always read from same replica), read from primary for sensitive operations, use causal consistency tokens.
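
The causal-token idea can be sketched in a few lines. The LSN bookkeeping here is a stand-in for real log positions (Postgres LSNs, Kafka offsets); the class names are invented for the sketch.

```python
class Replica:
    def __init__(self):
        self.replayed_lsn = 0        # how far this node has applied the leader's log
        self.data = {}

    def apply(self, lsn, key, value):
        self.data[key] = value
        self.replayed_lsn = lsn

class Session:
    """Client session that reads from a replica only if the replica has
    caught up to the session's last write; otherwise falls back to primary."""
    def __init__(self, primary, replicas):
        self.primary, self.replicas = primary, replicas
        self.last_write_lsn = 0      # causal token carried by the client

    def write(self, key, value):
        lsn = self.primary.replayed_lsn + 1
        self.primary.apply(lsn, key, value)
        self.last_write_lsn = lsn    # remember our own write position

    def read(self, key):
        for r in self.replicas:
            if r.replayed_lsn >= self.last_write_lsn:  # caught up: safe to read
                return r.data.get(key)
        return self.primary.data.get(key)              # fall back to primary

primary, lagging = Replica(), Replica()   # lagging replica has replayed nothing
s = Session(primary, [lagging])
s.write("profile", "new-avatar")
print(s.read("profile"))   # lagging replica is skipped, so we see our own write
```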

Conflict Resolution

Multi-leader and leaderless replication allow concurrent writes to the same key, so conflicts must be resolved: last-write-wins (LWW, simple but silently drops data), version vectors (detect conflicts and surface them to the application), CRDTs (data types that merge automatically), or application-specific merge logic.

Practical: Postgres & Redis Replication

# Postgres streaming replication setup
# In postgresql.conf on primary:
wal_level = replica
max_wal_senders = 5
wal_keep_size = 256MB   # keep 256MB of WAL for replicas

# In pg_hba.conf on primary (allow replica user):
# host  replication  replicator  replica_ip/32  md5

# On replica — pg_basebackup then recovery.conf (PG < 12) or
# standby.signal + postgresql.conf (PG >= 12):
primary_conninfo = 'host=primary_ip port=5432 user=replicator'
# touch standby.signal

# Check replication lag (run on primary):
SELECT client_addr,
       state,
       sent_lsn - write_lsn  AS write_lag_bytes,
       sent_lsn - flush_lsn  AS flush_lag_bytes,
       sent_lsn - replay_lsn AS replay_lag_bytes
FROM pg_stat_replication;
# Redis replication — one-liner on replica
redis-cli REPLICAOF primary_host 6379

# Check replication status
redis-cli INFO replication
# Key fields: role, master_link_status, and master_repl_offset (on the primary)
# vs slave_repl_offset (on the replica) — the gap between offsets is the lag in bytes

# Kafka replication — ISR (In-Sync Replicas)
# A partition is replicated to ISR nodes.
# Producer with acks=all waits for all ISR to write.
# min.insync.replicas=2  — at least 2 replicas must ack, so an acknowledged
#                          write can never exist on only one node
kafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092
# Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3

Partitioning (Sharding)

When a dataset is too large for one node, you partition it across multiple nodes. Each partition owns a subset of the data. Also called sharding.

Partitioning Strategies

Range-Based Partitioning

Assign contiguous key ranges to partitions: keys A–M go to Shard 1, N–Z go to Shard 2. Natural for time-series data (partition by date).

Hash-Based Partitioning

Hash the partition key and assign to a bucket: partition = hash(key) % N. Spreads load evenly.

Consistent Hashing

Simple hash(key) % N fails when you add or remove nodes — it remaps almost all keys. Consistent hashing solves this:

# Consistent hashing ring (conceptual)
#
#          0
#          |
#   Node C  |   Node A
#          |
#    270 --+-- 90
#          |
#   Node D  |   Node B
#          |
#         180
#
# Keys map to the first node clockwise from hash(key).
# Adding Node E between A and B remaps only the keys in the arc
# between A and E (they move from B to E) — not the entire dataset.
#
# Virtual nodes: each physical node gets multiple points on the ring
# (e.g., 150 virtual nodes per server). This evens out load
# when nodes have different capacities.

Used by: Cassandra (virtual nodes), DynamoDB, Redis Cluster (16,384 hash slots), Memcached.
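
A minimal ring with virtual nodes can be sketched in Python. MD5 and 100 virtual nodes per server are illustrative choices for the sketch, not what any particular system uses.

```python
import bisect
import hashlib

class HashRing:
    def __init__(self, nodes, vnodes=100):
        self.ring = []                       # sorted list of (ring point, node)
        for node in nodes:
            for i in range(vnodes):          # each node gets many points on the ring
                self.ring.append((self._hash(f"{node}#{i}"), node))
        self.ring.sort()

    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def node_for(self, key: str) -> str:
        point = self._hash(key)
        # First virtual node clockwise from the key's position (wrap at the end)
        idx = bisect.bisect_right(self.ring, (point, "")) % len(self.ring)
        return self.ring[idx][1]

ring = HashRing(["node-a", "node-b", "node-c"])
before = {f"user-{i}": ring.node_for(f"user-{i}") for i in range(1000)}

ring2 = HashRing(["node-a", "node-b", "node-c", "node-d"])   # add one node
moved = sum(1 for k, n in before.items() if ring2.node_for(k) != n)
print(f"{moved}/1000 keys moved")   # roughly a quarter, not the whole keyspace
```

Compare with `hash(key) % N`: going from 3 to 4 buckets remaps about 3 of every 4 keys, which is why naive modulo hashing is unusable for elastic clusters.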

Hotspots & Rebalancing

Even hash partitioning can create hotspots if your key distribution is skewed. Classic example: a celebrity's user_id generates 10,000x more traffic than average users. Common mitigations: salt hot keys with a random suffix (spreading one logical key across several partitions, at the cost of scatter-gather reads) and split or move hot partitions when rebalancing.

Practical Examples

# Kafka partitions — choosing a good partition key
# BAD: partition by region (only 4 regions = 4 max parallelism)
# GOOD: partition by user_id (thousands of users = high parallelism)

# Create topic with 12 partitions, replication factor 3
kafka-topics.sh --create \
  --topic order-events \
  --partitions 12 \
  --replication-factor 3 \
  --bootstrap-server localhost:9092

# DynamoDB partition key guidance:
# - High cardinality (many distinct values)
# - Uniform access pattern (no hot key)
# - Avoid: status (only ~5 values), country (195 values but uneven)
# - Good: user_id, device_id, order_id (UUIDs)
# Vitess (MySQL sharding) — VSchema defines sharding key
# vschema.json
{
  "sharded": true,
  "vindexes": {
    "hash": { "type": "hash" }
  },
  "tables": {
    "orders": {
      "columnVindexes": [
        { "column": "customer_id", "name": "hash" }
      ]
    }
  }
}
# All queries that include customer_id are routed to the correct shard.
# Cross-shard queries (no customer_id) fan out to all shards.

Consensus

The consensus problem: how do multiple nodes agree on a single value, even in the presence of failures? Consensus is needed for leader election, distributed locks, and replicated state machines.

Raft Algorithm

Raft (Ongaro & Ousterhout, 2014) was designed to be understandable. It decomposes consensus into three sub-problems:

1. Leader Election

Nodes start as followers. A follower that hears no heartbeat within a randomized election timeout becomes a candidate, increments the term, and requests votes. A candidate that wins a majority becomes leader; randomized timeouts make split votes rare.

2. Log Replication

Clients send commands to the leader, which appends them to its log and replicates them to followers. Once a majority of nodes has written an entry, the leader marks it committed and applies it to the state machine.

3. Safety Guarantee

Raft's Log Matching property: if two log entries have the same index and term, they contain the same command, and all log entries before them are also identical. A separate election restriction ensures a node only votes for a candidate whose log is at least as up-to-date as its own, so an elected leader always holds every committed entry.

# Watching Raft in action with our etcd cluster
# etcdctl uses the Raft protocol internally

# See who the current leader is
# List all endpoints explicitly: the --cluster flag would try the advertised
# etcd1:2379-style URLs, which don't resolve from the host. Assumes etcd1/etcd2
# also publish client ports 22379/32379 (see the compose file).
etcdctl \
  --endpoints=http://localhost:2379,http://localhost:22379,http://localhost:32379 \
  endpoint status --write-out=table
# +------------------------+------------------+-----------+...
# |        ENDPOINT        |        ID        | IS LEADER |...
# +------------------------+------------------+-----------+...
# | http://localhost:2379  | 8e9e05c52164694d |   true    |  <- leader
# | http://localhost:22379 | ...              |   false   |
# | http://localhost:32379 | ...              |   false   |

# Stop the leader — watch re-election (election timeouts are a few hundred ms)
docker compose stop etcd0
sleep 1
# Query the surviving nodes (assumes their client ports are published, e.g. 22379/32379)
etcdctl \
  --endpoints=http://localhost:22379,http://localhost:32379 \
  endpoint status --write-out=table
# A different node is now leader

Paxos (Brief)

Paxos (Lamport, 1989) was the original consensus algorithm. It has two phases: Prepare/Promise and Accept/Accepted. Multi-Paxos adds a leader optimization for repeated consensus. Paxos is famously difficult to understand and implement correctly. Raft was explicitly designed as a more understandable alternative; most new systems use Raft.

When You Need Consensus

You DO need consensus whenever exactly one outcome must hold cluster-wide: electing a single leader, holding a distributed lock, committing entries to a replicated log, or agreeing on cluster membership.

When you do NOT need consensus: if your operation is idempotent and you can tolerate brief inconsistency, use optimistic concurrency (version numbers + CAS) instead of a full consensus round.
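
That optimistic pattern can be sketched against an in-memory stand-in for any store that supports compare-and-swap on a version number:

```python
class VersionedStore:
    """In-memory stand-in for a store with compare-and-swap semantics."""
    def __init__(self):
        self.data = {}                        # key -> (version, value)

    def get(self, key):
        return self.data.get(key, (0, None))

    def cas(self, key, expected_version, new_value) -> bool:
        version, _ = self.data.get(key, (0, None))
        if version != expected_version:       # someone else wrote first
            return False
        self.data[key] = (version + 1, new_value)
        return True

def increment(store, key, retries=10):
    """Optimistic concurrency: no locks, no consensus round, just retry on conflict."""
    for _ in range(retries):
        version, value = store.get(key)
        if store.cas(key, version, (value or 0) + 1):
            return
    raise RuntimeError("too much contention")

store = VersionedStore()
for _ in range(5):
    increment(store, "counter")
print(store.get("counter"))   # (5, 5)
```

etcd's `put --prev-kv`/transactions and DynamoDB conditional writes expose the same shape: read, compute, write-if-unchanged.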

Practical: etcd, ZooKeeper, Consul

# etcd: distributed config / service discovery (Kubernetes uses it)
etcdctl put /services/api/primary "10.0.0.1:8080"
etcdctl get /services/api/primary

# Distributed lock with etcd (lease-based)
# Lease expires if client crashes — no deadlocks
etcdctl lease grant 30      # 30-second lease, returns lease_id
etcdctl put /locks/job-processor "worker-42" --lease=lease_id
etcdctl lease keepalive lease_id   # renew while working

# ZooKeeper: leader election recipe
# Create ephemeral sequential znode: /election/leader-0000000001
# Node with lowest sequence number is leader.
# Others watch the next-lower znode (not the leader) to avoid herd effect.

# Consul: service mesh + health checking + KV store
consul kv put feature/dark-mode enabled
consul kv get feature/dark-mode
consul lock /locks/deploy "deploy.sh"  # acquires lock, runs command

Distributed Transactions

Two-Phase Commit (2PC)

2PC coordinates a transaction across multiple participants (databases, services) so that either all commit or all abort.

# 2PC Protocol
#
#  Phase 1 — PREPARE
#  Coordinator → all participants: "Prepare to commit"
#  Each participant: write to WAL, acquire locks, reply VOTE-YES or VOTE-NO
#
#  Phase 2 — COMMIT (if all voted YES) / ABORT (if any voted NO)
#  Coordinator → all participants: "Commit" or "Abort"
#  Each participant: apply or undo, release locks, reply ACK
#
#  Problem: Coordinator failure during Phase 2
#  ─────────────────────────────────────────────
#  Participants voted YES and are now BLOCKED — they hold locks
#  but don't know whether to commit or abort.
#  Recovery requires reading the coordinator's durable log.
#  System is unavailable until coordinator recovers.
2PC is blocking
If the coordinator crashes after sending PREPARE but before sending COMMIT, participants hold locks indefinitely. This is why 2PC is rarely used across microservices — use the Saga pattern instead. 2PC is still common within a single database cluster (Postgres distributed transactions, XA transactions).
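
The control flow above can be sketched as a toy in-process 2PC (no durable log, locks, or RPC: just the vote collection and the all-or-nothing decision; class and participant names are invented for the sketch):

```python
class Participant:
    def __init__(self, name, will_commit=True):
        self.name, self.will_commit = name, will_commit
        self.state = "init"

    def prepare(self) -> bool:
        # Phase 1: a real participant writes intent to its WAL and takes locks here
        self.state = "prepared" if self.will_commit else "aborted"
        return self.will_commit

    def commit(self):
        self.state = "committed"

    def abort(self):
        self.state = "aborted"

def two_phase_commit(participants) -> str:
    # Phase 1 — PREPARE: collect a vote from every participant
    votes = [p.prepare() for p in participants]
    # Phase 2 — COMMIT only if everyone voted yes, otherwise ABORT everywhere
    if all(votes):
        for p in participants:
            p.commit()
        return "committed"
    for p in participants:
        p.abort()
    return "aborted"

ok = two_phase_commit([Participant("db"), Participant("queue")])
bad = two_phase_commit([Participant("db"), Participant("queue", will_commit=False)])
print(ok, bad)   # committed aborted
```

The blocking hazard lives between the two phases: if the coordinator dies after collecting YES votes but before the second loop runs, every prepared participant is stuck holding locks.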

Saga Pattern

Instead of a distributed lock, a Saga decomposes a long-running transaction into a sequence of local transactions, each with a compensating transaction that undoes it if a later step fails.

# Saga: E-commerce checkout
#
#  Step 1: Reserve inventory       Compensate: Release reservation
#  Step 2: Charge payment          Compensate: Refund payment
#  Step 3: Schedule fulfillment    Compensate: Cancel fulfillment
#  Step 4: Send confirmation email (no compensation needed — idempotent)
#
#  If Step 3 fails → execute Step 3's compensate → Step 2's compensate
#                  → Step 1's compensate
#
#  Key insight: no distributed locks. Each step commits immediately.
#  Failure is handled by running compensations in reverse.
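
A minimal saga runner, with placeholder step functions, shows the unwind-in-reverse logic:

```python
def run_saga(steps):
    """steps: list of (action, compensate) pairs. Each action commits locally;
    on failure, compensations for completed steps run in reverse order."""
    completed = []
    for action, compensate in steps:
        try:
            action()
            completed.append(compensate)
        except Exception:
            for undo in reversed(completed):   # unwind already-committed steps
                undo()
            return "rolled_back"
    return "committed"

log = []

def step(name, fail=False):
    def action():
        if fail:
            raise RuntimeError(name)
        log.append(f"do:{name}")
    return action

def undo(name):
    return lambda: log.append(f"undo:{name}")

result = run_saga([
    (step("reserve_inventory"), undo("reserve_inventory")),
    (step("charge_payment"),    undo("charge_payment")),
    (step("fulfillment", fail=True), undo("fulfillment")),
])
print(result, log)
# rolled_back ['do:reserve_inventory', 'do:charge_payment',
#              'undo:charge_payment', 'undo:reserve_inventory']
```

Note what this sketch leaves out: a production saga must persist which steps completed, so the unwind survives a crash of the process running the saga itself; that durability is exactly what orchestrators like Temporal provide.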

Choreography vs Orchestration

|  | Choreography | Orchestration |
| --- | --- | --- |
| How | Services react to events (no central coordinator) | Central orchestrator tells each service what to do |
| Coupling | Loose — services only know about events | Tighter — orchestrator knows all services |
| Visibility | Hard to trace the full saga flow | Easy — all state in orchestrator |
| Tools | Kafka, RabbitMQ events | Temporal, AWS Step Functions, Cadence |
| When to use | Simple flows, loose coupling desired | Complex flows, need full observability |
# Temporal workflow orchestration (Python SDK)
# Temporal handles retries, timeouts, and state durability automatically

from datetime import timedelta

from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker

@activity.defn
async def reserve_inventory(order_id: str, sku: str) -> bool:
    # Call inventory service — if this crashes, Temporal retries it
    return await inventory_service.reserve(order_id, sku)

@activity.defn
async def charge_payment(order_id: str, amount: float) -> str:
    return await payment_service.charge(order_id, amount)

@activity.defn
async def release_inventory(order_id: str, sku: str) -> None:
    await inventory_service.release(order_id, sku)

@workflow.defn
class CheckoutWorkflow:
    @workflow.run
    async def run(self, order_id: str, sku: str, amount: float) -> str:
        reserved = await workflow.execute_activity(
            reserve_inventory, args=[order_id, sku],   # multiple args go via args=[...]
            start_to_close_timeout=timedelta(seconds=10)
        )
        if not reserved:
            return "out_of_stock"

        try:
            charge_id = await workflow.execute_activity(
                charge_payment, args=[order_id, amount],
                start_to_close_timeout=timedelta(seconds=30)
            )
            return charge_id
        except Exception:
            # Compensate: release inventory (every activity needs a timeout)
            await workflow.execute_activity(
                release_inventory, args=[order_id, sku],
                start_to_close_timeout=timedelta(seconds=10)
            )
            raise

Clocks & Ordering

Why Wall Clocks Are Unreliable

Every node has its own hardware clock. Clocks drift — they can be off by tens to hundreds of milliseconds even with NTP synchronization. You cannot use wall-clock timestamps to determine the order of events across nodes. A write at 12:00:00.001 on Node A may actually have happened after a write at 12:00:00.005 on Node B if Node A's clock is ahead.

Lamport Timestamps

Leslie Lamport (1978) introduced logical clocks. Rule: if event A happened before event B, then timestamp(A) < timestamp(B). The converse is not guaranteed.

# Lamport clock implementation
class LamportClock:
    def __init__(self):
        self.time = 0

    def tick(self):
        """Increment before each local event."""
        self.time += 1
        return self.time

    def update(self, received_time: int):
        """On receiving a message, advance clock to max + 1."""
        self.time = max(self.time, received_time) + 1
        return self.time

# Usage: every message carries the sender's clock value.
# Receiver calls update(msg.timestamp) before processing.
# Guarantees: if A -> B (A causally precedes B), then L(A) < L(B).
# Does NOT guarantee: L(A) < L(B) implies A -> B  (concurrent events
# may have any order).

Vector Clocks

Vector clocks track causality precisely. Each node maintains a vector of counters — one per node. Two events are either causally ordered or concurrent (incomparable).

# Vector clock — detect causality and concurrency
class VectorClock:
    def __init__(self, node_id: str, nodes: list[str]):
        self.node_id = node_id
        self.clock = {n: 0 for n in nodes}

    def tick(self):
        self.clock[self.node_id] += 1
        return dict(self.clock)

    def update(self, received: dict):
        for node, t in received.items():
            self.clock[node] = max(self.clock.get(node, 0), t)
        self.clock[self.node_id] += 1

    def happens_before(self, a: dict, b: dict) -> bool:
        """True if a causally precedes b."""
        return all(a[n] <= b[n] for n in a) and any(a[n] < b[n] for n in a)

    def concurrent(self, a: dict, b: dict) -> bool:
        """True if a and b are concurrent (neither precedes the other)."""
        return not self.happens_before(a, b) and not self.happens_before(b, a)
# Amazon's Dynamo paper (the ancestor of DynamoDB) used vector clocks to
# detect conflicting writes. When concurrent versions are detected, the
# application must resolve them (the canonical example: merging shopping carts).

Google Spanner TrueTime

Spanner uses GPS receivers and atomic clocks in every data center. The TrueTime API returns a time interval [earliest, latest] bounding the true current time. Before committing, Spanner waits out the uncertainty interval — guaranteeing that the commit timestamp is strictly in the past for all subsequent readers. This delivers linearizability globally without logical clocks.
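
Commit wait can be sketched abstractly. The ±4 ms uncertainty bound below is an assumption for the sketch; Spanner's real bound comes from TrueTime itself and is typically a few milliseconds.

```python
import time

EPSILON = 0.004   # assumed clock-uncertainty bound, +/- 4 ms

def tt_now():
    """TrueTime-style interval [earliest, latest] bracketing the true time."""
    t = time.time()
    return (t - EPSILON, t + EPSILON)

def commit(write):
    earliest, latest = tt_now()
    commit_ts = latest                 # pick the top of the uncertainty interval
    # Commit wait: don't acknowledge until every node's "earliest" is past
    # commit_ts, i.e. the timestamp is certainly in the past everywhere.
    while tt_now()[0] < commit_ts:
        time.sleep(0.001)
    write["ts"] = commit_ts
    return write

record = commit({"key": "balance", "value": 100})
print("committed at", record["ts"])
```

The wait is roughly twice the uncertainty bound per commit, which is why Spanner spends so much on tight clocks: a smaller interval means a shorter commit wait.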

Event Ordering in Kafka

Kafka guarantees ordering only within a single partition. Events on partition 0 are ordered; events across partitions 0 and 1 have no ordering guarantee. This is why choosing a good partition key matters for event-driven systems that require causal ordering (e.g., all events for a given user_id go to the same partition).
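
The keyed-partitioner idea in a sketch (Kafka's default partitioner actually uses murmur2 on the key bytes; MD5 here is just a stable stand-in, and the partition count is illustrative):

```python
import hashlib

NUM_PARTITIONS = 12

def partition_for(key: str) -> int:
    # Stable hash of the key, modulo the partition count
    digest = hashlib.md5(key.encode()).hexdigest()
    return int(digest, 16) % NUM_PARTITIONS

# Every event for the same user maps to the same partition...
assert partition_for("user-42") == partition_for("user-42")

# ...so that user's events stay totally ordered, while different users
# can land on different partitions and be processed in parallel.
for key, event in [("user-42", "login"), ("user-7", "click"), ("user-42", "purchase")]:
    print(key, event, "-> partition", partition_for(key))
```

The corollary: repartitioning a topic (changing the partition count) changes the key-to-partition mapping and silently breaks per-key ordering across the boundary.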

Failure Modes & Fault Tolerance

Types of Failures

Crash-stop (a node halts and never returns), crash-recovery (a node restarts and rejoins with its durable state), omission failures (messages are dropped or delayed), and Byzantine failures (a node behaves arbitrarily or maliciously, tolerated only by specialized BFT protocols). Real deployments add gray failures: nodes that are slow or partially broken rather than cleanly dead, which are the hardest to detect.

Failure Detection

# Heartbeat: node A sends periodic "I'm alive" messages to node B.
# If B receives no heartbeat within timeout, it marks A as failed.
#
# Timeout tuning dilemma:
# - Too short: false positives (slow node marked as failed)
# - Too long: slow recovery time after real failures
# - Rule of thumb: timeout = 99th-percentile RTT * 3 + safety margin
#
# Phi Accrual Failure Detector (Cassandra, Akka):
# Instead of binary up/down, computes a "suspicion level" phi(t).
# phi increases as the time since last heartbeat grows.
# Application sets threshold: phi > 8 → treat as failed.
# Adapts to network conditions dynamically.
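
A simplified phi accrual detector as a sketch: the real detector models heartbeat inter-arrival times with a normal distribution, while this version assumes an exponential distribution over the observed mean interval to keep the math to one line.

```python
import math

class PhiAccrualDetector:
    def __init__(self, threshold=8.0):
        self.threshold = threshold
        self.intervals = []           # observed gaps between heartbeats
        self.last_heartbeat = None

    def heartbeat(self, now: float):
        if self.last_heartbeat is not None:
            self.intervals.append(now - self.last_heartbeat)
            self.intervals = self.intervals[-100:]   # sliding window
        self.last_heartbeat = now

    def phi(self, now: float) -> float:
        """Suspicion level: -log10(P(silence lasts this long))."""
        if not self.intervals:
            return 0.0
        mean = sum(self.intervals) / len(self.intervals)
        elapsed = now - self.last_heartbeat
        # Exponential model: P(gap > elapsed) = exp(-elapsed / mean),
        # so phi = -log10(exp(-elapsed / mean)) = elapsed / mean * log10(e)
        return elapsed / mean * math.log10(math.e)

    def is_failed(self, now: float) -> bool:
        return self.phi(now) > self.threshold

d = PhiAccrualDetector()
for t in range(10):                   # steady heartbeats every 1 second
    d.heartbeat(float(t))
print(d.phi(9.5))                     # short gap: low suspicion
print(d.phi(30.0))                    # long silence: phi climbs past the threshold
```

Because phi scales with the observed mean interval, the same threshold tolerates a jittery network (long mean gaps) and reacts quickly on a quiet one, which is the whole point over a fixed timeout.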

Circuit Breaker Pattern

Prevent cascading failures when a downstream service is struggling. The circuit has three states:

import time
from enum import Enum

class State(Enum):
    CLOSED = "closed"       # Normal — requests flow through
    OPEN = "open"           # Failing — requests blocked immediately
    HALF_OPEN = "half_open" # Testing — one probe request allowed

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.state = State.CLOSED
        self.opened_at = None

    def call(self, func, *args, **kwargs):
        if self.state == State.OPEN:
            if time.time() - self.opened_at > self.timeout:
                self.state = State.HALF_OPEN
            else:
                raise Exception("Circuit OPEN — request blocked")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        self.failures = 0
        self.state = State.CLOSED

    def _on_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.state = State.OPEN
            self.opened_at = time.time()

Retry with Exponential Backoff + Jitter

import random
import time

def retry_with_backoff(func, max_retries=5, base_delay=0.1, max_delay=30.0):
    """
    Retry with full jitter: delay = random(0, min(max_delay, base * 2^attempt))
    Full jitter prevents thundering herd — all retrying clients don't
    hammer the recovering server at the same moment.
    """
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            cap = min(max_delay, base_delay * (2 ** attempt))
            delay = random.uniform(0, cap)   # full jitter
            time.sleep(delay)

Practical: Kubernetes Probes

# Kubernetes liveness + readiness probes
# livenessProbe: is the container alive? If not, restart it.
# readinessProbe: is the container ready for traffic? If not, remove from LB.
# startupProbe: is the container done initializing? (prevents premature liveness kills)

spec:
  containers:
  - name: api
    image: myapp:latest
    livenessProbe:
      httpGet:
        path: /healthz
        port: 8080
      initialDelaySeconds: 10
      periodSeconds: 10
      failureThreshold: 3        # restart after 3 consecutive failures
    readinessProbe:
      httpGet:
        path: /ready
        port: 8080
      initialDelaySeconds: 5
      periodSeconds: 5
      failureThreshold: 2        # remove from LB after 2 consecutive failures
    startupProbe:
      httpGet:
        path: /healthz
        port: 8080
      failureThreshold: 30       # up to 30 * 10s = 5 min for slow startup
      periodSeconds: 10

Message Queues & Event Streaming

Delivery Semantics

| Semantic | Guarantee | Implication | Use when |
| --- | --- | --- | --- |
| At-most-once | Message delivered 0 or 1 times | Possible data loss | Metrics, telemetry — occasional loss OK |
| At-least-once | Message delivered 1 or more times | Possible duplicates — consumer must be idempotent | Most event-driven systems |
| Exactly-once | Delivered exactly 1 time | No loss, no duplicates — expensive | Financial transactions, billing |
Design for at-least-once, implement idempotency
Exactly-once is hard and expensive. The practical pattern: use at-least-once delivery, make your consumer idempotent (processing the same message twice has the same effect as processing it once), and deduplicate using a message ID. This is simpler and more reliable than true exactly-once.
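
The dedup pattern in miniature; an in-memory set stands in for the durable dedup store (production would use a database unique constraint or Redis SETNX), and the handler is a placeholder.

```python
class IdempotentConsumer:
    """At-least-once delivery + dedup by message ID = effectively-once processing."""
    def __init__(self, handler):
        self.handler = handler
        self.seen = set()             # processed message IDs (durable storage in prod)

    def consume(self, message: dict) -> str:
        msg_id = message["id"]
        if msg_id in self.seen:       # redelivery: already processed, skip
            return "duplicate"
        self.handler(message)
        self.seen.add(msg_id)         # record only after successful processing
        return "processed"

charges = []
consumer = IdempotentConsumer(lambda m: charges.append(m["amount"]))

# The broker redelivers message id=1 (say, the first ack was lost)
consumer.consume({"id": 1, "amount": 99.99})
consumer.consume({"id": 1, "amount": 99.99})   # duplicate, ignored
consumer.consume({"id": 2, "amount": 10.00})
print(charges)   # [99.99, 10.0] — the customer is charged once per order
```

One subtlety: recording the ID and applying the side effect should happen in the same transaction, otherwise a crash between the two reintroduces the duplicate or loses the message.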

Kafka Deep Dive

# Kafka: distributed, partitioned, replicated commit log
#
# Producer → [Partition 0] [Partition 1] [Partition 2] → Consumer Group
#                offset 0                                 |
#                offset 1  ← consumer tracks its offset   |
#                offset 2                                  |
#                                                   ┌──────┴──────┐
#                                              Consumer A    Consumer B
#                                             (partition 0)  (partition 1)
#
# Consumer group: each partition is consumed by exactly one consumer in the group.
# Multiple consumer groups can independently consume the same topic (event replay).

# Console producer with idempotence enabled (the broker dedupes producer retries)
kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --producer-property enable.idempotence=true \
  --producer-property acks=all

# Consumer group management
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group payment-processor
# Shows LAG (messages behind) per partition — key operational metric
# Kafka producer with transactions (exactly-once semantics)
# Note: kafka-python does not implement transactions; confluent-kafka does.
from confluent_kafka import Producer
import json

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,                 # broker dedupes producer retries
    'acks': 'all',                              # wait for all ISR to ack
    'transactional.id': 'payment-processor-1'   # enables transactions
})

producer.init_transactions()
try:
    producer.begin_transaction()
    producer.produce('payments',
                     json.dumps({'order_id': '123', 'amount': 99.99}).encode('utf-8'))
    producer.produce('order-events',
                     json.dumps({'order_id': '123', 'status': 'paid'}).encode('utf-8'))
    producer.commit_transaction()
except Exception:
    producer.abort_transaction()
    raise

SQS & RabbitMQ

# SQS: visibility timeout pattern
# When a consumer reads a message, it becomes invisible for `visibility_timeout`.
# If consumer crashes before deleting, message reappears — another consumer retries.
# Dead Letter Queue (DLQ): after N failed retries, message moved to DLQ for inspection.

import boto3
sqs = boto3.client('sqs', region_name='us-east-1')

queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/orders'

# Receive (message invisible for 30s)
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    VisibilityTimeout=30,
    WaitTimeSeconds=20   # long polling — reduces empty responses
)

for msg in response.get('Messages', []):
    try:
        process(msg['Body'])
        # Delete only after successful processing
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=msg['ReceiptHandle']
        )
    except Exception:
        # Don't delete — message will reappear after visibility timeout
        pass

Distributed Storage

Storage System Taxonomy

| System | Type | Consistency | Best for |
| --- | --- | --- | --- |
| Amazon S3 / GCS | Object store | Strong (since 2020) | Data lake, backups, static assets |
| HDFS | Distributed FS | Sequential | Large batch files (Spark, Hive) |
| Cassandra | Wide-column DB | Tunable (AP default) | Time-series, high write throughput |
| CockroachDB | Distributed SQL | Serializable | OLTP with geo-distribution |
| Google Spanner | Distributed SQL | External consistency | Global OLTP (finance, inventory) |
| Delta Lake / Iceberg | Table format (on S3) | ACID via OCC | Data lakehouse (batch + stream) |

LSM Trees vs B-Trees

Most distributed databases use one of these two storage engines under the hood:

# B-Tree: in-place updates, great for reads
# Pages (typically 4-16 KB) form a tree.
# Read: O(log n) page reads from root to leaf.
# Write: finds the leaf page, updates in-place. May require page split.
# Write amplification: ~10x (one logical write → WAL entry + full page rewrites + index pages)
# Good for: read-heavy OLTP, PostgreSQL, MySQL InnoDB

# LSM Tree (Log-Structured Merge): sequential writes, background compaction
# Writes go to in-memory memtable first (fast).
# Memtable flushed to immutable SSTable files (sorted, sequential I/O).
# Reads must check memtable + potentially many SSTables (bloom filters help).
# Background compaction merges SSTables — removes deleted keys, reduces file count.
# Write amplification: ~30x (multiple compaction rounds) — worse than B-tree
# Read amplification: worse without compaction, better with full compaction
# Good for: write-heavy workloads, Cassandra, RocksDB, LevelDB, HBase

# Practical implication for Cassandra:
# - Excellent for: IoT sensor writes, event logs, activity streams
# - Painful for: many small updates to same key (compaction pressure)
# - Use TTL to expire old data — keeps SSTables manageable
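The read/write asymmetry above is easy to see in a toy LSM engine. A minimal in-memory sketch (real engines such as RocksDB add a WAL, bloom filters, block caches, and leveled compaction):

```python
import bisect

class ToyLSM:
    """Toy LSM tree: writes hit a memtable, reads check memtable then SSTables."""
    TOMBSTONE = object()  # marks deletions until compaction removes them

    def __init__(self, memtable_limit=4):
        self.memtable = {}       # in-memory, absorbs all writes
        self.sstables = []       # sorted immutable (key, value) lists, newest first
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value           # O(1) write, no disk seek
        if len(self.memtable) >= self.memtable_limit:
            self._flush()

    def delete(self, key):
        self.put(key, self.TOMBSTONE)        # deletes are just writes

    def _flush(self):
        # Sorted immutable "SSTable" — sequential I/O in a real engine
        self.sstables.insert(0, sorted(self.memtable.items()))
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:             # newest data first
            v = self.memtable[key]
            return None if v is self.TOMBSTONE else v
        for table in self.sstables:          # then SSTables, newest to oldest
            i = bisect.bisect_left(table, (key,))
            if i < len(table) and table[i][0] == key:
                v = table[i][1]
                return None if v is self.TOMBSTONE else v
        return None

    def compact(self):
        # Merge all SSTables: keep newest version of each key, drop tombstones
        merged = {}
        for table in reversed(self.sstables):    # oldest first, newer overwrites
            merged.update(table)
        self.sstables = [sorted((k, v) for k, v in merged.items()
                                if v is not self.TOMBSTONE)]
```

Note how `get` may probe every SSTable (read amplification) while `compact` rewrites data that was already written (write amplification) — the trade-offs listed above fall directly out of the structure.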

Choosing Storage for Data Pipelines

Caching in Distributed Systems

Cache Patterns

# Cache-Aside (Lazy Loading) — most common pattern
# Application manages the cache explicitly.
# Pro: only requested data is cached, resilient to cache failure.
# Con: cache miss penalty (3 trips: cache miss → DB read → cache write).

def get_user(user_id: str) -> dict:
    cached = redis.get(f"user:{user_id}")
    if cached:
        return json.loads(cached)
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    redis.setex(f"user:{user_id}", 300, json.dumps(user))  # TTL 5 min
    return user

# Write-Through — write to cache and DB synchronously
# Pro: cache always consistent with DB.
# Con: write latency doubles; caches data that may never be read.

def update_user(user_id: str, data: dict):
    db.execute("UPDATE users SET ... WHERE id = %s", user_id)
    redis.setex(f"user:{user_id}", 300, json.dumps(data))

# Write-Behind (Write-Back) — write to cache, async flush to DB
# Pro: very low write latency.
# Con: data loss if cache crashes before flush; complex implementation.
# Used by: database buffer pools (dirty pages), some write-heavy caches.
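The write-behind pattern above, sketched with in-memory dicts standing in for the cache and DB (a real implementation flushes from a background worker on a timer or size threshold, not inline):

```python
class WriteBehindCache:
    """Write-behind sketch: writes hit the cache immediately;
    the DB is updated later in coalesced batches.
    `cache` and `db` are plain dicts here, stand-ins for Redis and SQL."""

    def __init__(self, flush_threshold=3):
        self.cache = {}
        self.db = {}
        self.dirty = set()                  # keys written but not yet persisted
        self.flush_threshold = flush_threshold

    def write(self, key, value):
        self.cache[key] = value             # fast path: caller returns immediately
        self.dirty.add(key)
        if len(self.dirty) >= self.flush_threshold:
            self.flush()                    # real systems do this asynchronously

    def flush(self):
        # One batched DB write per key, however many times it was updated
        for key in self.dirty:
            self.db[key] = self.cache[key]
        self.dirty.clear()

    def read(self, key):
        return self.cache.get(key, self.db.get(key))
```

The risk stated above is visible here: anything in `dirty` is lost if the process dies before `flush` runs.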

Cache Invalidation

Phil Karlton famously said there are only two hard problems in computer science: cache invalidation and naming things. The challenges:

# Cache stampede prevention: probabilistic early expiration
# (Optimal algorithm by Vattani et al. — no locking needed)
import math, random, time

def fetch_with_early_expiry(key: str, ttl: int, beta: float = 1.0):
    """
    Recompute the cache entry slightly before it expires.
    Higher beta = more aggressive early recomputation.
    Distributes the recompute load over time instead of spiking at expiry.
    """
    cached = redis.get(key)
    if cached:
        data, expiry, delta = json.loads(cached)
        # Decide whether to recompute early
        if time.time() - beta * delta * math.log(random.random()) < expiry:
            return data
    # Cache miss or early recompute decision
    start = time.time()
    data = compute_expensive_value(key)
    delta = time.time() - start
    expiry = time.time() + ttl
    redis.setex(key, ttl, json.dumps([data, expiry, delta]))
    return data

# Locking approach: only one request recomputes, others wait.
# Note: a threading.Lock only dedupes within one process; across multiple
# instances you'd need a distributed lock (e.g., Redis SET NX EX).
import threading
_locks = {}
def fetch_with_lock(key: str, ttl: int):
    cached = redis.get(key)
    if cached:
        return json.loads(cached)
    lock = _locks.setdefault(key, threading.Lock())
    with lock:
        cached = redis.get(key)  # double-check after acquiring lock
        if cached:
            return json.loads(cached)
        data = compute_expensive_value(key)
        redis.setex(key, ttl, json.dumps(data))
        return data

Redis Cluster

# Redis Cluster: 16,384 hash slots distributed across nodes
# Keys map to slots: slot = CRC16(key) % 16384
# Each master node owns a range of slots (e.g., 0-5460, 5461-10922, 10923-16383)
# Each master has 1+ replicas.

# Create a 3-master, 3-replica cluster
redis-cli --cluster create \
  127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
  127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \
  --cluster-replicas 1

# Check cluster info
redis-cli -p 7000 cluster info
redis-cli -p 7000 cluster nodes

# Hash tags: {user_id}.profile and {user_id}.preferences
# map to the same slot (same node) — needed for multi-key ops
redis-cli -c -p 7000 set "{user_123}.profile" '{"name":"Alice"}'
redis-cli -c -p 7000 set "{user_123}.preferences" '{"theme":"dark"}'
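Both the slot mapping and the hash-tag rule fit in a few lines. A sketch using the CRC16 (XMODEM) variant that the Redis Cluster spec names:

```python
def crc16(data: bytes) -> int:
    """CRC16-CCITT (XMODEM): poly 0x1021, init 0, the variant Redis Cluster uses."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            crc = ((crc << 1) ^ 0x1021) & 0xFFFF if crc & 0x8000 else (crc << 1) & 0xFFFF
    return crc

def key_slot(key: str) -> int:
    """Map a key to one of the 16,384 hash slots.
    Hash-tag rule: if the key contains {...} with non-empty content,
    only that content is hashed, so related keys land on the same node."""
    start = key.find('{')
    if start != -1:
        end = key.find('}', start + 1)
        if end != -1 and end > start + 1:
            key = key[start + 1:end]
    return crc16(key.encode()) % 16384
```

This is why the two `{user_123}` keys above can participate in a single multi-key operation: after tag extraction, both hash the same string.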

Service Discovery & Load Balancing

Discovery Patterns

Pattern              | How it works                                                           | Examples
---------------------|------------------------------------------------------------------------|------------------------------------------
DNS-based            | Service name resolves to VIP or pod IPs. K8s Services use this.       | Kubernetes ClusterIP, AWS ALB, Route 53
Client-side registry | Client queries registry for instance list, picks one, calls directly. | Eureka (Netflix OSS), Consul with client SDK
Server-side proxy    | Client calls proxy/LB; proxy queries registry and routes.             | Envoy, Nginx, AWS ALB, Istio

How Kubernetes Service Discovery Works

# K8s Service: stable DNS name + VIP for a set of pods
apiVersion: v1
kind: Service
metadata:
  name: payment-service
  namespace: production
spec:
  selector:
    app: payment          # routes to pods with this label
  ports:
  - port: 80
    targetPort: 8080
  type: ClusterIP         # internal-only VIP

# DNS resolution inside the cluster:
# payment-service.production.svc.cluster.local → 10.96.43.21 (VIP)
# kube-proxy maintains iptables rules: VIP → random pod IP (DNAT)

# Headless Service (for stateful sets, direct pod addressing):
spec:
  clusterIP: None         # no VIP — DNS returns all pod IPs
  # Useful for: Cassandra, Kafka, Redis Cluster (clients need all node IPs)

Load Balancing Algorithms
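The classic choices here are round-robin, least-connections, weighted round-robin, and consistent hashing. A minimal sketch of the first two (illustrative only, no health checking or weighting):

```python
import itertools

class RoundRobin:
    """Cycle through backends in order: fine when requests cost roughly the same."""
    def __init__(self, backends):
        self._cycle = itertools.cycle(backends)

    def pick(self):
        return next(self._cycle)

class LeastConnections:
    """Send to the backend with the fewest in-flight requests:
    better when request costs vary widely (slow queries, large uploads)."""
    def __init__(self, backends):
        self.active = {b: 0 for b in backends}

    def pick(self):
        backend = min(self.active, key=self.active.get)
        self.active[backend] += 1          # caller must call done() when finished
        return backend

    def done(self, backend):
        self.active[backend] -= 1
```

Real load balancers (Envoy, HAProxy) layer health checks, weights, and outlier ejection on top of these selection rules.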

gRPC Load Balancing Challenge

HTTP/1.1 load balancers route per-request. gRPC uses HTTP/2 which multiplexes many requests over a single long-lived TCP connection. A naive L4 load balancer will route all gRPC traffic for one client to one backend, bypassing load distribution entirely.

Solutions: (1) L7 load balancer that understands HTTP/2 (Envoy, Nginx with grpc_pass), (2) client-side load balancing with service discovery (grpc-go supports this natively), (3) headless Kubernetes service + client-side DNS round-robin.

Observability in Distributed Systems

The Three Pillars

Logs record discrete events with full context; metrics are cheap pre-aggregated numbers for dashboards and alerting; traces follow a single request across service boundaries.

Why traces matter in microservices
When Service A calls B which calls C and D, a 500 ms latency spike could be in any of them. Logs tell you what happened in each service. Only a distributed trace tells you A spent 20 ms, B spent 450 ms (waiting on D), C spent 30 ms. Without traces, debugging cross-service issues is guesswork.

Distributed Tracing with OpenTelemetry

# OpenTelemetry Python — automatic + manual instrumentation
# pip install opentelemetry-sdk opentelemetry-exporter-otlp
# pip install opentelemetry-instrumentation-fastapi
# pip install opentelemetry-instrumentation-httpx

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from fastapi import FastAPI

# Setup — typically in app startup
provider = TracerProvider()
exporter = OTLPSpanExporter(endpoint="http://jaeger:4317")
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)

# Auto-instrument FastAPI (injects trace headers on all requests)
app = FastAPI()
FastAPIInstrumentor.instrument_app(app)
HTTPXClientInstrumentor().instrument()  # auto-propagates headers on outbound calls

# Manual span for business operations
tracer = trace.get_tracer(__name__)

async def process_order(order_id: str):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        span.set_attribute("order.source", "web")

        with tracer.start_as_current_span("validate_inventory"):
            await inventory_service.check(order_id)   # child span

        with tracer.start_as_current_span("charge_payment") as payment_span:
            try:
                charge_id = await payment_service.charge(order_id)
                payment_span.set_attribute("payment.charge_id", charge_id)
            except Exception as e:
                payment_span.record_exception(e)
                payment_span.set_status(trace.Status(trace.StatusCode.ERROR))
                raise

Correlation IDs & Context Propagation

# W3C Trace Context — standard HTTP headers for trace propagation
# traceparent: 00-{trace_id}-{span_id}-{flags}
# tracestate: vendor-specific state

# Manual propagation if not using auto-instrumentation:
from opentelemetry.propagate import inject, extract
from opentelemetry import trace
import httpx

# Outbound call — inject trace context into headers
def call_downstream(url: str):
    headers = {}
    inject(headers)   # adds traceparent header
    return httpx.get(url, headers=headers)

# Inbound call — extract and restore context
def handle_request(request_headers: dict):
    ctx = extract(request_headers)  # reads traceparent from headers
    with trace.get_tracer(__name__).start_as_current_span(
        "handle_request", context=ctx
    ):
        pass  # this span is now a child of the upstream span

# Correlation ID for logs (simpler than full tracing):
import uuid
CORRELATION_ID_KEY = "correlation_id"

def middleware(request, call_next):
    correlation_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
    # Store in a context var and include in all log records.
    # (log_context is a placeholder for your logging helper, e.g., a
    # contextvars-backed logging.Filter that stamps every record.)
    with log_context(correlation_id=correlation_id):
        response = call_next(request)
    response.headers["X-Correlation-ID"] = correlation_id
    return response

Modern Data Engineering Patterns

Lambda vs Kappa Architecture

Aspect        | Lambda                                             | Kappa
--------------|-----------------------------------------------------|------------------------------
Batch layer   | Yes — reprocesses all historical data periodically | No — stream only
Speed layer   | Yes — real-time view of recent data                | Yes — single stream pipeline
Serving layer | Merges batch + speed views                         | Single materialized view
Complexity    | High — two code paths (batch & stream) to maintain | Lower — one code path
Reprocessing  | Natural (batch layer rewrites)                     | Replay Kafka topic from start
Trend         | Declining                                          | Preferred for new systems

Change Data Capture (CDC) with Debezium

CDC captures every row-level change in a database (insert/update/delete) and streams it as an event. The source database's transaction log is the authoritative event source — no application changes needed.

# CDC pipeline: Postgres → Debezium → Kafka → Data Warehouse
#
#   [Postgres WAL] → [Debezium Connector] → [Kafka topic: prod.public.orders]
#                                                     |
#                                              [Kafka Connect Sink]
#                                                     |
#                                              [Snowflake / BigQuery]
#
# Debezium reads the Postgres WAL (write-ahead log) via logical replication.
# Each change becomes a Kafka message with before/after state.
# Topic naming: {database.server.name}.{schema}.{table} → prod.public.orders

# 1. Enable logical replication in Postgres
# postgresql.conf:
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4

# 2. Debezium connector config (Kafka Connect REST API)
curl -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' -d '{
  "name": "postgres-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "production",
    "database.server.name": "prod",
    "table.include.list": "public.orders,public.payments",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot"
  }
}'

# 3. Consume CDC events
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic prod.public.orders \
  --from-beginning
# Each message: {"before": {...}, "after": {...}, "op": "u", "ts_ms": 1708723200000}
# op: c=create, u=update, d=delete, r=read (snapshot)

Event Sourcing + CQRS

Event Sourcing: Instead of storing current state, store the sequence of events that produced it. State is derived by replaying events.

CQRS (Command Query Responsibility Segregation): Separate the write model (commands that change state) from the read model (queries that read materialized views). Naturally pairs with event sourcing: commands produce events; read models are projections built from events.

# CQRS + Event Sourcing pattern
from dataclasses import dataclass
from datetime import datetime
from typing import List

@dataclass
class OrderCreated:
    order_id: str
    user_id: str
    items: list
    created_at: datetime

@dataclass
class PaymentProcessed:
    order_id: str
    charge_id: str
    amount: float
    processed_at: datetime

@dataclass
class OrderShipped:
    order_id: str
    tracking_number: str
    shipped_at: datetime

class Order:
    """Write model — derived from events."""
    def __init__(self):
        self.status = None
        self.items = []
        self.charge_id = None

    @classmethod
    def from_events(cls, events: List) -> 'Order':
        order = cls()
        for event in events:
            order.apply(event)
        return order

    def apply(self, event):
        if isinstance(event, OrderCreated):
            self.status = "pending"
            self.items = event.items
        elif isinstance(event, PaymentProcessed):
            self.status = "paid"
            self.charge_id = event.charge_id
        elif isinstance(event, OrderShipped):
            self.status = "shipped"

# Read model projection (separate, denormalized, optimized for queries)
class OrderSummaryProjection:
    """Rebuilt by replaying all events from the event store."""
    def handle(self, event):
        if isinstance(event, OrderCreated):
            db.upsert("order_summaries", {
                "order_id": event.order_id,
                "user_id": event.user_id,
                "status": "pending",
                "item_count": len(event.items)
            })
        elif isinstance(event, PaymentProcessed):
            db.update("order_summaries",
                      {"status": "paid", "amount": event.amount},
                      where={"order_id": event.order_id})

Data Mesh

Data mesh (Zhamak Dehghani, 2019) is an organizational and architectural approach treating data as a product. Four principles:

  1. Domain-oriented ownership: The team that generates the data owns and serves it. No central data engineering team bottleneck.
  2. Data as a product: Each domain exposes discoverable, addressable, trustworthy, and self-describing data products.
  3. Self-serve data platform: Infrastructure abstractions so domain teams can build/deploy data products without platform expertise.
  4. Federated computational governance: Global policies (security, privacy) enforced across all products, implemented locally.

System Design Patterns Summary

Quick-reference for design interviews and architecture decisions:

Pattern          | Problem it solves                                     | Key trade-off                                          | When to reach for it
-----------------|--------------------------------------------------------|--------------------------------------------------------|--------------------------------------------------------
Sharding         | Data too large for one node                           | Cross-shard queries become expensive                   | Table > ~1TB, write throughput saturated
Replication      | Fault tolerance + read scaling                        | Consistency vs availability                            | Always — every production DB should have replicas
Consensus (Raft) | Nodes agree on a single value                         | Latency — requires quorum round-trip                   | Leader election, config store, distributed lock
Message Queue    | Decouple producers from consumers, buffer load spikes | At-least-once delivery requires idempotency            | Async workflows, fan-out, load leveling
Cache-Aside      | Reduce DB load, lower latency                         | Cache invalidation, cold start                         | Hot read data with acceptable staleness
Saga             | Distributed transaction across services               | No isolation between steps — partial failures visible  | Multi-service workflows needing rollback
CQRS             | Read/write models differ significantly                | Complexity, eventual consistency between models        | High-traffic reads with complex denormalization needs
Event Sourcing   | Need full audit trail, time travel                    | Schema evolution complexity, query difficulty          | Financial ledgers, audit-critical domains
Circuit Breaker  | Prevent cascading failures                            | False positives under transient load                   | Any synchronous service-to-service call
Bulkhead         | Isolate failure domains                               | Resource overhead (separate thread pools)              | Critical services must not be starved by non-critical
Sidecar          | Add cross-cutting concerns (tracing, auth) without changing app code | Extra network hop, operational complexity | Service mesh (Envoy), secret injection
CDC (Debezium)   | Sync DB state to other systems without polling        | Requires WAL access, schema coupling                   | Real-time data pipelines, cache invalidation at scale

Interview Quick-Reference

How to answer "design a distributed key-value store"
  1. Requirements: Read/write ratio? Consistency requirement (linearizable or eventual)? Scale (data size, QPS)?
  2. Partitioning: Consistent hashing, virtual nodes. Partition key = raw key hash.
  3. Replication: Leaderless (Dynamo-style) for availability, OR single-leader per partition for stronger consistency. Replication factor = 3.
  4. Consistency: Quorum reads/writes. W + R > N. Tunable per request.
  5. Conflict resolution: LWW with server-side timestamps (simple, but silently drops concurrent writes), or vector clocks (detect conflicts so they can be merged).
  6. Failure detection: Gossip protocol for membership, phi-accrual for failure detection.
  7. Storage: LSM tree (RocksDB) for high write throughput.
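Steps 2 and 3 above, in miniature: a consistent-hash ring with virtual nodes and replica selection. The hash function (MD5) and vnode count are arbitrary illustrative choices:

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Consistent hashing with virtual nodes. Each physical node gets
    `vnodes` positions on the ring so load spreads evenly, and adding or
    removing a node only remaps roughly 1/N of the keys."""

    def __init__(self, nodes, vnodes=100):
        self.ring = []                       # sorted list of (position, node)
        for node in nodes:
            for i in range(vnodes):
                self.ring.append((self._hash(f"{node}#{i}"), node))
        self.ring.sort()

    @staticmethod
    def _hash(s: str) -> int:
        return int(hashlib.md5(s.encode()).hexdigest(), 16)

    def get_nodes(self, key: str, n: int = 3):
        """Walk clockwise from the key's ring position, collecting n distinct
        physical nodes: the key's replica set (replication factor n)."""
        pos = bisect.bisect(self.ring, (self._hash(key),))
        replicas, seen = [], set()
        for i in range(len(self.ring)):
            _, node = self.ring[(pos + i) % len(self.ring)]
            if node not in seen:
                seen.add(node)
                replicas.append(node)
                if len(replicas) == n:
                    break
        return replicas
```

With W + R > N quorums (step 4) layered on top, any read quorum overlaps any write quorum on at least one of these replicas.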
How to answer "design a URL shortener at scale"
  1. Write path: Generate short code (base62 of a counter from a distributed ID generator — Snowflake), store in DB partitioned by short code.
  2. Read path: Dominant traffic. Cache-aside with Redis (TTL 24h). Cache hit rate > 99%.
  3. DB: Cassandra (hash-partitioned by short code, high read throughput) or Postgres with read replicas.
  4. Replication: Multi-region read replicas for geographic latency.
  5. Rate limiting: Token bucket per user_id stored in Redis.
  6. Analytics: Click events → Kafka → Spark/Flink → warehouse. Not in the hot path.
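Two pieces of this design are small enough to sketch: base62 short codes (step 1) and a token bucket (step 5). The bucket here is in-process; the production version would keep its state in Redis so all API nodes share one limit:

```python
import string
import time

# 0-9, a-z, A-Z: 62 characters
ALPHABET = string.digits + string.ascii_lowercase + string.ascii_uppercase

def encode_base62(n: int) -> str:
    """Turn a unique counter / Snowflake ID into a short code."""
    if n == 0:
        return ALPHABET[0]
    out = []
    while n:
        n, rem = divmod(n, 62)
        out.append(ALPHABET[rem])
    return ''.join(reversed(out))

class TokenBucket:
    """In-memory token bucket: `capacity` burst, `rate` tokens/sec refill."""
    def __init__(self, rate: float, capacity: int):
        self.rate, self.capacity = rate, capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

A 7-character base62 code covers 62^7 ≈ 3.5 trillion URLs, which is why the counter approach scales.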
Consistency model cheat sheet for interviews
If the interviewer asks about...                  | Answer
--------------------------------------------------|--------------------------------------------------------------------------
"User posts a comment and immediately reloads"    | Read-your-writes guarantee (sticky to primary or use causal consistency)
"Two users update the same record simultaneously" | Optimistic concurrency (version numbers), or serializable transactions
"Counter increments (likes, views)"               | Eventual consistency + idempotent deduplication (exact count not needed)
"Bank account balance"                            | Serializable (or strict serializable). No shortcuts.
"Shopping cart"                                   | AP (eventual). Users can add items offline; merge on sync. Amazon Dynamo paper.
"Leader election"                                 | Consensus (Raft/etcd). Must be CP.
The senior engineer mindset
Every distributed systems decision is a trade-off. Strong consistency costs latency. High availability risks stale reads. Partitioning enables scale but complicates queries. When answering design questions, the most important skill is not knowing the "right" answer — it is knowing what questions to ask, and being explicit about the trade-offs you are accepting.