0. Setup & Environment

Get a single-node Kafka cluster running locally so you can produce and consume from the terminal before diving into the internals.

Prerequisites

Docker Desktop is the only hard requirement.

# Install Docker Desktop
brew install --cask docker

# Verify
docker --version

Single-Node Kafka with Docker Compose

Save this as docker-compose.yml in a working directory. This uses KRaft mode — no ZooKeeper needed.

services:
  kafka:
    image: apache/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

docker compose up -d

KRaft mode (Kafka Raft, GA since 3.3) stores cluster metadata in Kafka itself. The old ZooKeeper ensemble is gone — one fewer system to run and operate.

CLI Tools

The Kafka scripts ship inside the container. Use docker exec to run them without installing anything locally. (The container name kafka-kafka-1 below assumes your compose project directory is named kafka; Compose names containers <project>-<service>-1.)

# Create a topic (3 partitions, replication factor 1 for single-node)
docker exec -it kafka-kafka-1 \
  /opt/kafka/bin/kafka-topics.sh --create \
  --topic test --partitions 3 --replication-factor 1 \
  --bootstrap-server localhost:9092

# List topics
docker exec -it kafka-kafka-1 \
  /opt/kafka/bin/kafka-topics.sh --list \
  --bootstrap-server localhost:9092
kcat is a lightweight alternative to the bundled scripts and is handy for quick ad-hoc produce/consume (brew install kcat). It connects to any broker without requiring a JVM on your host.

Produce and Consume (smoke test)

# Terminal 1 — produce a message
echo "hello kafka" | docker exec -i kafka-kafka-1 \
  /opt/kafka/bin/kafka-console-producer.sh \
  --topic test --bootstrap-server localhost:9092

# Terminal 2 — consume from the beginning
docker exec -it kafka-kafka-1 \
  /opt/kafka/bin/kafka-console-consumer.sh \
  --topic test --from-beginning --bootstrap-server localhost:9092

You should see hello kafka appear in Terminal 2. The consumer keeps running; send more messages in Terminal 1 and they stream in immediately.

Cleanup

# Stop and remove containers (data volumes persist)
docker compose down

# Stop, remove containers AND wipe all data volumes
docker compose down -v

1. Architecture

Kafka is a distributed, partitioned, replicated commit log. Every message is durably written to disk and replicated across brokers. Consumers read at their own pace — Kafka does not push; consumers pull.

Producers            Kafka Cluster                  Consumers
               ┌──────────────────────────┐
Producer A ──┐ │ Broker 1 (Controller)    │   ┌── Consumer Group A
             │ │ ┌────────────────────┐   │   │   ├── Consumer 1 (P0, P1)
Producer B ──┼▶│ │ Topic: orders      │   │──▶│   └── Consumer 2 (P2)
             │ │ │  Partition 0 (L)   │   │   │
Producer C ──┘ │ │  Partition 1 (L)   │   │   └── Consumer Group B
               │ │  Partition 2 (F)   │   │       └── Consumer 3 (P0,P1,P2)
               │ └────────────────────┘   │
               │ Broker 2                 │
               │ ┌────────────────────┐   │
               │ │  Partition 0 (F)   │   │
               │ │  Partition 2 (L)   │   │
               │ └────────────────────┘   │
               │ Broker 3                 │
               │ ┌────────────────────┐   │
               │ │  Partition 1 (F)   │   │
               │ │  Partition 2 (F)   │   │
               │ └────────────────────┘   │
               └──────────────────────────┘
                                             L = Leader   F = Follower

Brokers

A broker is a single Kafka server process. A cluster typically runs 3+ brokers for fault tolerance. Each broker hosts a subset of the cluster's partitions, serves produce and fetch requests for the partitions it leads, and replicates data for the partitions it follows.

Topics and Partitions

A topic is a named category/feed. Topics are split into partitions — ordered, immutable, append-only logs. Each message in a partition gets a monotonically increasing offset.

Partition = Unit of Parallelism
Kafka's parallelism comes entirely from partitions. More partitions = more parallel producers and consumers. But more partitions also means more overhead (file handles, replication traffic, leader elections).

Replication: Leaders, Followers, and ISR

Each partition has one leader and zero or more followers. All reads and writes go to the leader. Followers replicate from the leader. The In-Sync Replica (ISR) set contains replicas that are caught up with the leader within replica.lag.time.max.ms.

| Concept | Description |
| --- | --- |
| replication.factor | How many copies of each partition exist (typically 3) |
| min.insync.replicas | Minimum ISR count required for a write to succeed (typically 2) |
| ISR | Set of replicas fully caught up with the leader |
| OSR | Out-of-sync replicas — lagging behind; removed from ISR |
| Leader epoch | Monotonic counter incremented on each leader election; prevents stale leader writes |
acks=all + min.insync.replicas
Setting acks=all on the producer only guarantees a write is acknowledged by the ISR. If ISR shrinks below min.insync.replicas, the broker returns NotEnoughReplicas. With RF=3 and min.ISR=2, you can tolerate 1 broker failure.
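That broker-side check can be sketched as a toy model (illustrative Python, not broker code):

```python
def try_write(acks: str, isr_count: int, min_insync_replicas: int) -> str:
    """Toy model of the broker's durability check for a produce request."""
    if acks == "all" and isr_count < min_insync_replicas:
        return "NOT_ENOUGH_REPLICAS"  # producer sees a NotEnoughReplicas error
    return "ACK"

# RF=3, min.insync.replicas=2: one broker down is fine, two is not
assert try_write("all", isr_count=3, min_insync_replicas=2) == "ACK"
assert try_write("all", isr_count=2, min_insync_replicas=2) == "ACK"
assert try_write("all", isr_count=1, min_insync_replicas=2) == "NOT_ENOUGH_REPLICAS"
assert try_write("1",   isr_count=1, min_insync_replicas=2) == "ACK"  # acks=1 skips the check
```

Note that acks=1 still succeeds with a shrunken ISR, which is exactly the window where an acknowledged write can be lost.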

ZooKeeper vs KRaft Mode

Historically, Kafka used ZooKeeper to store cluster metadata, elect the controller, and (before 0.9) track consumer offsets. Since Kafka 3.3 (KIP-833), KRaft mode (Kafka Raft) is production-ready and ZooKeeper is deprecated.

| Aspect | ZooKeeper Mode | KRaft Mode |
| --- | --- | --- |
| Metadata storage | ZooKeeper ensemble | Internal __cluster_metadata topic |
| Controller | Single active controller elected via ZK | Raft-based quorum of controllers |
| Scalability | ~200k partitions practical limit | Millions of partitions |
| Operational complexity | Two systems to manage | Single system |
| Kafka version | All versions | 2.8+ (preview), 3.3+ (production) |

Log Segments

Each partition is stored as a sequence of log segments on disk. A segment consists of a log file (.log) holding the records, an offset index (.index) mapping offsets to byte positions, and a time index (.timeindex) mapping timestamps to offsets.

New messages are appended to the active segment. When the segment reaches log.segment.bytes (default 1GB) or log.roll.ms, it is rolled and a new active segment starts. Old segments are eligible for deletion or compaction based on retention policy.
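The size-based rolling rule can be sketched as a toy model (illustrative Python; real brokers also roll on log.roll.ms and index size):

```python
def assign_segments(record_sizes, segment_bytes):
    """Toy model: append records in order, rolling to a new segment when
    the active one would exceed segment_bytes. Returns a list of segments,
    each a list of record sizes; the last entry is the active segment."""
    segments, active, active_size = [], [], 0
    for size in record_sizes:
        if active and active_size + size > segment_bytes:
            segments.append(active)  # roll: seal the active segment
            active, active_size = [], 0
        active.append(size)
        active_size += size
    segments.append(active)
    return segments

segs = assign_segments([400, 400, 400, 400], segment_bytes=1000)
assert segs == [[400, 400], [400, 400]]  # rolled after the second record
```

Only sealed (non-active) segments are candidates for deletion or compaction, which is why a huge log.segment.bytes can delay retention kicking in.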

Consumer Groups

Topic "payments" — 4 partitions

   P0    P1    P2    P3
   │     │     │     │
   ▼     ▼     ▼     ▼
┌─────────────────────┐
│ Consumer Group A    │  ← 4 consumers: 1 partition each (ideal)
│  C1(P0)  C2(P1)     │
│  C3(P2)  C4(P3)     │
└─────────────────────┘
┌─────────────────────┐
│ Consumer Group B    │  ← 2 consumers: 2 partitions each
│  C1(P0,P1)          │
│  C2(P2,P3)          │
└─────────────────────┘
┌─────────────────────┐
│ Consumer Group C    │  ← 5 consumers: 1 idle (consumers > partitions)
│  C1(P0)  C2(P1)     │
│  C3(P2)  C4(P3) C5()│
└─────────────────────┘

Each consumer group tracks its own offsets independently. Multiple groups can read the same topic simultaneously without interference. A partition is assigned to exactly one consumer within a group at any time.
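The one-owner-per-partition rule can be sketched with a toy round-robin assignor (illustrative Python; real assignors are pluggable and more sophisticated):

```python
def assign(partitions, consumers):
    """Toy round-robin assignment: each partition goes to exactly one
    consumer in the group; extra consumers sit idle."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# Group C from the diagram above: 5 consumers, 4 partitions -> C5 idle
a = assign(["P0", "P1", "P2", "P3"], ["C1", "C2", "C3", "C4", "C5"])
assert a == {"C1": ["P0"], "C2": ["P1"], "C3": ["P2"], "C4": ["P3"], "C5": []}
# Every partition has exactly one owner within the group
assert sum(len(ps) for ps in a.values()) == 4
```

This is why running more consumers than partitions buys nothing: the extras idle until a rebalance gives them work.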

2. Producers

The producer is responsible for choosing which partition to send a record to, batching records for efficiency, and handling retries and acknowledgement guarantees.

Java Producer API

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;

public class OrderProducer {

    private final KafkaProducer<String, String> producer;
    private static final String TOPIC = "orders";

    public OrderProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Reliability: wait for all ISR to acknowledge
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        // Idempotence: exactly-once per partition (requires acks=all)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Batching: wait up to 10ms to fill a batch
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // Max batch size in bytes (16KB default, 32KB here)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);

        // Compression: lz4 is fast; gzip gives best ratio
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        // Retries (idempotent producer sets this to Integer.MAX_VALUE by default)
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);

        // In-flight requests: must be 1 for non-idempotent ordered delivery;
        // idempotent producer allows up to 5
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        // Block if internal queue is full (instead of throwing exception)
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60_000);

        this.producer = new KafkaProducer<>(props);
    }

    // Asynchronous send with delivery callback
    public void sendAsync(String orderId, String payload) {
        ProducerRecord<String, String> record =
            new ProducerRecord<>(TOPIC, orderId, payload); // key=orderId routes to same partition

        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // Log, alert, send to DLQ
                System.err.println("Send failed for order " + orderId + ": " + exception.getMessage());
            } else {
                System.out.printf("Sent to %s-P%d @ offset %d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
    }

    // Synchronous send — blocks until acknowledged or throws
    public RecordMetadata sendSync(String orderId, String payload) throws Exception {
        ProducerRecord<String, String> record =
            new ProducerRecord<>(TOPIC, orderId, payload);
        Future<RecordMetadata> future = producer.send(record);
        return future.get(); // blocks until the broker acks; use sparingly, it defeats batching
    }

    // Send to specific partition (bypass partitioner)
    public void sendToPartition(int partition, String key, String value) {
        ProducerRecord<String, String> record =
            new ProducerRecord<>(TOPIC, partition, key, value);
        producer.send(record, (meta, ex) -> { /* handle */ });
    }

    public void flush() { producer.flush(); }

    public void close() { producer.close(); }
}

Acknowledgement Modes

| acks value | Behavior | Durability | Latency |
| --- | --- | --- | --- |
| 0 | No ack — fire and forget | Lowest — loss on any failure | Lowest |
| 1 | Leader acks after writing to local log | Loss if leader fails before replication | Medium |
| all / -1 | Leader acks after all ISR replicate | Highest — survives broker failures up to RF-1 | Highest |

Partitioning Strategies

// Custom partitioner example
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;

public class RegionPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (key == null) return 0;
        String region = key.toString().split(":")[0]; // "us-east:order-123"
        return switch (region) {
            case "us-east" -> 0 % numPartitions;
            case "us-west" -> 1 % numPartitions;
            case "eu"      -> 2 % numPartitions;
            default        -> Math.abs(key.hashCode()) % numPartitions;
        };
    }
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}

// Register in producer config:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RegionPartitioner.class.getName());

Idempotent and Transactional Producers

With enable.idempotence=true, the broker assigns each producer a Producer ID (PID) and tracks a per-partition sequence number. Duplicate retries (same PID + sequence) are silently discarded. This gives exactly-once within a single partition and session.
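The broker-side dedup can be sketched as a toy model (a hypothetical ToyPartitionLog class, not the real broker implementation; the real broker also rejects gaps in the sequence with OutOfOrderSequence):

```python
class ToyPartitionLog:
    """Toy model of idempotent-producer dedup on one partition: the broker
    tracks the next expected sequence per producer ID and silently drops
    retries that carry an already-appended sequence."""
    def __init__(self):
        self.records = []
        self.next_seq = {}  # producer_id -> next expected sequence number

    def append(self, producer_id, seq, value):
        expected = self.next_seq.get(producer_id, 0)
        if seq < expected:
            return "DUPLICATE"  # retry of an already-acked write: discard
        self.records.append(value)
        self.next_seq[producer_id] = seq + 1
        return "ACK"

log = ToyPartitionLog()
assert log.append("pid-1", 0, "a") == "ACK"
assert log.append("pid-1", 1, "b") == "ACK"
assert log.append("pid-1", 1, "b") == "DUPLICATE"  # network retry deduplicated
assert log.records == ["a", "b"]  # no duplicate landed in the log
```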

// Transactional producer — exactly-once across multiple partitions/topics
Properties props = new Properties();
// ... base config ...
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-txn-1");
// transactional.id must be unique per producer instance

KafkaProducer<String, String> txnProducer = new KafkaProducer<>(props);
txnProducer.initTransactions(); // blocks until coordinator assigns epoch

try {
    txnProducer.beginTransaction();

    // Send to multiple topics atomically
    txnProducer.send(new ProducerRecord<>("orders", orderId, orderJson));
    txnProducer.send(new ProducerRecord<>("order-audit", orderId, auditJson));

    // Optionally commit consumer offsets as part of the transaction
    // (enables consume-transform-produce exactly-once)
    txnProducer.sendOffsetsToTransaction(offsetsMap, consumerGroupMetadata);

    txnProducer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
    // Fatal — cannot recover, close producer
    txnProducer.close();
} catch (KafkaException e) {
    txnProducer.abortTransaction();
    // Retry or propagate
}

Python Producer (confluent-kafka)

from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer

def delivery_callback(err, msg):
    if err:
        print(f"Delivery failed for {msg.key()}: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

producer = Producer({
    "bootstrap.servers": "broker1:9092,broker2:9092",
    "acks": "all",
    "enable.idempotence": True,
    "linger.ms": 10,
    "batch.size": 32768,
    "compression.type": "lz4",
    "retries": 2147483647,
    "max.in.flight.requests.per.connection": 5,
})

# Produce a message
producer.produce(
    topic="orders",
    key="order-123",          # bytes or str
    value='{"amount": 99.99}', # bytes or str
    on_delivery=delivery_callback,
)

# Must call poll() or flush() to trigger callbacks
producer.poll(0)       # non-blocking trigger
producer.flush()       # block until all messages delivered

3. Consumers

Java Consumer API — Poll Loop

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;

public class OrderConsumer {

    private final KafkaConsumer<String, String> consumer;
    private volatile boolean running = true;

    public OrderConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");

        // What to do when no committed offset exists for this group+partition
        // "earliest" = from beginning, "latest" = from now (default)
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Disable auto-commit for manual control
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Max records returned per poll() call
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

        // Must call poll() within this window or consumer is considered dead
        // and triggers rebalance. Must be > your processing time per batch.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000); // 5 minutes

        // Heartbeat thread sends heartbeats independently of poll()
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3_000);
        // If no heartbeat within session.timeout.ms, consumer is removed from group
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45_000);

        // Partition assignment strategy (see below)
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

        this.consumer = new KafkaConsumer<>(props);
    }

    public void run() {
        consumer.subscribe(Collections.singletonList("orders"),
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Called before rebalance: commit offsets for revoked partitions
                    commitSync(partitions);
                }
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Called after rebalance: can seek to specific offsets here
                    System.out.println("Assigned: " + partitions);
                }
            });

        try {
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);

                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        processRecord(record);
                    }

                    // Manual commit per partition after processing
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    // Commit offset+1 (next offset to fetch)
                    consumer.commitSync(Collections.singletonMap(
                        partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            consumer.close(); // leaves the group cleanly, triggering a rebalance
        }
    }

    private void commitSync(Collection<TopicPartition> partitions) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
        }
        consumer.commitSync(offsets);
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s%n",
            record.topic(), record.partition(), record.offset(), record.key());
        // Your processing logic here
    }

    public void shutdown() { running = false; }
}

Partition Assignment Strategies

| Strategy | Description | Rebalance Type |
| --- | --- | --- |
| RangeAssignor | Assigns contiguous partition ranges per topic. Can be uneven with multiple topics. | Eager (stop-the-world) |
| RoundRobinAssignor | Distributes partitions round-robin across consumers. Balanced but ignores current assignment. | Eager (stop-the-world) |
| StickyAssignor | Like round-robin but tries to keep existing assignments. Minimizes partition movement. | Eager (stop-the-world) |
| CooperativeStickyAssignor | Incremental rebalancing: only revokes partitions that need to move. Recommended. | Cooperative (incremental) |
Use CooperativeStickyAssignor
Eager rebalance protocols revoke ALL partitions from ALL consumers, causing a brief "stop the world" pause. Cooperative rebalancing (Kafka 2.4+) only revokes partitions that actually need to be moved, dramatically reducing rebalance impact for large consumer groups.
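The difference can be sketched with a toy before/after assignment (plain Python; the partition moves shown are illustrative, not an assignor's actual output):

```python
def moved(before, after):
    """Partitions whose owner changed between two assignments."""
    return {p for p in before if before[p] != after[p]}

# 4 partitions owned by C1/C2; consumer C3 joins the group.
before = {"P0": "C1", "P1": "C1", "P2": "C2", "P3": "C2"}
after  = {"P0": "C1", "P1": "C3", "P2": "C2", "P3": "C2"}  # sticky: move only P1

# Cooperative: only the moving partition pauses; the rest keep consuming.
assert moved(before, after) == {"P1"}

# Eager: every partition is revoked first, even ones that end up staying put.
revoked_eager = set(before)
assert revoked_eager == {"P0", "P1", "P2", "P3"}
```

With hundreds of partitions, revoking one versus revoking all is the difference between a blip and a visible consumption stall.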

Offset Management

Offsets are committed to the internal __consumer_offsets topic. The options are auto-commit (enable.auto.commit=true, committed on a timer during poll()), manual commitSync (blocking, retried on transient failure), and manual commitAsync (non-blocking, not retried):

// Async commit with callback for logging
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.warn("Async commit failed for offsets {}: {}", offsets, exception.getMessage());
        // Do not retry here: a newer commit may already have superseded these offsets
    }
});

// Seeking to a specific offset (e.g., replay from beginning)
consumer.seek(new TopicPartition("orders", 0), 0L);

// Seek to beginning of all assigned partitions
consumer.seekToBeginning(consumer.assignment());

// Seek to end (skip old messages)
consumer.seekToEnd(consumer.assignment());

Python Consumer (confluent-kafka)

from confluent_kafka import Consumer, KafkaError, KafkaException
import signal

consumer = Consumer({
    "bootstrap.servers": "broker1:9092",
    "group.id": "order-processing-group",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "enable.auto.offset.store": False,  # required to use store_offsets() below
    "max.poll.interval.ms": 300000,
    "session.timeout.ms": 45000,
    "heartbeat.interval.ms": 3000,
})

running = True
def shutdown(signum, frame):
    global running
    running = False
signal.signal(signal.SIGINT, shutdown)

consumer.subscribe(["orders"])

try:
    while running:
        msg = consumer.poll(timeout=1.0)  # returns one message or None
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError.PARTITION_EOF:
                print(f"Reached end of partition {msg.partition()}")
            else:
                raise KafkaException(msg.error())
            continue

        # Process message
        print(f"Received: key={msg.key()} value={msg.value()} "
              f"partition={msg.partition()} offset={msg.offset()}")

        # Stage this message's offset locally; commit() later sends all
        # staged offsets (requires enable.auto.offset.store=False above)
        consumer.store_offsets(msg)

    consumer.commit()  # final commit on shutdown
finally:
    consumer.close()

4. Message Delivery Semantics

| Semantic | Producer Config | Consumer Config | Risk |
| --- | --- | --- | --- |
| At-most-once | acks=0 or acks=1, no retries | Auto-commit before processing | Message loss on failure |
| At-least-once | acks=all, retries enabled | Commit after processing | Duplicate processing on retry/crash |
| Exactly-once | enable.idempotence=true + transactional.id | isolation.level=read_committed, offsets in transaction | Higher latency, more complex |

Exactly-Once Semantics (EOS)

True EOS requires coordination at both the producer and consumer sides:

EOS Components
  • Idempotent producer: Deduplicates retries within a session and partition using PID + sequence number
  • Transactional producer: Writes to multiple partitions/topics atomically — all visible or none
  • Transactional consumer: isolation.level=read_committed — only reads messages from committed transactions
  • Atomic offset commits: sendOffsetsToTransaction() commits consumer offsets as part of the produce transaction

Isolation Levels

| Isolation Level | Behavior | Use Case |
| --- | --- | --- |
| read_uncommitted (default) | Reads all messages, including those from open or aborted transactions | High throughput; don't care about transactional boundaries |
| read_committed | Only reads messages from committed transactions; waits for open transactions | EOS consumers; financial/audit pipelines |

// Consume-transform-produce exactly-once pattern
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
// isolation.level=read_committed set in consumerProps

KafkaProducer<String, String> txnProducer = new KafkaProducer<>(producerProps);
// transactional.id and enable.idempotence set in producerProps
txnProducer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;

    txnProducer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> record : records) {
            String transformed = transform(record.value());
            txnProducer.send(new ProducerRecord<>("output-topic", record.key(), transformed));
        }
        // Commit consumer offsets INSIDE the transaction
        txnProducer.sendOffsetsToTransaction(
            currentOffsets(records),          // Map<TopicPartition, OffsetAndMetadata>
            consumer.groupMetadata()           // ConsumerGroupMetadata (Kafka 2.5+)
        );
        txnProducer.commitTransaction();
    } catch (Exception e) {
        txnProducer.abortTransaction();
    }
}

5. Topics & Partitions

Creating Topics

# Create topic with 6 partitions, replication factor 3
kafka-topics.sh --bootstrap-server broker1:9092 \
  --create \
  --topic orders \
  --partitions 6 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config retention.ms=604800000   # 7 days

# Describe topic (shows partition-leader-replica mapping)
kafka-topics.sh --bootstrap-server broker1:9092 --describe --topic orders

# Increase partition count (can only increase, never decrease)
kafka-topics.sh --bootstrap-server broker1:9092 \
  --alter --topic orders --partitions 12

# Delete topic (requires delete.topic.enable=true on broker)
kafka-topics.sh --bootstrap-server broker1:9092 --delete --topic orders

Partition Count Guidelines

You Cannot Decrease Partitions
Kafka does not support reducing partition count on a topic. If you over-partition, you're stuck until you recreate the topic. Plan conservatively and increase later.
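A related caveat: for keyed topics, increasing the partition count changes which partition a key hashes to, so per-key ordering is not preserved across the resize. A toy sketch (CRC32 as a stand-in for Kafka's actual murmur2 partitioner):

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    """Toy keyed partitioner: hash the key, modulo partition count.
    (CRC32 here; Kafka's default partitioner uses murmur2.)"""
    return zlib.crc32(key.encode()) % num_partitions

keys = [f"order-{i}" for i in range(100)]
before = [partition_for(k, 6) for k in keys]
after  = [partition_for(k, 12) for k in keys]

# After growing 6 -> 12 partitions, some keys land on a different partition,
# so records for those keys no longer share a log with their earlier records.
assert any(b != a for b, a in zip(before, after))
```

If strict per-key ordering matters, plan partition counts up front or migrate through a new topic rather than altering the existing one.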

Retention Policies

| Policy | Config | Description |
| --- | --- | --- |
| Time-based deletion | retention.ms (default 7 days) | Delete segments older than the retention period |
| Size-based deletion | retention.bytes (default -1, unlimited) | Delete oldest segments when total size exceeds limit |
| Log compaction | cleanup.policy=compact | Retain only the latest value per key; null-value tombstones |
| Compaction + deletion | cleanup.policy=compact,delete | Compact and also apply time/size-based deletion |

Log Compaction

Log compaction ensures that Kafka retains at least the last known value for each message key. It is ideal for changelog or event-sourced use cases (e.g., database changelogs, Kafka Streams state store changelogs).

# Configure a topic for log compaction
kafka-configs.sh --bootstrap-server broker1:9092 \
  --entity-type topics --entity-name user-profiles \
  --alter \
  --add-config cleanup.policy=compact,min.cleanable.dirty.ratio=0.5,delete.retention.ms=86400000

# Send a tombstone (a record with a null value) to delete key "user-123"
# from the compacted log. With kcat, -K sets the key delimiter and -Z
# turns the empty value into NULL:
echo "user-123:" | kcat -b localhost:9092 -t user-profiles -K: -Z -P
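Compaction semantics in a toy model (plain Python, not the log cleaner; real compaction also keeps tombstones around for delete.retention.ms so lagging consumers can observe the delete):

```python
def compact(records):
    """Toy log compaction: keep only the latest value per key and drop
    tombstones (value=None)."""
    latest = {}
    for key, value in records:  # records in offset order
        latest[key] = value     # later offsets win
    return {k: v for k, v in latest.items() if v is not None}

log = [
    ("user-123", '{"name":"Ada"}'),
    ("user-456", '{"name":"Linus"}'),
    ("user-123", '{"name":"Ada L."}'),  # upsert: supersedes the first record
    ("user-456", None),                 # tombstone: delete user-456
]
assert compact(log) == {"user-123": '{"name":"Ada L."}'}
```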

Unclean Leader Election

When all ISR replicas are unavailable, Kafka must decide: wait for an ISR member to come back, or elect an out-of-sync replica immediately. This is controlled by unclean.leader.election.enable.

| Setting | Behavior | Trade-off |
| --- | --- | --- |
| false (default) | Wait for an ISR replica — may be unavailable | No data loss; partition unavailable until ISR returns |
| true | Elect any replica immediately | Partition available immediately; may lose messages not yet replicated |

6. Kafka Streams

Kafka Streams is a client library (not a separate cluster) for building stateful stream processing applications. It runs inside your JVM process alongside your application code.

KStream vs KTable

| Abstraction | Semantics | Example |
| --- | --- | --- |
| KStream | Unbounded stream of records; each record is an independent event | Click events, page views, transactions |
| KTable | Changelog stream; each record is an upsert — the latest value per key wins | User profiles, account balances, inventory |
| GlobalKTable | Fully replicated KTable — the entire table is available on every instance. No co-partitioning required for joins. | Reference data, country codes, small lookup tables |
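The stream/table distinction is easy to see in a toy model (plain Python, not the Streams API):

```python
# The same records, read two ways.
events = [("alice", 10), ("bob", 5), ("alice", 7)]

# KStream semantics: every record is an independent event; nothing collapses.
kstream = list(events)
assert len(kstream) == 3

# KTable semantics: each record is an upsert; the latest value per key wins.
ktable = {}
for key, value in events:
    ktable[key] = value
assert ktable == {"alice": 7, "bob": 5}
```

The same topic can back either view; what differs is whether downstream logic treats each record as a fact (stream) or as the current state of its key (table).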

Stream Topology

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;

public class OrderProcessingTopology {

    public static KafkaStreams build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Exactly-once semantics within Streams
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // State store location
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

        StreamsBuilder builder = new StreamsBuilder();

        // Source: read from "orders" topic
        KStream<String, String> ordersStream = builder.stream("orders");

        // ---- Stateless transformations ----

        // Filter: only process non-null values
        KStream<String, String> validOrders = ordersStream
            .filter((key, value) -> value != null && !value.isEmpty());

        // Map: transform key and value
        KStream<String, String> enriched = validOrders
            .mapValues(value -> enrich(value)); // transform value only

        // FlatMap: one record -> multiple records
        KStream<String, String> items = ordersStream
            .flatMapValues(order -> parseItems(order)); // each order -> list of items

        // Branch (split into multiple streams by predicate)
        Map<String, KStream<String, String>> branches = validOrders.split(Named.as("branch-"))
            .branch((k, v) -> v.contains("\"priority\":\"high\""), Branched.as("high"))
            .branch((k, v) -> v.contains("\"priority\":\"low\""),  Branched.as("low"))
            .defaultBranch(Branched.as("normal"));

        KStream<String, String> highPriority = branches.get("branch-high");
        KStream<String, String> lowPriority  = branches.get("branch-low");

        // Sink: write to output topic
        enriched.to("enriched-orders");
        highPriority.to("high-priority-orders");

        return new KafkaStreams(builder.build(), props);
    }

    private static String enrich(String value) { return value; /* placeholder */ }
    private static java.util.List<String> parseItems(String order) { return java.util.List.of(); }
}

Stateful Operations

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> events = builder.stream("order-events");

// GroupByKey — prerequisite for aggregations
KGroupedStream<String, String> grouped = events.groupByKey();

// Count per key, stored in a state store backed by RocksDB
KTable<String, Long> orderCounts = grouped
    .count(Materialized.as("order-count-store"));

// Aggregate: sum order amounts per customer
KTable<String, Double> totalSpend = grouped
    .aggregate(
        () -> 0.0,                                    // initializer
        (key, value, aggregate) -> aggregate + parseAmount(value), // aggregator
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("spend-store")
            .withValueSerde(Serdes.Double())
    );

// Reduce: keep running max order amount per customer
KTable<String, String> maxOrder = grouped
    .reduce((agg, newVal) -> largerAmount(agg, newVal));

// Expose state store for interactive queries (local queries only)
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

ReadOnlyKeyValueStore<String, Long> store =
    streams.store(StoreQueryParameters.fromNameAndType(
        "order-count-store", QueryableStoreTypes.keyValueStore()));

Long count = store.get("customer-123");

Windowing

| Window Type | Description | Example |
| --- | --- | --- |
| Tumbling | Fixed-size, non-overlapping windows | Revenue per 1-hour window |
| Hopping | Fixed-size, overlapping windows (advance < size) | 5-min revenue every 1 min |
| Sliding | Windows defined by the time difference between records | Records within 30s of each other |
| Session | Activity-based; a gap of inactivity closes the window | User session grouping with 30-min idle gap |

// Tumbling window: order count per 1-hour window
KTable<Windowed<String>, Long> hourlyOrderCounts = events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .count(Materialized.as("hourly-order-counts"));

// Hopping window: 5-minute windows advancing every 1 minute
KTable<Windowed<String>, Long> rollingCounts = events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
        .advanceBy(Duration.ofMinutes(1)))
    .count();

// Session window: group events within 30-minute inactivity gap
KTable<Windowed<String>, Long> sessionCounts = events
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count(Materialized.as("session-counts"));

// Access windowed results
hourlyOrderCounts.toStream()
    .map((windowedKey, count) -> {
        String key = windowedKey.key();
        long windowStart = windowedKey.window().start();
        long windowEnd   = windowedKey.window().end();
        return new KeyValue<>(key, String.format("%d orders [%d-%d]", count, windowStart, windowEnd));
    })
    .to("hourly-summary");
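Window boundaries are not relative to each record: Kafka Streams aligns tumbling and hopping TimeWindows to the epoch. A standalone sketch of the alignment arithmetic (helper names are mine, not a Kafka Streams API):

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of epoch-aligned window math (illustrative helpers,
// not part of the Kafka Streams API).
public class WindowMath {

    // Tumbling: a record's window start is its timestamp rounded down
    // to a multiple of the window size.
    public static long tumblingWindowStart(long tsMs, long sizeMs) {
        return tsMs - (tsMs % sizeMs);
    }

    // Hopping: window [start, start + size) contains a record at ts when
    // ts - size < start <= ts, with starts at multiples of the advance.
    public static List<Long> hoppingWindowStarts(long tsMs, long sizeMs, long advanceMs) {
        long first = ((tsMs - sizeMs) / advanceMs + 1) * advanceMs;
        if (first < 0) first = 0;
        List<Long> starts = new ArrayList<>();
        for (long s = first; s <= tsMs; s += advanceMs) starts.add(s);
        return starts;
    }

    public static void main(String[] args) {
        // 65 min after the epoch falls in the [60 min, 120 min) tumbling hour
        System.out.println(tumblingWindowStart(3_900_000L, 3_600_000L)); // 3600000
        // A record at 5.5 min lands in five 5-min windows advancing by 1 min
        System.out.println(hoppingWindowStarts(330_000L, 300_000L, 60_000L));
    }
}
```

This is why a 5-minute hopping window with a 1-minute advance counts each record five times: the record falls into every overlapping window that contains its timestamp.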

Stream Joins

Join Type            | Left    | Right        | Requirement                                  | Output
KStream-KStream      | KStream | KStream      | Co-partitioned, same key, within join window | KStream
KStream-KTable       | KStream | KTable       | Co-partitioned, same key                     | KStream
KStream-GlobalKTable | KStream | GlobalKTable | No co-partitioning required                  | KStream
KTable-KTable        | KTable  | KTable       | Co-partitioned                               | KTable
// KStream-KTable join: enrich order events with customer data
KStream<String, String> orders = builder.stream("orders");
KTable<String, String> customers = builder.table("customers");

KStream<String, String> enrichedOrders = orders.join(
    customers,
    (orderJson, customerJson) -> merge(orderJson, customerJson), // value joiner
    Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
);

// KStream-KStream join: match orders with payments within 5 minutes
KStream<String, String> payments = builder.stream("payments");

KStream<String, String> matched = orders.join(
    payments,
    (order, payment) -> combine(order, payment),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
);

// GlobalKTable join (no co-partitioning needed)
GlobalKTable<String, String> regions = builder.globalTable("regions");
KStream<String, String> located = orders.join(
    regions,
    (orderKey, orderValue) -> extractRegionKey(orderValue), // key extractor for lookup
    (orderValue, regionValue) -> addRegion(orderValue, regionValue)
);

7. Kafka Connect

Kafka Connect is a framework for reliably streaming data between Kafka and other systems without writing producer/consumer code. Connectors are reusable, configurable plugins.

Architecture & Key Abstractions

The Connect Mental Model
A Connector is a job definition — it knows how to talk to an external system. A Task is a unit of work — each task handles a slice of the data. The Connect Worker is the JVM process that runs tasks. In distributed mode, workers form a cluster and automatically rebalance tasks across themselves.

Class Hierarchy

// The two top-level contracts:
//
//   SourceConnector → creates SourceTasks → produce records INTO Kafka
//   SinkConnector   → creates SinkTasks   → consume records FROM Kafka
//
// Key interfaces:
//
// Connector (abstract)
//   ├── version()                    → plugin version string
//   ├── start(Map props)             → receive config, validate, prepare
//   ├── taskClass()                  → return the Task class this connector uses
//   ├── taskConfigs(int maxTasks)    → split work into task-level configs
//   ├── stop()                       → cleanup
//   └── config()                     → define accepted configuration properties
//
// Task (abstract)
//   ├── version()
//   ├── start(Map props)             → receive task-level config from Connector
//   └── stop()
//
// SourceTask extends Task
//   └── poll()                       → return List<SourceRecord> (called in a loop)
//                                      each SourceRecord = (topic, key, value, partition, offset)
//                                      Connect handles offset commit automatically
//
// SinkTask extends Task
//   ├── put(Collection<SinkRecord>)  → receive batch of records from Kafka
//   ├── flush(Map offsets)           → commit external state, then Connect commits Kafka offsets
//   ├── open(Collection partitions)  → called on partition assignment
//   └── close(Collection partitions) → called on partition revocation

How Work Gets Split: Connector → Tasks

The taskConfigs() Contract

The Connector's taskConfigs(int maxTasks) method is where you decide how to parallelize. You return a List<Map<String,String>> — one config map per task. Each task gets exactly one of these maps and works independently.

Example patterns:

  • JDBC Source: Each task handles a subset of tables
  • File Source: Each task handles a subset of files/directories
  • Spreadsheet Source: Master config lists N spreadsheets → split across min(N, maxTasks) tasks, each task handles its assigned spreadsheets
  • S3 Sink: Each task handles a subset of Kafka partitions (handled automatically by Connect for sink connectors)
// Example: how a source connector splits work across tasks
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    // Say we have 10 spreadsheets to monitor and maxTasks = 4
    // Split spreadsheets across tasks: [3, 3, 2, 2]
    List<List<String>> groups = partitionList(allSpreadsheetIds, maxTasks);

    List<Map<String, String>> configs = new ArrayList<>();
    for (List<String> group : groups) {
        Map<String, String> taskConfig = new HashMap<>(this.baseConfig);
        taskConfig.put("assigned.spreadsheets", String.join(",", group));
        configs.add(taskConfig);
    }
    return configs;  // Connect creates one task per config map
}

// For SINK connectors, you generally don't need to split work yourself —
// Connect automatically assigns Kafka partitions across tasks based on tasks.max
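The partitionList helper called in the example above is not a Connect API — it's a plain utility you'd write yourself. A minimal round-robin sketch:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a hypothetical partitionList(items, maxGroups) helper:
// round-robin items into at most maxGroups non-empty groups.
public class TaskPartitioner {

    public static List<List<String>> partitionList(List<String> items, int maxGroups) {
        int numGroups = Math.min(maxGroups, items.size());
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < numGroups; i++) groups.add(new ArrayList<>());
        for (int i = 0; i < items.size(); i++) {
            groups.get(i % numGroups).add(items.get(i));  // round-robin assignment
        }
        return groups;
    }

    public static void main(String[] args) {
        // 10 spreadsheets, maxTasks = 4 → group sizes 3, 3, 2, 2
        List<String> sheets = new ArrayList<>();
        for (int i = 0; i < 10; i++) sheets.add("sheet-" + i);
        for (List<String> g : partitionList(sheets, 4)) System.out.println(g.size());
    }
}
```

Capping at items.size() matters: Connect creates one task per config map, and a task with an empty assignment would just idle.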

Offset Management

// SOURCE connectors: Connect tracks offsets automatically via SourceRecord
// You provide a "source partition" (which external entity) and "source offset" (position)
// Connect stores these in an internal topic (connect-offsets by default,
// configurable via offset.storage.topic on the worker)

SourceRecord record = new SourceRecord(
    // Source partition — identifies WHICH external entity
    Collections.singletonMap("spreadsheet", "sheet-abc-123"),
    // Source offset — your cursor/position within that entity
    Collections.singletonMap("row", lastProcessedRow),
    // Kafka topic, key schema, key, value schema, value
    "spreadsheet-events", null, sheetId, valueSchema, rowData
);
// On restart, Connect passes the last committed offset back to your task via
// context.offsetStorageReader().offset(partition) — so you resume where you left off

// SINK connectors: Connect manages Kafka consumer offsets for you
// Your put() receives records, your flush() signals external commit is done,
// then Connect commits the Kafka offset. If flush() throws, offsets don't advance.

Standalone vs Distributed Mode

Mode        | Use Case                                               | Scalability             | Fault Tolerance
Standalone  | Development, single-node pipelines                     | Single process          | None
Distributed | Production — connectors distributed across worker pool | Add workers dynamically | Workers auto-rebalance tasks on failure
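Distributed mode is configured with a worker properties file — every worker started with the same group.id joins the same Connect cluster. A minimal sketch (hosts, topic names, and paths are illustrative):

```properties
# connect-distributed.properties (illustrative values)
bootstrap.servers=broker1:9092,broker2:9092

# Workers sharing this group.id form one Connect cluster and rebalance tasks
group.id=connect-cluster-a

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Internal topics: connector configs, source offsets, and task status
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

# Where connector plugin JARs are discovered
plugin.path=/opt/connect/plugins

# REST API for creating and managing connectors
listeners=http://0.0.0.0:8083
```

Connectors are then created by POSTing JSON configs (like the examples below) to the worker REST API on port 8083; any worker in the cluster accepts the request.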

Connector Configuration

// Source connector: JDBC source — stream a Postgres table into Kafka
{
  "name": "orders-jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "kafka_user",
    "connection.password": "${file:/opt/kafka/secrets.properties:db.password}",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "table.whitelist": "orders",
    "topic.prefix": "db-",
    "poll.interval.ms": "5000",
    "batch.max.rows": "1000",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
// Sink connector: write Kafka topic to S3 (Confluent S3 Sink)
{
  "name": "orders-s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "4",
    "topics": "orders",
    "s3.region": "us-east-1",
    "s3.bucket.name": "my-kafka-data",
    "s3.part.size": "67108864",
    "flush.size": "10000",
    "rotate.interval.ms": "600000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "locale": "en_US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "created_at",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Debezium CDC Connector

Debezium captures row-level changes from database transaction logs (CDC — Change Data Capture). It reads the binlog/WAL directly, producing one Kafka message per database change.

{
  "name": "postgres-cdc-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.dbname": "mydb",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "debezium_pub",
    "topic.prefix": "cdc",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
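With the config above, each row change becomes one record on a topic named <topic.prefix>.<schema>.<table> (e.g. cdc.public.orders), wrapped in Debezium's change envelope. A trimmed insert event, with illustrative field values:

```json
{
  "before": null,
  "after": { "id": 1001, "customer_id": 42, "total": 19.99 },
  "source": {
    "connector": "postgresql",
    "db": "mydb",
    "schema": "public",
    "table": "orders",
    "lsn": 24023128
  },
  "op": "c",
  "ts_ms": 1700000000000
}
```

op is c (create), u (update), d (delete), or r (snapshot read). Updates carry both before and after images (subject to the table's REPLICA IDENTITY setting); deletes carry before plus a separate null-value tombstone record.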

Debezium Production Caveats (Postgres)

The Initial Snapshot Bottleneck
When Debezium first starts (or when you add a new table), it performs an initial snapshot — a full SELECT * of each table. For a 500 GB table this can take hours and flood the Kafka cluster with data. The default Kafka Connect producer settings are tuned for low-latency streaming, not bulk throughput — you must tune them for the snapshot phase.
// Production Debezium config with snapshot tuning
{
  "name": "postgres-cdc-prod",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres-primary.internal",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${file:/opt/secrets/debezium.properties:db.password}",
    "database.dbname": "production",
    "topic.prefix": "cdc",
    "table.include.list": "public.orders,public.customers,public.events",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_prod",
    "publication.name": "debezium_pub",
    "publication.autocreate.mode": "filtered",

    // ── Snapshot tuning ─────────────────────────────────────────────
    "snapshot.mode": "initial",
    // "initial" = snapshot on first run, then stream WAL
    // "never"   = skip snapshot, stream WAL only (requires slot already at correct position)
    // "when_needed" = snapshot if no offset exists or if slot is lost

    // Snapshot fetch size: how many rows per SELECT batch
    "snapshot.fetch.size": 10000,
    // Default is 2048 — increase for large tables to reduce round trips

    // ── Kafka producer overrides (critical for snapshot throughput) ──
    // These override the Connect worker's default producer settings
    // for THIS connector only
    "producer.override.batch.size": "524288",
    // Default 16KB → 512KB: larger batches = fewer network round trips
    "producer.override.linger.ms": "100",
    // Default 0 → 100ms: wait to fill batches instead of sending immediately
    "producer.override.buffer.memory": "134217728",
    // Default 32MB → 128MB: more buffer for bursty snapshot data
    "producer.override.compression.type": "lz4",
    // Compress snapshot data (often 3-5x compression on row data)
    "producer.override.max.request.size": "10485760",
    // Default 1MB → 10MB: allow larger requests for wide rows
    "producer.override.acks": "1",
    // During snapshot: acks=1 is safe (data is in the DB, re-snapshot if lost)
    // Switch to acks=all after snapshot completes (or keep acks=all if you
    // can't tolerate re-snapshotting on failure)

    // ── Heartbeat (keeps the replication slot alive during long snapshots) ──
    "heartbeat.interval.ms": "10000",
    "heartbeat.action.query": "INSERT INTO debezium_heartbeat (ts) VALUES (NOW()) ON CONFLICT (id) DO UPDATE SET ts = NOW()",

    // ── Signal table (for ad-hoc re-snapshots without restarting) ──
    "signal.data.collection": "public.debezium_signal",
    // INSERT INTO debezium_signal VALUES ('snap-orders', 'execute-snapshot',
    //   '{"data-collections": ["public.orders"]}');

    // ── WAL disk usage protection ──
    "slot.drop.on.stop": "false",
    // Keep the slot when connector stops — BUT monitor pg_replication_slots!
    // An abandoned slot prevents WAL cleanup → disk fills → Postgres crashes

    // ── Converters ──
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "tasks.max": "1"
    // Debezium Postgres: always 1 task — it reads a single WAL stream.
    // Most Debezium connectors are single-task for the same reason; only a
    // few (e.g. SQL Server capturing multiple databases) can run more.
  }
}
Postgres-Specific Production Gotchas
  • Replication slot WAL retention: An idle or lagging Debezium connector causes Postgres to retain WAL files indefinitely. Monitor pg_replication_slots and set max_slot_wal_keep_size (PG 13+) as a safety cap. If WAL fills the disk, Postgres shuts down until space is freed.
  • Use a read replica for snapshots: The initial SELECT * can hammer the primary. Point Debezium at a replica for the snapshot, then switch to the primary's WAL stream. Debezium 2.x supports snapshot.mode=initial_only + a separate streaming connector.
  • Publication filtering: Use publication.autocreate.mode=filtered so the publication only includes tables in table.include.list. The default (all_tables) publishes every table's WAL changes, wasting I/O.
  • TOAST columns: Postgres doesn't include unchanged TOAST (large) columns in WAL. Set REPLICA IDENTITY FULL on tables with TOAST columns you need, or you'll get nulls for unchanged text/jsonb fields on UPDATE.
  • Schema changes: DDL changes (ALTER TABLE) cause Debezium to update its internal schema history. If a schema change is incompatible with Schema Registry's compatibility mode, the connector will fail. Test DDL changes in staging first.
  • Monitoring: Track MilliSecondsBehindSource (Debezium metric) — this is the lag between WAL event timestamp and when Debezium processed it. Alert if it grows.
-- Postgres setup for Debezium (run as superuser)

-- 1. Create a dedicated replication user
CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'strong_password';
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;

-- 2. Set wal_level = logical (requires restart)
ALTER SYSTEM SET wal_level = 'logical';
-- pg_ctl restart

-- 3. Safety cap on WAL retention from abandoned slots (PG 13+)
ALTER SYSTEM SET max_slot_wal_keep_size = '50GB';

-- 4. Set REPLICA IDENTITY FULL on tables with TOAST columns you care about
ALTER TABLE orders REPLICA IDENTITY FULL;
-- Without this, UPDATE on a TOAST column you didn't change shows as null

-- 5. Create heartbeat table (prevents slot from going stale during idle periods)
CREATE TABLE debezium_heartbeat (id INT PRIMARY KEY DEFAULT 1, ts TIMESTAMPTZ);
INSERT INTO debezium_heartbeat VALUES (1, NOW());
GRANT ALL ON debezium_heartbeat TO debezium;

-- 6. Create signal table (for ad-hoc re-snapshots)
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32), data VARCHAR(2048));
GRANT ALL ON debezium_signal TO debezium;

-- 7. Monitor replication slot health
SELECT slot_name, active, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;
-- lag should be small (< 1 GB). If it's growing, Debezium is lagging or dead.

MirrorMaker 2

MirrorMaker 2 (MM2) replicates topics, consumer group offsets, and ACLs between Kafka clusters. It's built on the Kafka Connect framework — each replication flow runs as a set of Connect source connectors.

Typical Production Use Cases
  • Cross-team data sharing: Team A owns a Kafka cluster with their event streams. Team B needs a subset of those topics for their own processing. MM2 mirrors the relevant topics into Team B's cluster — no direct cross-cluster consumer connections, no blast radius if Team B's consumers misbehave.
  • Disaster recovery (active-passive): Mirror the primary cluster to a standby in another region. If the primary fails, consumers switch to the standby. RPO depends on replication lag.
  • Active-active (multi-region): Each region produces locally and mirrors to other regions. Requires careful topic naming and consumer offset translation to avoid infinite loops.
  • Cloud migration: Mirror on-prem Kafka to a cloud cluster during migration. Consumers switch over gradually. Decommission on-prem when done.
  • Data ingestion from partner teams: Instead of granting direct access to your cluster, the other team runs their cluster independently and you pull their topics via MM2 into your own cluster for processing.

Architecture

# MM2 Components (all are Kafka Connect source connectors under the hood):
#
# MirrorSourceConnector  — replicates topic data (records) from source → target
# MirrorCheckpointConnector — replicates consumer group offsets (translated)
# MirrorHeartbeatConnector  — emits heartbeats for monitoring replication health
#
# Topic naming convention:
#   Source topic "orders" on cluster "us-east" becomes "us-east.orders" on target
#   This prefix prevents infinite loops in bidirectional replication
#
# Consumer offset translation:
#   MM2 translates source offsets to target offsets so consumers can fail over
#   Stored in the internal topic: <source-alias>.checkpoints.internal
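The prefixing scheme is simple string handling — the real implementation is org.apache.kafka.connect.mirror.DefaultReplicationPolicy (the "." separator is configurable via replication.policy.separator); this sketch just makes the loop-prevention logic concrete:

```java
// Sketch of DefaultReplicationPolicy's naming scheme (illustrative, not the
// real class): remote topics carry the source cluster alias as a prefix.
public class ReplicationNaming {

    public static String formatRemoteTopic(String sourceAlias, String topic) {
        return sourceAlias + "." + topic;
    }

    // Loop prevention in bidirectional setups: a topic already carrying the
    // peer cluster's prefix originated there, so it is not replicated back.
    public static boolean cameFrom(String clusterAlias, String topic) {
        return topic.startsWith(clusterAlias + ".");
    }

    public static void main(String[] args) {
        System.out.println(formatRemoteTopic("us-east", "orders")); // us-east.orders
        System.out.println(cameFrom("us-west", "us-west.orders"));  // true
        System.out.println(cameFrom("us-west", "orders"));          // false
    }
}
```

Note the caveat with dotted topic names like team-a.orders: a prefix-based policy parses everything before the first separator as a cluster alias, which is one reason bidirectional setups need careful topic naming.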

Dedicated MM2 Cluster (Recommended)

# mm2.properties — dedicated MirrorMaker 2 configuration
# Run as: connect-mirror-maker.sh mm2.properties

# ── Cluster aliases ──
clusters = source, target
source.bootstrap.servers = source-broker1:9092,source-broker2:9092
target.bootstrap.servers = target-broker1:9092,target-broker2:9092

# ── Replication flow: source → target ──
source->target.enabled = true
source->target.topics = team-a\.events\..*,team-a\.orders
# Regex pattern — mirror all topics matching "team-a.events.*" and "team-a.orders"
# Use explicit topic list or regex. Avoid .* (mirrors internal topics too)

# Prevent reverse replication (unless active-active)
target->source.enabled = false

# ── Topic configuration ──
# Replicated topics get prefixed: "team-a.events.clicks" → "source.team-a.events.clicks"
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy
# To remove the prefix (flat mirroring), use IdentityReplicationPolicy
# WARNING: IdentityReplicationPolicy can cause infinite loops in bidirectional setups

# Keep topic configs and ACLs in sync with the source
source->target.sync.topic.configs.enabled = true
source->target.sync.topic.acls.enabled = true

# ── Consumer group offset sync ──
source->target.emit.checkpoints.enabled = true
source->target.emit.checkpoints.interval.seconds = 30
source->target.sync.group.offsets.enabled = true
# Groups to sync (regex):
source->target.groups = team-b-consumer-.*

# ── Heartbeats for monitoring ──
source->target.emit.heartbeats.enabled = true
source->target.emit.heartbeats.interval.seconds = 10

# ── Replication tuning ──
source->target.tasks.max = 8
# More tasks = more parallelism. Each task handles a subset of partitions.
# Rule of thumb: 1 task per 20-50 partitions

# Producer overrides for the target cluster (throughput tuning)
target.producer.batch.size = 524288
target.producer.linger.ms = 50
target.producer.compression.type = lz4
target.producer.buffer.memory = 67108864

# Consumer overrides for the source cluster
source.consumer.auto.offset.reset = earliest
source.consumer.fetch.max.bytes = 52428800

MM2 as Connect Connectors (Alternative)

// Deploy MM2 connectors on an existing Connect cluster
// Useful if you already have a Connect cluster and don't want to run a separate MM2 process

// 1. MirrorSourceConnector — replicates data
{
  "name": "mm2-source-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "us-east",
    "target.cluster.alias": "us-west",
    "source.cluster.bootstrap.servers": "us-east-broker1:9092",
    "target.cluster.bootstrap.servers": "us-west-broker1:9092",
    "topics": "team-a\\.events\\..*",
    "tasks.max": "4",
    "replication.factor": 3,
    "sync.topic.configs.enabled": true,
    "producer.override.compression.type": "lz4",
    "producer.override.batch.size": "524288",
    "producer.override.linger.ms": "50"
  }
}

// 2. MirrorCheckpointConnector — syncs consumer offsets
{
  "name": "mm2-checkpoint-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "source.cluster.alias": "us-east",
    "target.cluster.alias": "us-west",
    "source.cluster.bootstrap.servers": "us-east-broker1:9092",
    "target.cluster.bootstrap.servers": "us-west-broker1:9092",
    "groups": ".*",
    "emit.checkpoints.interval.seconds": "30",
    "sync.group.offsets.enabled": true,
    "tasks.max": "1"
  }
}

Monitoring MM2

# 1. Check replication lag — MM2 workers expose per-partition JMX metrics:
#    kafka.connect.mirror:type=MirrorSourceConnector → replication-latency-ms-avg/max
#    (age of each record when it lands on the target cluster)
# Alert if this latency grows — it means MM2 is falling behind the source

# 2. Heartbeat topic — check on the target cluster
kafka-console-consumer.sh --bootstrap-server target-broker1:9092 \
  --topic us-east.heartbeats --from-beginning --max-messages 3
# Each heartbeat has a timestamp. If heartbeats stop, replication is broken.

# 3. Checkpoint topic — verify offset translation is working
kafka-console-consumer.sh --bootstrap-server target-broker1:9092 \
  --topic us-east.checkpoints.internal --from-beginning --max-messages 5

# 4. Compare topic record counts (quick sanity check)
# Source:
kafka-get-offsets.sh --bootstrap-server source-broker1:9092 \
  --topic team-a.events.clicks --time -1
# Target (note the prefix):
kafka-get-offsets.sh --bootstrap-server target-broker1:9092 \
  --topic us-east.team-a.events.clicks --time -1

# Key alerts:
#   - MM2 consumer lag > threshold (replication falling behind)
#   - Heartbeat gap > 60s (replication may be broken)
#   - Target topic partition count != source (sync.topic.configs not working)
MM2 Production Gotchas
  • Topic discovery is periodic, not instant. MM2 re-evaluates the topic regex every refresh.topics.interval.seconds (default 600). A new topic Team A creates won't be mirrored until the next refresh — lower the interval if that lag matters.
  • Exactly-once is not guaranteed. MM2 provides at-least-once. On failover, consumers may see duplicates from the overlap period. Design consumers to be idempotent.
  • Bandwidth planning: MM2 doubles your network egress from the source cluster. If the source produces 500 MB/s, MM2 adds 500 MB/s of consumer traffic. Coordinate with the source cluster owner.
  • Don't mirror internal topics. Exclude __consumer_offsets, _schemas, _connect-* etc. Use explicit topic lists or careful regex, not topics = .*.
  • Consumer failover is manual. Offset translation is available in the checkpoint topic, but consumers must be reconfigured to read from the target cluster and the prefixed topic names. Automate this in your runbook.

Single Message Transforms (SMTs)

SMTs are lightweight, stateless transformations applied to each record as it passes through Connect.

{
  "transforms": "extractKey,addTimestamp,maskPII",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field": "id",
  "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTimestamp.timestamp.field": "kafka_ingest_time",
  "transforms.maskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskPII.fields": "email,ssn",
  "transforms.maskPII.replacement": "***"
}
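SMTs can be gated with predicates (KIP-585) so a transform only fires on matching records — for example, skipping tombstones that would otherwise break a $Value transform. A sketch using the built-in RecordIsTombstone predicate:

```json
{
  "transforms": "addTimestamp",
  "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTimestamp.timestamp.field": "kafka_ingest_time",
  "transforms.addTimestamp.predicate": "isTombstone",
  "transforms.addTimestamp.negate": "true",
  "predicates": "isTombstone",
  "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
}
```

With negate=true the transform applies only to non-tombstone records; tombstones pass through untouched.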

Dead Letter Queue

{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "orders-dlq",
  "errors.deadletterqueue.topic.replication.factor": "3",
  "errors.deadletterqueue.context.headers.enable": true,
  "errors.log.enable": true,
  "errors.log.include.messages": true
}

Building a Custom Connector (Java)

When to Build Custom
Build a custom connector when no off-the-shelf connector exists for your data source/sink. The Connect framework gives you free: offset tracking, fault tolerance, task distribution, REST management API, config validation, and monitoring. You only write the integration logic.

Example: Google Sheets Source Connector

A source connector that monitors a master spreadsheet listing other spreadsheets to ingest, polls them at a configured interval, and splits work across tasks — each task handles a subset of spreadsheets.

// ── SheetsSourceConnector.java ──────────────────────────────────────
// The Connector: validates config, discovers spreadsheets, splits work into tasks

public class SheetsSourceConnector extends SourceConnector {

    private static final Logger log = LoggerFactory.getLogger(SheetsSourceConnector.class);

    // Config keys
    public static final String MASTER_SHEET_ID = "master.spreadsheet.id";
    public static final String POLL_INTERVAL_MS = "poll.interval.ms";
    public static final String TOPIC = "topic";
    public static final String SERVICE_ACCOUNT_JSON = "gcp.service.account.json";

    private String masterSheetId;
    private String topic;
    private long pollIntervalMs;
    private String serviceAccountJson;
    private List<String> spreadsheetIds;

    @Override
    public String version() { return "1.0.0"; }

    @Override
    public void start(Map<String, String> props) {
        // Called once when the connector is created/started
        masterSheetId = props.get(MASTER_SHEET_ID);
        topic = props.get(TOPIC);
        pollIntervalMs = Long.parseLong(props.getOrDefault(POLL_INTERVAL_MS, "60000"));
        serviceAccountJson = props.get(SERVICE_ACCOUNT_JSON);

        // Read the master spreadsheet to discover which sheets to ingest
        SheetsClient client = new SheetsClient(serviceAccountJson);
        spreadsheetIds = client.listTargetSheets(masterSheetId);
        log.info("Discovered {} spreadsheets from master config", spreadsheetIds.size());
    }

    @Override
    public Class<? extends Task> taskClass() {
        return SheetsSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Split spreadsheets across tasks — each task gets an independent subset
        int numTasks = Math.min(maxTasks, spreadsheetIds.size());
        List<Map<String, String>> configs = new ArrayList<>();

        // Round-robin assignment
        List<List<String>> assignments = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) assignments.add(new ArrayList<>());
        for (int i = 0; i < spreadsheetIds.size(); i++) {
            assignments.get(i % numTasks).add(spreadsheetIds.get(i));
        }

        for (List<String> assigned : assignments) {
            Map<String, String> taskConfig = new HashMap<>();
            taskConfig.put("assigned.spreadsheets", String.join(",", assigned));
            taskConfig.put(TOPIC, topic);
            taskConfig.put(POLL_INTERVAL_MS, String.valueOf(pollIntervalMs));
            taskConfig.put(SERVICE_ACCOUNT_JSON, serviceAccountJson);
            configs.add(taskConfig);
        }
        return configs;
    }

    @Override
    public void stop() {
        // Cleanup resources (HTTP clients, etc.)
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define(MASTER_SHEET_ID, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "Google Sheets ID of the master configuration spreadsheet")
            .define(TOPIC, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "Kafka topic to produce records to")
            .define(POLL_INTERVAL_MS, ConfigDef.Type.LONG, 60000L, ConfigDef.Importance.MEDIUM,
                    "How often to poll spreadsheets for new data (ms)")
            .define(SERVICE_ACCOUNT_JSON, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH,
                    "GCP service account JSON key");
    }
}
// ── SheetsSourceTask.java ───────────────────────────────────────────
// The Task: polls assigned spreadsheets, emits SourceRecords

public class SheetsSourceTask extends SourceTask {

    private String topic;
    private long pollIntervalMs;
    private List<String> assignedSheets;
    private SheetsClient client;

    // Track last-read row per spreadsheet (our "offset")
    private Map<String, Long> lastRow = new HashMap<>();

    @Override
    public String version() { return "1.0.0"; }

    @Override
    public void start(Map<String, String> props) {
        topic = props.get("topic");
        pollIntervalMs = Long.parseLong(props.get("poll.interval.ms"));
        assignedSheets = Arrays.asList(props.get("assigned.spreadsheets").split(","));
        client = new SheetsClient(props.get("gcp.service.account.json"));

        // Restore offsets from Connect's offset storage (survives restarts)
        for (String sheetId : assignedSheets) {
            Map<String, Object> partition = Collections.singletonMap("spreadsheet", sheetId);
            Map<String, Object> offset = context.offsetStorageReader().offset(partition);
            if (offset != null && offset.containsKey("row")) {
                lastRow.put(sheetId, (Long) offset.get("row"));
            } else {
                lastRow.put(sheetId, 0L);  // start from beginning
            }
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Called in a loop by the Connect worker — return records or block
        List<SourceRecord> records = new ArrayList<>();

        for (String sheetId : assignedSheets) {
            long startRow = lastRow.get(sheetId);
            List<Map<String, Object>> rows = client.readRowsAfter(sheetId, startRow);

            for (Map<String, Object> row : rows) {
                long rowNum = (Long) row.get("_row_number");

                // Source partition: identifies which spreadsheet
                Map<String, String> sourcePartition =
                    Collections.singletonMap("spreadsheet", sheetId);
                // Source offset: row number — our cursor for resuming
                Map<String, Long> sourceOffset =
                    Collections.singletonMap("row", rowNum);

                Schema valueSchema = buildSchema(row);  // dynamic schema from columns
                Struct value = buildStruct(valueSchema, row);

                records.add(new SourceRecord(
                    sourcePartition, sourceOffset,
                    topic, null, sheetId,          // key = spreadsheet ID
                    valueSchema, value
                ));
                lastRow.put(sheetId, rowNum);
            }
        }

        if (records.isEmpty()) {
            Thread.sleep(pollIntervalMs);  // backoff when no new data
        }
        return records;
        // Connect commits offsets automatically after records are acked by Kafka
    }

    @Override
    public void stop() {
        client.close();
    }
}

Example: PII Masking Sink Connector

A sink connector (or SMT) that masks sensitive fields before writing to an external system. Two approaches:

// ── Approach 1: Custom SMT (recommended for simple field masking) ───
// SMTs are lightweight, reusable, and work with ANY connector

public class PiiMaskTransform<R extends ConnectRecord<R>>
        implements Transformation<R> {

    private List<String> fieldsToMask;
    private String replacement;

    @Override
    public void configure(Map<String, ?> configs) {
        fieldsToMask = Arrays.asList(
            ((String) configs.get("fields")).split(","));
        replacement = (String) configs.getOrDefault("replacement", "***REDACTED***");
    }

    @Override
    public R apply(R record) {
        if (record.value() == null) return record;

        Schema schema = record.valueSchema();
        Struct original = (Struct) record.value();
        Struct masked = new Struct(schema);

        for (Field field : schema.fields()) {
            Object value = original.get(field);
            if (fieldsToMask.contains(field.name()) && value instanceof String) {
                // Mask: keep first/last char for debugging, mask the rest
                String str = (String) value;
                if (str.length() > 4) {
                    masked.put(field, str.charAt(0)
                        + replacement
                        + str.charAt(str.length() - 1));
                } else {
                    masked.put(field, replacement);
                }
            } else {
                masked.put(field, value);
            }
        }

        return record.newRecord(
            record.topic(), record.kafkaPartition(),
            record.keySchema(), record.key(),
            schema, masked,
            record.timestamp()
        );
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("fields", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "Comma-separated list of fields to mask")
            .define("replacement", ConfigDef.Type.STRING, "***REDACTED***",
                    ConfigDef.Importance.LOW, "Replacement string");
    }

    @Override
    public void close() {}
}
// Deploy the PII mask SMT with any sink connector:
{
  "name": "s3-sink-with-pii-masking",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "customer-events",
    "tasks.max": "4",

    "transforms": "maskPii",
    "transforms.maskPii.type": "com.example.PiiMaskTransform",
    "transforms.maskPii.fields": "email,ssn,phone,credit_card",
    "transforms.maskPii.replacement": "***",

    "s3.bucket.name": "analytics-data-lake",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat"
  }
}
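SMTs can also be gated with predicates (supported since Kafka 2.6 via KIP-585) so the transform only fires for matching records. A config sketch reusing the hypothetical PiiMaskTransform, with Connect's built-in TopicNameMatches predicate:

```json
{
  "transforms": "maskPii",
  "transforms.maskPii.type": "com.example.PiiMaskTransform",
  "transforms.maskPii.fields": "email,ssn",
  "transforms.maskPii.predicate": "isCustomerTopic",

  "predicates": "isCustomerTopic",
  "predicates.isCustomerTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isCustomerTopic.pattern": "customer-.*"
}
```

Records from non-matching topics pass through the SMT untouched, so one connector can fan out over mixed topics without masking everything.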
// ── Approach 2: Full SinkConnector with built-in masking ────────────
// Use when masking logic is complex (regex patterns, format-preserving
// encryption, field-level policies from a config service) and you need
// full control over the sink write path.

public class MaskingSinkTask extends SinkTask {
    private MaskingPolicy policy;     // loaded from config service
    private ExternalWriter writer;     // writes to target system

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            Struct value = (Struct) record.value();
            Struct masked = policy.apply(value);  // per-field masking rules
            writer.write(masked);
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        writer.flush();  // commit to external system
        // After flush() returns, Connect commits Kafka offsets
        // If flush() throws, offsets are NOT committed → records will be re-delivered
    }
}

Packaging & Deployment

Project Structure

# Standard Maven project layout for a Connect plugin
my-sheets-connector/
├── pom.xml
├── src/main/java/com/example/connect/
│   ├── SheetsSourceConnector.java
│   ├── SheetsSourceTask.java
│   ├── SheetsClient.java           # Google Sheets API wrapper
│   └── PiiMaskTransform.java       # bonus: package SMTs in the same plugin
├── src/main/resources/
│   └── META-INF/services/          # NOT needed — Connect uses plugin.path scanning
└── src/test/java/com/example/connect/
    ├── SheetsSourceConnectorTest.java
    └── SheetsSourceTaskTest.java

Maven pom.xml

<project>
  <groupId>com.example</groupId>
  <artifactId>sheets-source-connector</artifactId>
  <version>1.0.0</version>
  <packaging>jar</packaging>

  <dependencies>
    <!-- Kafka Connect API — PROVIDED, already on the worker classpath -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-api</artifactId>
      <version>3.6.0</version>
      <scope>provided</scope>
    </dependency>

    <!-- Your connector's actual dependencies -->
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-sheets</artifactId>
      <version>v4-rev20231218-2.0.0</version>
    </dependency>

    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13.2</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Shade plugin: creates uber-jar with all deps (excluding Kafka Connect API) -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Build, Package, Deploy

# 1. Build the connector
mvn clean package -DskipTests
# produces: target/sheets-source-connector-1.0.0.jar

# 2. Deploy to Connect workers — two approaches:

# ── Option A: Copy JAR to plugin.path directory ──
# Each connector lives in its own subdirectory under plugin.path
# Connect discovers it automatically on worker restart
scp target/sheets-source-connector-1.0.0.jar \
  connect-host:/usr/share/confluent-hub-components/sheets-source-connector/

# Restart Connect worker to pick up new plugin
ssh connect-host "docker compose restart kafka-connect"

# ── Option B: Bake into Docker image (recommended for production) ──
# Dockerfile for a custom Connect image with your plugin pre-installed
# Dockerfile.connect
FROM confluentinc/cp-kafka-connect:7.6.0

# Install community connectors via confluent-hub
RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.4.1
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.7

# Install your custom connector
COPY target/sheets-source-connector-1.0.0.jar \
     /usr/share/confluent-hub-components/sheets-source-connector/

# Build and push
# docker build -f Dockerfile.connect -t my-registry/kafka-connect:v1.2.0 .
# docker push my-registry/kafka-connect:v1.2.0

Deploy and Manage via REST API

# Verify plugin is loaded (after worker restart or new image deploy)
curl http://connect-host:8083/connector-plugins | jq '.[].class'
# Should include: "com.example.connect.SheetsSourceConnector"

# Create the connector instance
curl -X POST http://connect-host:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "google-sheets-source",
    "config": {
        "connector.class": "com.example.connect.SheetsSourceConnector",
        "master.spreadsheet.id": "1AbC_xYz_masterSheetId",
        "topic": "spreadsheet-data",
        "poll.interval.ms": "300000",
        "gcp.service.account.json": "${file:/opt/secrets/gcp.json:key}",
        "tasks.max": "4",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
}'

# Check status — "RUNNING" for connector and all tasks
curl http://connect-host:8083/connectors/google-sheets-source/status | jq .
# {
#   "name": "google-sheets-source",
#   "connector": {"state": "RUNNING", "worker_id": "connect-1:8083"},
#   "tasks": [
#     {"id": 0, "state": "RUNNING", "worker_id": "connect-1:8083"},
#     {"id": 1, "state": "RUNNING", "worker_id": "connect-2:8083"},
#     {"id": 2, "state": "RUNNING", "worker_id": "connect-1:8083"},
#     {"id": 3, "state": "RUNNING", "worker_id": "connect-2:8083"}
#   ]
# }

# Update config (zero-downtime — Connect restarts tasks automatically)
curl -X PUT http://connect-host:8083/connectors/google-sheets-source/config \
  -H "Content-Type: application/json" \
  -d '{ ... updated config ... }'

# Restart a single failed task
curl -X POST http://connect-host:8083/connectors/google-sheets-source/tasks/2/restart

# Delete the connector
curl -X DELETE http://connect-host:8083/connectors/google-sheets-source

Testing Connectors

// Unit test a SourceTask — mock the external system, verify SourceRecords
@Test
public void testPollReturnsNewRows() throws InterruptedException {
    SheetsSourceTask task = new SheetsSourceTask();

    // Mock the offset storage reader (no prior offsets)
    SourceTaskContext context = mock(SourceTaskContext.class);
    OffsetStorageReader offsetReader = mock(OffsetStorageReader.class);
    when(context.offsetStorageReader()).thenReturn(offsetReader);
    when(offsetReader.offset(any())).thenReturn(null);
    task.initialize(context);

    // Start with test config
    Map<String, String> props = Map.of(
        "assigned.spreadsheets", "sheet-1,sheet-2",
        "topic", "test-topic",
        "poll.interval.ms", "100",
        "gcp.service.account.json", "{...test key...}"
    );
    task.start(props);

    // Poll and verify
    List<SourceRecord> records = task.poll();
    assertFalse(records.isEmpty());
    assertEquals("test-topic", records.get(0).topic());
    assertEquals("sheet-1", records.get(0).key());
}

// Integration test: use EmbeddedConnectCluster (Kafka's test utilities)
// or Testcontainers with a real Kafka + Connect worker
// See: org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster

8. Schema Registry

Schema Registry stores versioned schemas and enforces schema compatibility between producers and consumers. Without it, schema evolution is a coordination nightmare.

Formats

| Format | Binary | Human-readable | Notes |
|---|---|---|---|
| Avro | Yes | No (schema separate) | Most common; compact; strong schema evolution support |
| Protobuf | Yes | No | Excellent multi-language support; field numbers for evolution |
| JSON Schema | No | Yes | Easiest to debug; largest messages; weakest validation |

Compatibility Modes

| Mode | New schema can | Old consumers | Old producers |
|---|---|---|---|
| BACKWARD (default) | Delete fields, add optional fields | May break on new data (upgrade consumers first) | Can keep old schema |
| FORWARD | Add fields, delete optional fields | Can read new data | Must upgrade first |
| FULL | Add/delete optional fields only | Can read new data | Can keep old schema |
| NONE | Anything | May break | May break |
| BACKWARD_TRANSITIVE | Only changes compatible with ALL previous versions | Upgrade consumers first | Can keep any old schema |

Avro Schema Example

// Schema version 1 — initial
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example",
  "fields": [
    {"name": "id",         "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "amount",     "type": "double"},
    {"name": "createdAt",  "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}

// Schema version 2 — BACKWARD compatible additions
// Added optional field "region" with default (safe to add in BACKWARD mode)
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example",
  "fields": [
    {"name": "id",         "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "amount",     "type": "double"},
    {"name": "createdAt",  "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "region",     "type": ["null", "string"], "default": null}
  ]
}
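For contrast, here is a change the registry would reject under BACKWARD mode: adding a required field with no default. Readers on the new schema would have nothing to fill in when decoding v1/v2 records, so the compatibility check fails (a hedged sketch; "currency" is a hypothetical field):

```json
// Schema version 3 — REJECTED under BACKWARD mode
// "currency" has no default, so v3 readers cannot decode v1/v2 records
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example",
  "fields": [
    {"name": "id",         "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "amount",     "type": "double"},
    {"name": "createdAt",  "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "region",     "type": ["null", "string"], "default": null},
    {"name": "currency",   "type": "string"}
  ]
}
```

The fix is the same pattern as "region": make it a union with null and give it a default.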

Using Schema Registry in Java

// Producer with Avro + Schema Registry
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");
// Subject naming strategy: {topic}-value (default)
// props.put("value.subject.name.strategy", TopicRecordNameStrategy.class.getName());

KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);

Schema schema = new Schema.Parser().parse(new File("order.avsc"));  // v2 schema (includes "region")
GenericRecord order = new GenericData.Record(schema);
order.put("id", "order-123");
order.put("customerId", "cust-456");
order.put("amount", 99.99);
order.put("createdAt", System.currentTimeMillis());
order.put("region", "us-east");

producer.send(new ProducerRecord<>("orders", "order-123", order));
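The consume side needs the matching deserializer settings. A client-config fragment (the keys are the Confluent KafkaAvroDeserializer configs; `specific.avro.reader=true` would return generated SpecificRecord classes instead of GenericRecord):

```properties
# Consumer config for Avro + Schema Registry
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url=http://schema-registry:8081
specific.avro.reader=false
```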

9. Security

SSL/TLS Encryption

# Broker config (server.properties)
listeners=SSL://broker1:9093
ssl.keystore.location=/var/ssl/private/broker1.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/ssl/private/broker.truststore.jks
ssl.truststore.password=truststore-password
# require mutual TLS (mTLS) — note: server.properties does not support inline comments
ssl.client.auth=required
ssl.endpoint.identification.algorithm=https
// Client SSL config
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststorePassword");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/client.keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keystorePassword");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "keyPassword");

SASL Authentication

| Mechanism | Use Case | Notes |
|---|---|---|
| PLAIN | Simple username/password | Credentials sent in plaintext — always use with TLS |
| SCRAM-SHA-256/512 | Username/password with challenge-response | Safer than PLAIN; credentials stored hashed in ZK/KRaft |
| GSSAPI (Kerberos) | Enterprise/Hadoop environments | Requires KDC; most complex to set up |
| OAUTHBEARER | Token-based auth (OAuth 2.0 / OIDC) | Modern cloud-native setups; supports token refresh |
# Broker: SASL_SSL listener
listeners=SASL_SSL://broker1:9094
sasl.enabled.mechanisms=SCRAM-SHA-512
ssl.keystore.location=/var/ssl/private/broker1.keystore.jks
ssl.keystore.password=keystore-password

# Create SCRAM credential for a user
kafka-configs.sh --bootstrap-server broker1:9092 \
  --alter --entity-type users --entity-name alice \
  --add-config 'SCRAM-SHA-512=[iterations=8192,password=alice-secret]'
// Client SASL/SCRAM config
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"alice\" password=\"alice-secret\";");

ACLs (Access Control Lists)

# Allow producer "alice" to write to "orders" topic
kafka-acls.sh --bootstrap-server broker1:9092 \
  --add \
  --allow-principal User:alice \
  --operation Write \
  --operation Describe \
  --topic orders

# Allow consumer group "order-processor" to read "orders"
kafka-acls.sh --bootstrap-server broker1:9092 \
  --add \
  --allow-principal User:service-account \
  --operation Read \
  --operation Describe \
  --topic orders

kafka-acls.sh --bootstrap-server broker1:9092 \
  --add \
  --allow-principal User:service-account \
  --operation Read \
  --group order-processing-group

# List ACLs for a topic
kafka-acls.sh --bootstrap-server broker1:9092 --list --topic orders

# Deny all to a principal (blacklist)
kafka-acls.sh --bootstrap-server broker1:9092 \
  --add \
  --deny-principal User:bad-actor \
  --operation All \
  --topic '*'

10. Operations

Essential CLI Commands

# --- Topics ---
# List all topics
kafka-topics.sh --bootstrap-server broker1:9092 --list

# Describe a specific topic (shows partition/replica/ISR details)
kafka-topics.sh --bootstrap-server broker1:9092 --describe --topic orders

# Describe ALL topics (useful for auditing)
kafka-topics.sh --bootstrap-server broker1:9092 --describe

# --- Producing and Consuming (for testing) ---
# Console producer
kafka-console-producer.sh --bootstrap-server broker1:9092 \
  --topic orders \
  --property "key.separator=:" \
  --property "parse.key=true"
# Then type: order-123:{"amount":99.99}

# Console consumer — read from beginning
kafka-console-consumer.sh --bootstrap-server broker1:9092 \
  --topic orders \
  --from-beginning \
  --property print.key=true \
  --property key.separator=" | "

# Console consumer — specific partition + offset
kafka-console-consumer.sh --bootstrap-server broker1:9092 \
  --topic orders \
  --partition 0 \
  --offset 100 \
  --max-messages 10

# --- Consumer Groups ---
# List all consumer groups
kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list

# Describe group (shows lag per partition)
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --describe --group order-processing-group

# Reset offsets to beginning (must stop consumers first)
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --group order-processing-group \
  --topic orders \
  --reset-offsets --to-earliest --execute

# Reset to specific offset
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --group order-processing-group \
  --topic orders:0 \
  --reset-offsets --to-offset 500 --execute

# Reset to timestamp (ISO 8601)
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --group order-processing-group \
  --topic orders \
  --reset-offsets --to-datetime 2026-02-01T00:00:00.000 --execute

# Delete consumer group (must have no active members)
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --delete --group old-group

# --- Configs ---
# Dynamically update broker config at runtime
kafka-configs.sh --bootstrap-server broker1:9092 \
  --entity-type brokers --entity-name 1 \
  --alter --add-config log.retention.ms=86400000

# Update topic config
kafka-configs.sh --bootstrap-server broker1:9092 \
  --entity-type topics --entity-name orders \
  --alter --add-config retention.ms=604800000,min.insync.replicas=2

# Describe topic configs (shows overrides)
kafka-configs.sh --bootstrap-server broker1:9092 \
  --entity-type topics --entity-name orders --describe
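Retention values are raw milliseconds, so it pays to sanity-check the arithmetic before an --alter. A throwaway shell helper:

```shell
# days → milliseconds for retention.ms / log.retention.ms
days_to_ms() { echo $(( $1 * 24 * 60 * 60 * 1000 )); }

days_to_ms 1    # 86400000  — the broker-level value used above
days_to_ms 7    # 604800000 — the topic-level value used above
```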

# --- Log Dirs ---
# Show log directory info (disk usage, replicas)
kafka-log-dirs.sh --bootstrap-server broker1:9092 \
  --topic-list orders --broker-list 1,2,3

# --- Partition Reassignment ---
# Generate reassignment plan
kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3" \
  --generate

# Execute reassignment
kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
  --reassignment-json-file reassignment.json \
  --execute

# Verify reassignment progress
kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
  --reassignment-json-file reassignment.json \
  --verify

Monitoring: Key JMX Metrics

| Metric | MBean | Alert if |
|---|---|---|
| UnderReplicatedPartitions | kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions | > 0 |
| ActiveControllerCount | kafka.controller:type=KafkaController,name=ActiveControllerCount | != 1 |
| OfflinePartitionsCount | kafka.controller:type=KafkaController,name=OfflinePartitionsCount | > 0 |
| MessagesInPerSec | kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec | Drops unexpectedly |
| BytesInPerSec / BytesOutPerSec | kafka.server:type=BrokerTopicMetrics | Saturation near NIC limit |
| RequestHandlerAvgIdlePercent | kafka.server:type=KafkaRequestHandlerPool | < 30% |
| Consumer lag | kafka.consumer:type=consumer-fetch-manager-metrics,client-id=X | Growing over time |
| LogFlushRateAndTimeMs | kafka.log:type=LogFlushStats | High p99 flush time |
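To surface these MBeans in Prometheus, the JMX exporter needs a rule per pattern. A minimal fragment (syntax per the prometheus/jmx_exporter agent; the Prometheus-side metric name is whatever you map it to — this one is an assumption, not a standard name):

```yaml
rules:
  # kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions → gauge
  - pattern: kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value
    name: kafka_server_replicamanager_underreplicatedpartitions
    type: GAUGE
```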
# Check consumer lag from CLI
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --describe --group order-processing-group

# Output columns:
# TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST
# orders  0          5000            5010            10   ...          ...
# orders  1          8000            8000            0    ...          ...

# Rank lag across all groups (top 20 laggiest partitions)
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --describe --all-groups 2>/dev/null | \
  awk '$6 ~ /^[0-9]+$/ {print $1, $2, $3, $6}' | sort -k4 -rn | head -20

11. Production Setup

Deployment Options

| Option | Who Manages Brokers | Best For | Trade-off |
|---|---|---|---|
| Self-managed on EC2/VMs | You | Full control, cost optimization at scale, custom tuning | You own upgrades, patching, monitoring, and failover |
| Amazon MSK | AWS | AWS-native teams, serverless option available | Less config flexibility, MSK-specific quirks (e.g., storage auto-scaling limits) |
| Confluent Cloud | Confluent | Schema Registry + ksqlDB + Connect as managed services | Most expensive, vendor lock-in on Confluent-specific features |
| Kubernetes (Strimzi) | You (via operator) | Existing K8s infra, GitOps workflows | K8s networking complexity, persistent volume management |
| Aiven / Redpanda Cloud | Provider | Multi-cloud managed Kafka (Aiven) or Kafka-compatible (Redpanda) | Smaller ecosystem than Confluent, Redpanda has minor API differences |

Self-Managed on EC2 with Docker

Common Production Pattern
Kafka brokers run as Docker containers on EC2 instances. Each broker gets a dedicated EBS volume for log data. Kafka Connect runs as a separate Docker cluster. A monitoring stack (Prometheus + Grafana) tracks broker health, consumer lag, and disk utilization.

Infrastructure Layout

# Typical 3-broker production cluster on EC2
#
# EC2 Instance Type: r6i.xlarge (4 vCPU, 32 GB RAM) — Kafka is memory + I/O heavy
#   - General purpose (m6i) works for moderate throughput
#   - Storage optimized (i3/d3) if you want NVMe local disks instead of EBS
#
# EBS Volume per broker:
#   - Type: gp3 (3,000 IOPS baseline, burstable, cheaper than io2)
#   - Size: Start with 500 GB, monitor and expand as needed
#   - Mount: /data/kafka — this is your log.dirs path
#   - DO NOT use the root volume for Kafka data
#
# Networking:
#   - Same VPC, spread across 3 AZs for fault tolerance
#   - Security group: allow 9092 (broker), 9093 (inter-broker), 8083 (Connect REST)
#   - Private subnet — no public IPs on brokers

# Directory structure on each EC2 instance
/data/kafka/           # EBS volume mount — Kafka log.dirs
/opt/kafka/            # Kafka binaries (in Docker image)
/opt/kafka-connect/    # Connect worker (separate container)
/var/log/kafka/        # Application logs (stdout from Docker)

Docker Compose for a Broker Node

# docker-compose.yml — run on each EC2 instance
# Each instance runs ONE broker. Don't colocate brokers.
version: "3.8"
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    container_name: kafka-broker
    network_mode: host          # simplifies advertised.listeners
    restart: always
    volumes:
      - /data/kafka:/var/lib/kafka/data     # EBS volume
      - /var/log/kafka:/var/log/kafka
    environment:
      KAFKA_NODE_ID: 1                                      # unique per broker
      KAFKA_PROCESS_ROLES: broker,controller                # KRaft mode (no ZK)
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker1:9093,2@broker2:9093,3@broker3:9093
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker1.internal:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_LOG_RETENTION_HOURS: 168                        # 7 days
      KAFKA_LOG_RETENTION_BYTES: -1                         # unlimited (rely on time)
      KAFKA_NUM_PARTITIONS: 6                               # default for auto-created topics
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2                          # acks=all needs 2/3 in sync
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"              # explicit topic creation only
      KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "false"
      KAFKA_LOG_SEGMENT_BYTES: 1073741824                   # 1 GB segments
      KAFKA_NUM_NETWORK_THREADS: 8
      KAFKA_NUM_IO_THREADS: 16
      KAFKA_HEAP_OPTS: "-Xms6g -Xmx6g"                     # fixed heap; leave the rest of RAM to the page cache
      KAFKA_JMX_PORT: 9999                                  # for monitoring
      KAFKA_JMX_HOSTNAME: broker1.internal
    ulimits:
      nofile:
        soft: 65536
        hard: 65536
KRaft vs ZooKeeper
Kafka 3.3+ supports KRaft mode (no ZooKeeper). New clusters should use KRaft — simpler architecture, faster controller failover, fewer moving parts. ZooKeeper mode is deprecated as of Kafka 3.5 and will be removed in Kafka 4.0.

Kafka Connect Cluster

# docker-compose.connect.yml — run on separate EC2 instances
services:
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.6.0
    container_name: kafka-connect
    network_mode: host
    restart: always
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker1:9092,broker2:9092,broker3:9092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
    volumes:
      - ./connect-plugins:/usr/share/confluent-hub-components
# Deploy a connector via REST API
curl -X POST http://connect-host:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "s3-sink-events",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "events",
    "s3.region": "us-east-1",
    "s3.bucket.name": "my-data-lake",
    "s3.part.size": 52428800,
    "flush.size": 10000,
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'\''year'\''=YYYY/'\''month'\''=MM/'\''day'\''=dd",
    "locale": "en-US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "event_time",
    "tasks.max": 4
  }
}'

# Check connector status
curl http://connect-host:8083/connectors/s3-sink-events/status | jq .

# Pause / resume / restart a connector
curl -X PUT http://connect-host:8083/connectors/s3-sink-events/pause
curl -X PUT http://connect-host:8083/connectors/s3-sink-events/resume
curl -X POST http://connect-host:8083/connectors/s3-sink-events/restart

# List installed connector plugins
curl http://connect-host:8083/connector-plugins | jq '.[].class'

EBS Volume Best Practices

| Setting | Recommendation | Why |
|---|---|---|
| Volume type | gp3 (general), io2 (high throughput) | gp3 gives 3,000 IOPS + 125 MB/s baseline for free; io2 for sustained high I/O |
| Size | Retention period × daily ingest × replication factor, split across brokers, plus headroom | E.g., 7 days × 50 GB/day × RF 3 ≈ 1 TB cluster-wide, so ~350 GB per broker in a 3-node cluster |
| Filesystem | XFS | Better than ext4 for Kafka's sequential write + concurrent read pattern |
| Mount options | noatime,nodiratime | Avoid unnecessary metadata writes on every read |
| Multiple volumes | Use log.dirs=/data1,/data2 with separate EBS volumes | Striping across volumes increases throughput; broker survives single volume failure (JBOD mode) |
| Snapshots | EBS snapshots for disaster recovery, NOT for backups | Kafka's replication is the primary backup mechanism. Snapshots are for catastrophic AZ-level failures |
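A worked instance of the sizing formula: RF multiplies the raw daily ingest into total cluster bytes, which then divides across brokers; provision headroom so steady-state utilization stays under the disk alert thresholds.

```shell
# retention_days × daily_ingest × RF = total cluster storage; divide by broker count
retention_days=7; daily_ingest_gb=50; rf=3; brokers=3
total_gb=$(( retention_days * daily_ingest_gb * rf ))
per_broker_gb=$(( total_gb / brokers ))
provision_gb=$(( per_broker_gb * 140 / 100 ))   # ~40% headroom keeps utilization under 75%
echo "cluster: ${total_gb} GB, per broker: ${per_broker_gb} GB, provision: ${provision_gb} GB"
```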
# Format and mount an EBS volume for Kafka
# 1. Attach volume in AWS console or CLI (e.g., /dev/xvdf)
# 2. Format with XFS
sudo mkfs.xfs /dev/xvdf

# 3. Mount with optimized options
sudo mkdir -p /data/kafka
sudo mount -o noatime,nodiratime /dev/xvdf /data/kafka

# 4. Persist in fstab
echo "/dev/xvdf /data/kafka xfs noatime,nodiratime 0 0" | sudo tee -a /etc/fstab

# 5. Set ownership for the Kafka container user
sudo chown -R 1000:1000 /data/kafka    # cp-kafka runs as UID 1000

Amazon MSK

# Create an MSK cluster via AWS CLI
aws kafka create-cluster \
  --cluster-name "prod-events" \
  --kafka-version "3.6.0" \
  --number-of-broker-nodes 3 \
  --broker-node-group-info '{
    "InstanceType": "kafka.m5.large",
    "ClientSubnets": ["subnet-aaa", "subnet-bbb", "subnet-ccc"],
    "SecurityGroups": ["sg-kafka"],
    "StorageInfo": {
      "EbsStorageInfo": {
        "VolumeSize": 500,
        "ProvisionedThroughput": {
          "Enabled": true,
          "VolumeThroughput": 250
        }
      }
    }
  }' \
  --encryption-info '{
    "EncryptionInTransit": {"ClientBroker": "TLS", "InCluster": true}
  }' \
  --enhanced-monitoring "PER_TOPIC_PER_BROKER"

# Get bootstrap brokers
aws kafka get-bootstrap-brokers --cluster-arn <arn>
# Returns: BootstrapBrokerStringTls: "b-1.prod.xxx:9094,b-2.prod.xxx:9094,..."

# MSK Serverless — zero capacity planning (pay per data)
aws kafka create-cluster-v2 \
  --cluster-name "serverless-events" \
  --serverless '{
    "VpcConfigs": [{"SubnetIds": ["subnet-aaa","subnet-bbb"], "SecurityGroupIds": ["sg-kafka"]}],
    "ClientAuthentication": {"Sasl": {"Iam": {"Enabled": true}}}
  }'
MSK vs Self-Managed
MSK wins on: Zero-downtime version upgrades, automatic broker replacement, integrated CloudWatch metrics, IAM auth.
Self-managed wins on: Full config control (MSK restricts some broker configs), latest Kafka versions faster, cost at scale (MSK has a per-broker-hour markup), KRaft mode (MSK still uses ZK as of early 2026).

Kubernetes (Strimzi)

# Strimzi Kafka CRD — declarative Kafka cluster on K8s
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: prod-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.6.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      auto.create.topics.enable: false
    storage:
      type: persistent-claim
      size: 500Gi
      class: gp3-encrypted       # StorageClass backed by EBS gp3
      deleteClaim: false          # keep PVCs on cluster deletion
    resources:
      requests:
        memory: 8Gi
        cpu: "2"
      limits:
        memory: 8Gi
        cpu: "4"
    jvmOptions:
      -Xms: 4096m
      -Xmx: 4096m
    rack:
      topologyKey: topology.kubernetes.io/zone    # spread across AZs
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 50Gi
      class: gp3-encrypted
  entityOperator:
    topicOperator: {}             # manage topics via KafkaTopic CRDs
    userOperator: {}              # manage ACLs via KafkaUser CRDs
# Manage topics declaratively via CRD (GitOps-friendly)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: orders
  labels:
    strimzi.io/cluster: prod-cluster
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: "604800000"     # 7 days
    min.insync.replicas: "2"
    cleanup.policy: delete

Monitoring Stack

The Four Pillars of Kafka Monitoring
  1. Broker health — under-replicated partitions, offline partitions, controller status
  2. Consumer lag — the #1 operational metric. Growing lag = consumer can't keep up
  3. Disk utilization — brokers die when disk fills; must alert early
  4. Throughput — bytes in/out per broker, messages/sec per topic

Prometheus + Grafana (Self-Managed)

# JMX Exporter sidecar — expose Kafka JMX metrics as Prometheus endpoints
# Add to your broker's Docker Compose:
services:
  kafka:
    environment:
      KAFKA_JMX_PORT: 9999
      KAFKA_OPTS: "-javaagent:/opt/jmx-exporter/jmx_prometheus_javaagent-0.20.0.jar=7071:/opt/jmx-exporter/kafka-broker.yml"
    volumes:
      - ./jmx-exporter:/opt/jmx-exporter

# prometheus.yml — scrape targets
scrape_configs:
  - job_name: kafka-brokers
    static_configs:
      - targets:
        - broker1:7071
        - broker2:7071
        - broker3:7071
  - job_name: kafka-lag
    static_configs:
      - targets:
        - kafka-lag-exporter:8000

Consumer Lag Monitoring

# Burrow (LinkedIn) — dedicated consumer lag monitoring service
# Evaluates lag status: OK, WARNING, STALLED, ERR, STOP
# Exposes HTTP API for lag per consumer group and partition
# docker run -v /path/to/burrow.toml:/etc/burrow/burrow.toml linkedin/burrow

# Kafka Lag Exporter (Lightbend) — Prometheus-native
# Helm chart for K8s:
# helm install kafka-lag-exporter kafka-lag-exporter/kafka-lag-exporter \
#   --set clusters[0].name=prod --set clusters[0].bootstrapBrokers=broker1:9092

# CLI — quick check
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --describe --group my-consumer-group
# Look at LAG column — should be near 0 and stable

# Key lag thresholds (tune for your SLA):
#   LAG < 100:        OK
#   LAG 100 - 10000:  WARNING — consumer falling behind
#   LAG > 10000:      CRITICAL — something is wrong
#   LAG growing:      CRITICAL regardless of absolute value
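The thresholds above can be folded into a small status evaluator. A minimal Python sketch — the `evaluate_lag` helper, its window logic, and the thresholds are illustrative (mirroring the comments above), not Burrow's actual evaluation algorithm:

```python
from enum import Enum

class LagStatus(Enum):
    OK = "OK"
    WARNING = "WARNING"
    CRITICAL = "CRITICAL"

def evaluate_lag(samples: list[int], warn: int = 100, crit: int = 10_000) -> LagStatus:
    """Classify consumer lag from a window of recent lag samples.

    Monotonically growing lag is CRITICAL regardless of absolute value,
    because it means the consumer can never catch up at the current rate.
    """
    current = samples[-1]
    growing = len(samples) >= 3 and all(a < b for a, b in zip(samples, samples[1:]))
    if growing or current > crit:
        return LagStatus.CRITICAL
    if current > warn:
        return LagStatus.WARNING
    return LagStatus.OK

# Stable low lag is fine; steadily climbing lag pages someone:
assert evaluate_lag([50, 40, 60]) is LagStatus.OK
assert evaluate_lag([100, 200, 400]) is LagStatus.CRITICAL
```

Feeding this from the Kafka Lag Exporter metrics (or the CLI output above) gives you the "growing lag" alert that absolute thresholds alone miss.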

Critical Alerts

| Alert | Condition | Action |
| --- | --- | --- |
| Disk > 75% | EBS volume utilization > 75% | Expand EBS volume, reduce retention, or add brokers |
| Disk > 90% | EBS volume utilization > 90% | Urgent: broker will stop accepting writes. Reduce retention immediately: kafka-configs.sh --alter --add-config retention.ms=86400000 |
| Under-replicated > 0 | Partitions with fewer in-sync replicas than expected | Check broker health, network, disk I/O. Could be a slow/dead broker |
| Offline partitions > 0 | Partitions with no available leader | Broker is down. Check logs. If the broker can't recover, reassign partitions |
| Consumer lag growing | Lag increasing over a 15-min window | Scale consumers, check for slow processing, verify downstream health |
| Controller count != 1 | No active controller or multiple controllers | Cluster split-brain or controller crash. Restart the affected broker |

Message Inspection Tools

# Console consumer — quick peek at a topic
kafka-console-consumer.sh --bootstrap-server broker1:9092 \
  --topic orders --from-beginning --max-messages 5 \
  --property print.key=true --property print.timestamp=true

# kcat (formerly kafkacat) — the "Swiss army knife" for Kafka
# Install: brew install kcat  (or apt-get install kafkacat)

# Consume with metadata (partition, offset, timestamp)
kcat -b broker1:9092 -t orders -C -f '%T %p %o %k %s\n' -c 5

# Produce a message
echo '{"id":1,"amount":99.99}' | kcat -b broker1:9092 -t orders -P -k "order-1"

# List topics and metadata
kcat -b broker1:9092 -L

# Consume a specific partition from a specific offset
kcat -b broker1:9092 -t orders -p 2 -o 1000 -c 10

# AKHQ (formerly KafkaHQ) — web UI for topic browsing, consumer groups, ACLs
# docker run -p 8080:8080 -e AKHQ_CONFIGURATION='...' tchiotludo/akhq
# Or Conduktor, Kafka UI (provectus/kafka-ui) — all serve the same purpose

Cluster Administration

Preferred Leader Election

# After a broker restart, it may not be the leader of its partitions anymore
# (leadership moved to another ISR member during the outage)
# Trigger preferred leader election to rebalance leadership

# Elect preferred leaders for ALL topics
kafka-leader-election.sh --bootstrap-server broker1:9092 \
  --election-type PREFERRED --all-topic-partitions

# Elect for a specific topic
kafka-leader-election.sh --bootstrap-server broker1:9092 \
  --election-type PREFERRED \
  --topic orders --partition 0

# UNCLEAN election (data loss risk!) — only as last resort when all ISR are dead
kafka-leader-election.sh --bootstrap-server broker1:9092 \
  --election-type UNCLEAN \
  --topic orders --partition 0

# Verify leadership distribution
kafka-topics.sh --bootstrap-server broker1:9092 --describe --topic orders
# Leader column should be evenly spread across brokers

Adding and Removing Brokers

# Adding a broker:
# 1. Launch new EC2 instance with EBS volume + Docker setup
# 2. Configure it with a new unique node.id and point it at the controller quorum
# 3. Start the broker — it joins the cluster but has NO partitions yet
# 4. Reassign partitions to include the new broker

# Generate reassignment plan
cat <<EOF > topics.json
{"topics": [{"topic": "orders"}, {"topic": "events"}], "version": 1}
EOF

kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3,4" \
  --generate
# Save the "Proposed partition reassignment" output to reassignment.json

# Execute the reassignment (runs in background, copies data)
kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
  --reassignment-json-file reassignment.json \
  --execute \
  --throttle 50000000    # 50 MB/s throttle to avoid impacting production

# Monitor progress
kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
  --reassignment-json-file reassignment.json \
  --verify

# Remove throttle after completion
kafka-configs.sh --bootstrap-server broker1:9092 \
  --entity-type brokers --entity-name 1 \
  --alter --delete-config leader.replication.throttled.rate
kafka-configs.sh --bootstrap-server broker1:9092 \
  --entity-type brokers --entity-name 4 \
  --alter --delete-config follower.replication.throttled.rate

# Removing a broker:
# 1. Reassign all partitions OFF the broker (use --broker-list without the broker)
# 2. Wait for reassignment to complete (--verify)
# 3. Stop the broker
# 4. Terminate the EC2 instance
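The reassignment plan the tool consumes is plain JSON, so it can also be generated programmatically. A sketch that emits the same shape with naive round-robin placement — `build_reassignment` is a hypothetical helper; the real `--generate` pass also considers leader balance and current placement:

```python
import json
from itertools import cycle, islice

def build_reassignment(topic: str, partitions: int, brokers: list[int],
                       replication_factor: int = 3) -> dict:
    """Build a plan in the format kafka-reassign-partitions.sh accepts
    via --reassignment-json-file: round-robin replicas over the broker
    list, rotating the starting broker so leadership spreads out."""
    plan = {"version": 1, "partitions": []}
    for p in range(partitions):
        start = p % len(brokers)
        replicas = list(islice(cycle(brokers), start, start + replication_factor))
        plan["partitions"].append(
            {"topic": topic, "partition": p, "replicas": replicas})
    return plan

plan = build_reassignment("orders", partitions=4, brokers=[1, 2, 3, 4])
print(json.dumps(plan, indent=2))   # save as reassignment.json for --execute
```

The first replica in each list is the preferred leader, which is why rotating the starting broker matters: without it, broker 1 would lead every partition.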

Scaling: When Brokers Run Out of Disk

Disk Full = Broker Down
A Kafka broker that runs out of disk space will stop accepting writes and may crash. This can cascade — producers retry to other brokers, overloading them. Always alert at 75% and take action before 90%.
# Immediate actions when disk is filling up:

# 1. Reduce retention on high-volume topics (buys time)
kafka-configs.sh --bootstrap-server broker1:9092 \
  --entity-type topics --entity-name high-volume-events \
  --alter --add-config retention.ms=86400000    # 1 day instead of 7

# 2. Delete old consumer offsets for abandoned groups
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --delete --group abandoned-consumer-group

# 3. Expand the EBS volume (online, no downtime)
aws ec2 modify-volume --volume-id vol-abc123 --size 1000    # grow to 1 TB
# Then resize the filesystem on the instance:
sudo xfs_growfs /data/kafka       # XFS supports online resize

# 4. Long-term: add brokers and rebalance partitions
# (see "Adding and Removing Brokers" above)

# 5. Long-term: enable tiered storage (Kafka 3.6+)
# Offload cold log segments to S3 while keeping hot data on local disk
# server.properties:
#   remote.log.storage.system.enable=true
#   remote.log.storage.manager.class.name=...  # S3 plugin
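Capacity planning ahead of these emergencies is simple arithmetic. A sketch — `required_disk_gb` is a hypothetical helper; the 75% headroom factor matches the disk alert threshold above:

```python
def required_disk_gb(ingress_mb_per_s: float, retention_days: float,
                     replication_factor: int, num_brokers: int,
                     headroom: float = 0.75) -> float:
    """Back-of-envelope per-broker disk requirement.

    Every byte produced is stored replication_factor times cluster-wide,
    spread (ideally evenly) over num_brokers; headroom keeps steady-state
    utilization below the 75% alert threshold.
    """
    total_gb = ingress_mb_per_s * 86_400 * retention_days * replication_factor / 1024
    return total_gb / num_brokers / headroom

# 10 MB/s ingress, 7-day retention, RF=3, 3 brokers → ~7.9 TB per broker
per_broker = required_disk_gb(10, 7, 3, 3)
```

Running the numbers before provisioning usually makes the case for either shorter retention or tiered storage on its own.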

Rolling Restarts (Zero Downtime)

# Safe rolling restart procedure for config changes or upgrades:
# 1. Ensure min.insync.replicas < replication.factor (e.g., 2 of 3)
#    so the cluster can tolerate one broker being down

for BROKER_HOST in broker1 broker2 broker3; do
  echo "=== Restarting $BROKER_HOST ==="

  # Restart the broker (stop, apply config/upgrade, start)
  ssh $BROKER_HOST "docker compose stop kafka"
  ssh $BROKER_HOST "docker compose up -d kafka"

  # Wait for ISR recovery — all partitions must be fully replicated before
  # touching the next broker. (Don't wait between stop and start: while a
  # broker is down its partitions are under-replicated by definition, so
  # the count could never reach 0.)
  echo "Waiting for under-replicated partitions to reach 0..."
  while true; do
    URP=$(kafka-topics.sh --bootstrap-server broker1:9092 --describe \
      --under-replicated-partitions 2>/dev/null | wc -l)
    if [ "$URP" -eq 0 ]; then break; fi
    echo "  Still $URP under-replicated partitions, waiting..."
    sleep 10
  done

  echo "=== $BROKER_HOST restarted ==="
done

# After all restarts, trigger preferred leader election
kafka-leader-election.sh --bootstrap-server broker1:9092 \
  --election-type PREFERRED --all-topic-partitions

Production Readiness Checklist

| Category | Check |
| --- | --- |
| Replication | replication.factor=3, min.insync.replicas=2 for all important topics. Producers use acks=all |
| Durability | unclean.leader.election.enable=false (prevents data loss). auto.create.topics.enable=false |
| Storage | Dedicated EBS volumes (gp3+), XFS filesystem, noatime mount. Disk alert at 75% |
| Networking | Brokers in private subnets across 3 AZs. Security groups restrict ports. TLS for inter-broker and client traffic |
| Monitoring | Prometheus + Grafana (or CloudWatch). Alerts for under-replicated partitions, disk usage, consumer lag, controller count |
| Consumer lag | Dedicated lag monitor (Burrow, Kafka Lag Exporter). Alert on growing lag, not just absolute value |
| Authentication | SASL/SCRAM or mTLS for client auth. ACLs to restrict topic access per service |
| Backpressure | Producer buffer.memory and max.block.ms tuned. Consumer max.poll.records set to a sustainable batch size |
| Schema | Schema Registry enforcing BACKWARD compatibility. All topics use Avro/Protobuf, not raw JSON |
| Upgrades | Rolling restart procedure documented and tested. Broker inter.broker.protocol.version set for safe upgrades |
| Disaster recovery | MirrorMaker 2 or Confluent Replicator for cross-region replication if RPO matters |

Data Contracts

Schema Registry stops producers from making breaking structural changes — but structural changes are only one of three ways upstream teams silently break downstream analytics.

The Problem: Three Failure Modes

| Failure Mode | Example | Detectable By |
| --- | --- | --- |
| Field rename | user_id → userId | Schema Registry (structural — breaks compatibility check) |
| Field drop | Remove amount from payload | Schema Registry (structural — breaks compatibility check) |
| Field repurpose | amount changes from dollars to cents; status gains 40 new values no consumer knows about | Nothing technical — the schema is still valid. Downstream gets silently wrong numbers. |

The third failure mode — semantic drift — is the hardest to catch and the one most likely to corrupt production analytics quietly for days before anyone notices. A complete data contract strategy needs four layers.

Layer 1: Schema Registry Enforcement

Use Confluent Schema Registry (or AWS Glue Schema Registry) with FULL compatibility mode. FULL means new schemas must be both backward-compatible (consumers on the new schema can read data written with the old one) and forward-compatible (consumers still on the old schema can read data written with the new one). This catches field drops and renames at producer deploy time, before a single message is published.

Use Avro or Protobuf — not raw JSON. JSON has no enforced schema at the wire level. If you must use JSON payloads (e.g., third-party producers you don't control), add a validation layer at ingestion: parse incoming records against a registered JSON Schema and route non-conforming records to a dead-letter topic rather than letting them flow through.

# Register a schema (Avro)
curl -X POST http://schema-registry:8081/subjects/user-events-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"UserEvent\",\"fields\":[{\"name\":\"user_id\",\"type\":\"string\"},{\"name\":\"event_type\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":[\"null\",\"double\"],\"default\":null}]}"}'

# Check compatibility of a proposed schema change before registering
curl -X POST http://schema-registry:8081/compatibility/subjects/user-events-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "..."}'
# Response: {"is_compatible": false}  ← producer CI fails here, not in production
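The compatibility rules can be modeled over a simplified schema representation. A sketch under the assumption that each field is just a name plus a has-default flag — real registries also check types, unions, and aliases:

```python
def full_compatible(old_fields: dict[str, bool], new_fields: dict[str, bool]) -> bool:
    """Simplified FULL-compatibility check over {field_name: has_default} maps.

    Backward: consumers on the new schema must read old data, so every
    ADDED field needs a default. Forward: consumers on the old schema must
    read new data, so every REMOVED field must have had a default.
    """
    added = set(new_fields) - set(old_fields)
    removed = set(old_fields) - set(new_fields)
    backward_ok = all(new_fields[f] for f in added)
    forward_ok = all(old_fields[f] for f in removed)
    return backward_ok and forward_ok

old = {"user_id": False, "event_type": False, "amount": True}  # amount defaults to null
assert full_compatible(old, {"user_id": False, "event_type": False}) is True   # drop optional field: OK
assert full_compatible(old, {"event_type": False, "amount": True}) is False    # drop required user_id: blocked
```

This is the check the compatibility endpoint above performs for you; encoding it locally makes the rules concrete when reviewing a proposed change.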

Layer 2: Consumer-Driven Contracts

Borrowed from microservices contract testing (Pact). Each downstream consumer declares which fields they depend on and what invariants they expect. Declarations live in a shared repo. The upstream team's CI runs contract validation before any deployment — if a proposed schema change would break a declared consumer contract, the producer's pipeline fails.

# contracts/analytics-platform.yaml
consumer: analytics-platform
depends_on:
  - topic: user-events
    fields:
      - name: user_id
        type: string
        required: true
      - name: event_type
        type: string
        required: true
        allowed_values: [signup, login, purchase, churn]
      - name: amount
        type: number
        constraints: { min: 0 }

  - topic: order-events
    fields:
      - name: order_id
        type: string
        required: true
      - name: total_usd
        type: number
        required: true
        constraints: { min: 0, max: 1000000 }

The upstream CI loads all contract files from the shared repo and runs a validator against the proposed schema and any annotated invariants. If any consumer contract fails, the deployment is blocked. This forces schema negotiation to happen before production — not after.
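A validator over those contract files can be small. A sketch assuming the contract has already been parsed from YAML into a dict and the proposed schema reduced to a {field: type} map — `validate_contract` is hypothetical, not a real tool:

```python
def validate_contract(contract: dict, schema_fields: dict[str, str]) -> list[str]:
    """Check one consumer contract against a proposed schema.

    Returns a list of violations; an empty list means this consumer is
    unaffected and the deployment may proceed.
    """
    violations = []
    for dep in contract["depends_on"]:
        for field in dep["fields"]:
            name = field["name"]
            if name not in schema_fields:
                violations.append(f"{dep['topic']}: field '{name}' missing")
            elif schema_fields[name] != field["type"]:
                violations.append(
                    f"{dep['topic']}: field '{name}' is {schema_fields[name]}, "
                    f"contract expects {field['type']}")
    return violations

contract = {"consumer": "analytics-platform", "depends_on": [
    {"topic": "user-events", "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "amount", "type": "number"}]}]}
proposed = {"user_id": "string"}          # upstream wants to drop amount
print(validate_contract(contract, proposed))
```

In CI, the producer pipeline runs this for every contract file in the shared repo and fails the build on any non-empty result.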

Layer 3: Statistical Distribution Monitoring

The only defense against semantic drift. Track value distributions for important fields over time: cardinality, null rate, min/max, value frequency histograms. Use Population Stability Index (PSI) or a simple chi-square test to detect when a distribution shifts significantly between time windows. A PSI > 0.2 on a field signals a meaningful change.

If a field called status suddenly goes from 5 distinct values to 50, or amount shifts from averaging $45 to averaging $4500, something has changed semantically — even though the schema is structurally valid.

# Daily distribution check — run as a Spark job or scheduled Python script
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def alert(msg: str) -> None:
    print(msg)   # stand-in — wire this to your paging/Slack system

spark = SparkSession.builder.appName("distribution-monitor").getOrCreate()

# Load yesterday's and today's snapshots from your data lake
yesterday = spark.read.parquet("s3://lake/user-events/date=2024-01-14/")
today     = spark.read.parquet("s3://lake/user-events/date=2024-01-15/")

def field_profile(df, field: str) -> dict:
    agg = df.agg(
        F.count(field).alias("count"),
        F.countDistinct(field).alias("cardinality"),
        F.sum(F.when(F.col(field).isNull(), 1).otherwise(0)).alias("null_count"),
        F.min(field).alias("min_val"),
        F.max(field).alias("max_val"),
    ).collect()[0]
    return agg.asDict()

for field in ["event_type", "amount", "status"]:
    prev = field_profile(yesterday, field)
    curr = field_profile(today, field)

    cardinality_change = abs(curr["cardinality"] - prev["cardinality"]) / max(prev["cardinality"], 1)
    null_rate_prev = prev["null_count"] / max(prev["count"], 1)
    null_rate_curr = curr["null_count"] / max(curr["count"], 1)

    if cardinality_change > 0.5:   # cardinality jumped >50%
        alert(f"WARN: {field} cardinality changed {prev['cardinality']} → {curr['cardinality']}")

    if abs(null_rate_curr - null_rate_prev) > 0.05:  # null rate shifted >5pp
        alert(f"WARN: {field} null rate changed {null_rate_prev:.1%} → {null_rate_curr:.1%}")

# For numeric fields, compute PSI over bucketed distributions
import math

def compute_psi(expected_dist: list[float], actual_dist: list[float]) -> float:
    psi = 0.0
    for e, a in zip(expected_dist, actual_dist):
        e = max(e, 1e-10)   # floor zero proportions to avoid log(0) / division by zero
        a = max(a, 1e-10)
        psi += (a - e) * math.log(a / e)
    return psi

# PSI < 0.1: no significant change
# PSI 0.1–0.2: moderate shift, investigate
# PSI > 0.2: major shift — alert and escalate
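Raw field values have to be bucketed into proportions before PSI applies. A self-contained sketch (bucket edges are illustrative; PSI's standard form is the sum of (a − e)·ln(a/e) over buckets):

```python
import math

def psi(expected: list[float], actual: list[float]) -> float:
    """Population Stability Index over two bucketed proportion lists."""
    total = 0.0
    for e, a in zip(expected, actual):
        e, a = max(e, 1e-10), max(a, 1e-10)   # floor zeros to keep log finite
        total += (a - e) * math.log(a / e)
    return total

def bucket(samples: list[float], edges: list[float]) -> list[float]:
    """Turn raw samples into per-bucket proportions. Bucket index for a
    sample is the number of edges it meets or exceeds."""
    counts = [0] * (len(edges) + 1)
    for s in samples:
        counts[sum(s >= edge for edge in edges)] += 1
    n = len(samples)
    return [c / n for c in counts]

edges = [10, 50, 100]                                  # illustrative dollar buckets
baseline = bucket([5, 20, 60, 70, 120, 8, 45], edges)
shifted = bucket([120, 130, 140, 150, 5, 20, 60], edges)
assert abs(psi(baseline, baseline)) < 1e-9             # identical distributions → 0
assert psi(baseline, shifted) > 0                      # mass moved to the top bucket
```

In practice you persist yesterday's bucketed proportions and compare today's against them, alerting on the 0.1/0.2 thresholds above.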

Layer 4: Organizational Policy

Technology alone cannot enforce data contracts. The organizational layer closes the gaps:

| Policy | Implementation |
| --- | --- |
| Deprecation window | Fields must be marked deprecated (and announced) at least 4 weeks before removal. Producers keep emitting the field during the window. |
| Breaking change channel | A dedicated Slack channel (e.g., #data-schema-changes) where all breaking or potentially breaking changes are announced with a migration guide. |
| Topic ownership | Every topic has a declared owner in the schema registry or a central catalog. Ownership is enforced — no topic without an owner merges to production. |
| Dependency registry | Every consumer team registers which topics and fields they depend on. This powers the consumer-driven contract tests (Layer 2) and gives producers a blast-radius estimate before any change. |
| Data lineage tooling | Tools like DataHub or OpenMetadata visualize which pipelines consume which topics, what transformations they apply, and which dashboards or models are downstream. Makes the blast radius of a schema change visible before it happens. |
Data Contracts Are an Organizational Problem First
Data contracts are as much an organizational problem as a technical one. Schema Registry prevents structural breakage, but semantic drift, undocumented repurposing, and surprise deprecations happen because teams don't have a shared process for schema changes — not because they lack tooling. Without cross-team buy-in on a deprecation policy and a consumer contract process, even the best technical stack won't prevent silent downstream breakage.

12. Configuration Reference

Broker Configs

| Config | Default | Description |
| --- | --- | --- |
| broker.id | -1 (auto) | Unique broker identifier |
| listeners | PLAINTEXT://:9092 | Listener addresses (host:port per protocol) |
| log.dirs | /tmp/kafka-logs | Directories for log data (use multiple for striping) |
| num.partitions | 1 | Default partition count for auto-created topics |
| default.replication.factor | 1 | Default replication factor for auto-created topics |
| min.insync.replicas | 1 | Minimum ISR required for a write to succeed (applies when producers use acks=all) |
| log.retention.ms | 604800000 (7d) | Time to retain log segments |
| log.retention.bytes | -1 (unlimited) | Max bytes per partition before deleting old segments |
| log.segment.bytes | 1073741824 (1GB) | Max size of a single log segment |
| log.roll.ms | 604800000 (7d) | Force roll of the active segment after this time |
| replica.lag.time.max.ms | 30000 | Max time a follower can lag before being removed from the ISR |
| num.network.threads | 3 | Threads for network I/O |
| num.io.threads | 8 | Threads for disk I/O |
| socket.send.buffer.bytes | 102400 | TCP send buffer size |
| message.max.bytes | 1048576 (1MB) | Max message size (must align with consumer fetch.max.bytes) |
| auto.create.topics.enable | true | Set false in production to prevent accidental topic creation |
| delete.topic.enable | true | Allow topic deletion via admin API |
| unclean.leader.election.enable | false | Allow an out-of-sync replica to become leader (data loss risk) |
| compression.type | producer | Broker-level compression override ('producer' = use the producer's setting) |

Producer Configs

| Config | Default | Description |
| --- | --- | --- |
| acks | all | Required acknowledgements (0, 1, all) |
| retries | 2147483647 | Number of retries on transient failures |
| retry.backoff.ms | 100 | Wait between retry attempts |
| delivery.timeout.ms | 120000 (2m) | Total time budget for a send() call including retries |
| batch.size | 16384 (16KB) | Max bytes in a batch per partition |
| linger.ms | 0 | Time to wait for a batch to fill before sending |
| buffer.memory | 33554432 (32MB) | Total memory for buffering records before sending |
| max.block.ms | 60000 (1m) | Time to block send() when the buffer is full |
| compression.type | none | Compression: none, gzip, snappy, lz4, zstd |
| enable.idempotence | true | Idempotent producer (prevents duplicates on retry) |
| max.in.flight.requests.per.connection | 5 | Max unacknowledged sends (must be 1 for strict ordering without idempotence) |
| request.timeout.ms | 30000 | Time to wait for a broker response before retrying |
| transactional.id | null | Unique ID for a transactional producer; enables exactly-once |
| transaction.timeout.ms | 60000 (1m) | Max time for a transaction before the broker aborts it |

Consumer Configs

| Config | Default | Description |
| --- | --- | --- |
| group.id | "" | Consumer group name (required for group management) |
| auto.offset.reset | latest | What to do when no committed offset exists: earliest, latest, none |
| enable.auto.commit | true | Auto-commit offsets periodically |
| auto.commit.interval.ms | 5000 | Frequency of auto-commit |
| max.poll.records | 500 | Max records returned per poll() call |
| max.poll.interval.ms | 300000 (5m) | Max time between poll() calls before the consumer is kicked from the group |
| session.timeout.ms | 45000 | Heartbeat timeout for group membership |
| heartbeat.interval.ms | 3000 | Heartbeat frequency (should be ~1/3 of session.timeout.ms) |
| fetch.min.bytes | 1 | Minimum bytes to fetch before returning (batching) |
| fetch.max.bytes | 52428800 (50MB) | Max bytes per fetch response |
| fetch.max.wait.ms | 500 | Max wait when fetch.min.bytes is not met |
| isolation.level | read_uncommitted | Transaction visibility: read_uncommitted or read_committed |
| partition.assignment.strategy | RangeAssignor, CooperativeStickyAssignor | Partition assignment algorithms |

13. Common Pitfalls

Consumer Lag Growth

Symptom: LAG grows indefinitely

Causes:

  • Processing is too slow for the incoming message rate
  • Too few consumer instances for the partition count
  • A slow downstream system (database, HTTP call) blocks the poll loop
  • GC pauses or OOM causing consumer to fall behind

Fix: Profile processing time, scale out consumers (add instances up to partition count), offload slow work to async threads (carefully — offset management gets complex).
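The scale-out math is worth making explicit. A sketch assuming roughly even partition load — `consumers_needed` is a hypothetical helper:

```python
import math

def consumers_needed(incoming_msgs_per_s: float,
                     per_consumer_msgs_per_s: float,
                     partitions: int) -> int:
    """How many consumer instances the group needs to keep up, capped at
    the partition count (instances beyond that sit idle)."""
    needed = math.ceil(incoming_msgs_per_s / per_consumer_msgs_per_s)
    if needed > partitions:
        raise ValueError(
            f"need {needed} consumers but only {partitions} partitions; "
            "speed up processing or repartition the topic")
    return needed

# 12k msg/s incoming, each consumer sustains 2k msg/s, 12 partitions:
print(consumers_needed(12_000, 2_000, 12))
```

The ValueError branch is the important one: when required parallelism exceeds the partition count, adding instances cannot help, which is why partition count is a capacity decision, not just a layout detail.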

Rebalancing Storms

Symptom: Constant rebalancing, low throughput

Causes:

  • Processing takes longer than max.poll.interval.ms — consumer appears dead
  • Consumer crashes or restarts frequently (bad deployment, OOM)
  • Too many partitions with eager assignment strategy — rebalance takes too long

Fix:

  • Increase max.poll.interval.ms to match realistic processing time
  • Reduce max.poll.records to process smaller batches faster
  • Switch to CooperativeStickyAssignor — minimizes disruption
  • Fix the root cause of slow processing or crashes

Partition Count Mistakes

Too Few Partitions
  • Consumer group limited to < your desired parallelism
  • Cannot scale consumers beyond partition count
  • Hotspot risk if one partition key dominates
Too Many Partitions
  • Each partition requires a file handle, memory for producer batches, and a controller thread slot
  • Leader election time is O(partitions) — cluster recovery takes longer
  • End-to-end latency increases (more partition metadata to propagate)
  • Rule of thumb: < 10,000 partitions per broker; < 200,000 per cluster (ZK mode)

Message Ordering Guarantees

Kafka ordering is per-partition only
  • Within a partition: strict order guaranteed
  • Across partitions: no ordering guarantee
  • If you need global ordering across a topic: use a single partition (sacrifices parallelism)
  • If you need ordering per entity (e.g., per user): use the entity ID as the message key — all messages for that key land on the same partition
  • Retries with max.in.flight.requests.per.connection > 1 and no idempotence can reorder messages — always enable idempotence
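The per-entity ordering rule works because keyed messages are partitioned deterministically. A simplified sketch — Kafka's default partitioner actually applies murmur2 to the serialized key, but any stable hash demonstrates the property:

```python
def partition_for(key: str, num_partitions: int) -> int:
    """Same key → same partition, the basis of per-entity ordering.
    Simple polynomial hash as a stand-in for Kafka's murmur2."""
    h = 0
    for b in key.encode():
        h = (h * 31 + b) & 0x7FFFFFFF   # keep non-negative, like Kafka's masking
    return h % num_partitions

# Every event for user-42 lands on one partition, so its events stay ordered:
p = partition_for("user-42", 12)
assert all(partition_for("user-42", 12) == p for _ in range(100))
```

The flip side is the hotspot risk mentioned earlier: one very active key concentrates all its traffic on a single partition, and no amount of consumer scaling spreads it out.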

Large Messages

Default max message size is 1MB

Large messages cause memory pressure, replication delays, and consumer fetch timeouts. Three configs must align:

  • Broker: message.max.bytes
  • Topic: max.message.bytes
  • Consumer: fetch.max.bytes and max.partition.fetch.bytes

Better solution: Store large payloads (images, blobs) in object storage (S3, GCS) and send a reference URL in the Kafka message. Keep messages small (< 100KB).
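The reference-passing approach is the claim-check pattern. A sketch with an in-memory dict standing in for S3/GCS and `produce_with_claim_check` as a hypothetical helper:

```python
import json
import uuid

# In-memory stand-in for an object store (S3/GCS in production).
object_store: dict[str, bytes] = {}

def produce_with_claim_check(payload: bytes, threshold: int = 100_000) -> str:
    """Payloads over the threshold go to the object store; the Kafka
    message carries only a reference. Returns the JSON message body
    that would be produced."""
    if len(payload) <= threshold:
        return json.dumps({"inline": payload.decode("latin-1")})
    key = f"payloads/{uuid.uuid4()}"
    object_store[key] = payload          # s3.put_object(...) in real code
    return json.dumps({"ref": key, "size": len(payload)})

msg = json.loads(produce_with_claim_check(b"x" * 500_000))
assert "ref" in msg                      # consumer fetches the blob by ref
```

One caveat: the object and the message now have separate lifecycles, so retention in the object store must be at least as long as the topic's retention.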

Offset Reset Gotchas

auto.offset.reset only applies when no committed offset exists
  • If a consumer group has committed offsets, auto.offset.reset is ignored entirely
  • To replay: explicitly reset offsets using kafka-consumer-groups.sh --reset-offsets
  • Offsets can only be reset while the group is inactive — the tool refuses to run against a group with active members, so stop all consumers before resetting
  • auto.offset.reset=latest means you miss messages produced before the consumer first started — use earliest for new consumers joining an existing topic

Producer Timeouts and Retries

// Common misconfiguration: delivery.timeout.ms < retries * retry.backoff.ms
// This causes the producer to give up before exhausting retries
// Fix: delivery.timeout.ms should be >> (retries * retry.backoff.ms)

// Example of correct config for high-durability producer:
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 200);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000); // 2 min total budget
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);   // per-request timeout
// delivery.timeout >= request.timeout + linger.ms

// TimeoutException vs NotLeaderOrFollowerException:
// TimeoutException = broker didn't respond within request.timeout.ms (retry)
// NotLeaderOrFollowerException = sent to stale leader (retry with metadata refresh)
// Both are retried automatically with enable.idempotence=true
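The relationships among these timeouts can be checked mechanically. A Python sketch — `validate_producer_timeouts` is a hypothetical helper; the keys mirror the producer config names above:

```python
def validate_producer_timeouts(cfg: dict) -> list[str]:
    """Sanity-check the timeout relationships described above."""
    problems = []
    if cfg["delivery.timeout.ms"] < cfg["request.timeout.ms"] + cfg["linger.ms"]:
        problems.append(
            "delivery.timeout.ms < request.timeout.ms + linger.ms "
            "(the Java client enforces this relationship at construction time)")
    # A tiny delivery budget relative to backoff means retries can't happen.
    if cfg["delivery.timeout.ms"] < 2 * cfg["retry.backoff.ms"]:
        problems.append("delivery.timeout.ms leaves no room for even one retry")
    return problems

good = {"delivery.timeout.ms": 120_000, "request.timeout.ms": 30_000,
        "linger.ms": 0, "retry.backoff.ms": 200}
assert validate_producer_timeouts(good) == []
```

Running a check like this in deployment tooling catches the "gives up before retrying" misconfiguration before it shows up as mysterious TimeoutExceptions in production.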

Offset Commit Semantics Pitfalls

// WRONG: Commit before processing — at-most-once
// If process() throws, the offset is already committed — message is lost
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    consumer.commitSync(); // committed before processing!
    for (ConsumerRecord<String, String> r : records) {
        process(r); // if this throws, the record is skipped forever
    }
}

// WRONG: Auto-commit with slow async processing
// The auto-commit timer fires on the poll() thread, independent of your processing
// If you hand off records to another thread, the offset may be committed before
// those threads finish — causing silent message loss on crash
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

// CORRECT: Manual commit after processing
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> r : records) {
        process(r); // process first
    }
    consumer.commitSync(); // then commit
}

Schema Evolution Pitfalls

Breaking Schema Changes
  • Renaming a field is always a breaking change in Avro — to consumers it looks like the old field was deleted and a new field was added
  • Changing a field type (e.g., int to long) is breaking in most formats
  • Adding a required field without a default breaks BACKWARD compatibility — old data doesn't have this field
  • Fix: Always add fields as optional (union with null + default null in Avro), never remove fields used by downstream consumers

Zombie Consumers / Fencing

Zombie: A consumer that lost group membership but still runs

A consumer that experiences a long GC pause may exceed session.timeout.ms, be removed from the group, and have its partitions reassigned. When the paused consumer resumes, it's now a "zombie" — it will try to commit offsets for partitions it no longer owns. Kafka rejects these commits with a CommitFailedException. The correct response is to stop processing and rejoin the group.

The transactional producer uses epoch fencing — when a new instance with the same transactional.id calls initTransactions(), it bumps the epoch, causing the old (zombie) producer's transactions to be rejected with ProducerFencedException.
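Epoch fencing can be modeled in a few lines. A toy sketch of the coordinator's bookkeeping — not Kafka's actual transaction protocol, just the fencing rule:

```python
class TransactionCoordinator:
    """Toy model of producer epoch fencing. Each initTransactions() call
    for a transactional.id bumps the epoch; writes carrying a stale epoch
    are rejected, which is how zombie producers get fenced off."""

    def __init__(self) -> None:
        self.epochs: dict[str, int] = {}

    def init_transactions(self, txn_id: str) -> int:
        self.epochs[txn_id] = self.epochs.get(txn_id, -1) + 1
        return self.epochs[txn_id]

    def commit(self, txn_id: str, epoch: int) -> None:
        if epoch != self.epochs[txn_id]:
            raise RuntimeError("ProducerFencedException: stale epoch")

coord = TransactionCoordinator()
old_epoch = coord.init_transactions("payments-svc")   # original instance
new_epoch = coord.init_transactions("payments-svc")   # replacement bumps the epoch
coord.commit("payments-svc", new_epoch)               # accepted
# coord.commit("payments-svc", old_epoch) would raise: the zombie is fenced
```

The key property is that fencing requires no coordination with the zombie itself: the stale instance discovers it has been replaced only when its next write is rejected.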