Apache Kafka Refresher
Distributed event streaming platform — producers, consumers, streams, and operations
0. Setup & Environment
Get a single-node Kafka cluster running locally so you can produce and consume from the terminal before diving into the internals.
Prerequisites
Docker Desktop is the only hard requirement.
# Install Docker Desktop
brew install --cask docker
# Verify
docker --version
Single-Node Kafka with Docker Compose
Save this as docker-compose.yml in a working directory. This uses KRaft mode — no ZooKeeper needed.
services:
  kafka:
    image: apache/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
docker compose up -d
CLI Tools
The Kafka scripts ship inside the container. Use docker exec to run them without installing anything locally.
# Create a topic (3 partitions, replication factor 1 for single-node)
docker exec -it kafka-kafka-1 \
/opt/kafka/bin/kafka-topics.sh --create \
--topic test --partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092
# List topics
docker exec -it kafka-kafka-1 \
/opt/kafka/bin/kafka-topics.sh --list \
--bootstrap-server localhost:9092
Optionally, install kcat (brew install kcat) as a lightweight alternative client. It connects to any broker without a JVM on your host.
Produce and Consume (smoke test)
# Terminal 1 — produce a message
echo "hello kafka" | docker exec -i kafka-kafka-1 \
/opt/kafka/bin/kafka-console-producer.sh \
--topic test --bootstrap-server localhost:9092
# Terminal 2 — consume from the beginning
docker exec -it kafka-kafka-1 \
/opt/kafka/bin/kafka-console-consumer.sh \
--topic test --from-beginning --bootstrap-server localhost:9092
You should see hello kafka appear in Terminal 2. The consumer keeps running; send more messages in Terminal 1 and they stream in immediately.
Cleanup
# Stop and remove containers (data volumes persist)
docker compose down
# Stop, remove containers AND wipe all data volumes
docker compose down -v
1. Architecture
Kafka is a distributed, partitioned, replicated commit log. Every message is durably written to disk and replicated across brokers. Consumers read at their own pace — Kafka does not push; consumers pull.
Brokers
A broker is a single Kafka server process. A cluster typically runs 3+ brokers for fault tolerance. Each broker:
- Stores log segments for the partitions it leads or follows
- Serves produce and fetch requests
- Participates in leader election (via ZooKeeper or KRaft)
- Is identified by a numeric broker.id
Topics and Partitions
A topic is a named category/feed. Topics are split into partitions — ordered, immutable, append-only logs. Each message in a partition gets a monotonically increasing offset.
Replication: Leaders, Followers, and ISR
Each partition has one leader and zero or more followers. All reads and writes go to the leader. Followers replicate from the leader. The In-Sync Replica (ISR) set contains replicas that are caught up with the leader within replica.lag.time.max.ms.
| Concept | Description |
|---|---|
| replication.factor | How many copies of each partition exist (typically 3) |
| min.insync.replicas | Minimum ISR count required for a write to succeed (typically 2) |
| ISR | Set of replicas fully caught up with the leader |
| OSR | Out-of-sync replicas; lagging behind and removed from the ISR |
| Leader epoch | Monotonic counter incremented on each leader election; prevents stale leader writes |
acks=all on the producer only guarantees a write is acknowledged by the ISR. If ISR shrinks below min.insync.replicas, the broker returns NotEnoughReplicas. With RF=3 and min.ISR=2, you can tolerate 1 broker failure.
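The interplay between ISR membership and min.insync.replicas can be sketched as a toy model (an illustration only, not broker code; broker names and lag values are made up):

```python
# Toy model of ISR membership and the acks=all write check.
# A replica stays in the ISR while its lag is within replica.lag.time.max.ms.
REPLICA_LAG_TIME_MAX_MS = 30_000
MIN_INSYNC_REPLICAS = 2

def compute_isr(replica_lag_ms):
    """Replicas whose lag is within the threshold stay in the ISR."""
    return {r for r, lag in replica_lag_ms.items() if lag <= REPLICA_LAG_TIME_MAX_MS}

def can_ack_write(isr):
    """acks=all succeeds only while the ISR is at least min.insync.replicas."""
    return len(isr) >= MIN_INSYNC_REPLICAS

# Healthy cluster: all three replicas caught up, so writes succeed
lags = {"broker-1": 0, "broker-2": 120, "broker-3": 800}
assert can_ack_write(compute_isr(lags))

# broker-2 and broker-3 fall behind: ISR shrinks to 1, broker
# would return NotEnoughReplicas for acks=all producers
lags = {"broker-1": 0, "broker-2": 45_000, "broker-3": 60_000}
assert not can_ack_write(compute_isr(lags))
```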
ZooKeeper vs KRaft Mode
Historically Kafka used ZooKeeper to store cluster metadata, elect controllers, and track consumer offsets (pre-0.9). Since Kafka 3.3 (KIP-833), KRaft mode (Kafka Raft Metadata) is production-ready and ZooKeeper is deprecated.
| Aspect | ZooKeeper Mode | KRaft Mode |
|---|---|---|
| Metadata storage | ZooKeeper ensemble | Internal __cluster_metadata topic |
| Controller | Single active controller elected via ZK | Raft-based quorum of controllers |
| Scalability | ~200k partitions practical limit | Millions of partitions |
| Operational complexity | Two systems to manage | Single system |
| Kafka version | All versions | 2.8+ (preview), 3.3+ (production) |
Log Segments
Each partition is stored as a sequence of log segments on disk. A segment consists of:
- .log — the actual message data
- .index — sparse index mapping offsets to byte positions
- .timeindex — sparse index mapping timestamps to offsets
New messages are appended to the active segment. When the segment reaches log.segment.bytes (default 1GB) or log.roll.ms, it is rolled and a new active segment starts. Old segments are eligible for deletion or compaction based on retention policy.
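How a broker locates a record by offset can be sketched in miniature: binary-search the segment base offsets (each segment file is named after its base offset) to pick the right segment, then use the sparse index within it. The base offsets below are made up for illustration:

```python
import bisect

# Segment files are named after their base offset,
# e.g. 00000000000000001000.log starts at offset 1000.
segment_base_offsets = [0, 1000, 2000, 3000]  # illustrative values

def find_segment(offset):
    """Pick the segment whose base offset is the largest one <= offset."""
    i = bisect.bisect_right(segment_base_offsets, offset) - 1
    return segment_base_offsets[i]

assert find_segment(0) == 0
assert find_segment(999) == 0      # still in the first segment
assert find_segment(1000) == 1000  # first record of the second segment
assert find_segment(2500) == 2000
```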
Consumer Groups
Each consumer group tracks its own offsets independently. Multiple groups can read the same topic simultaneously without interference. A partition is assigned to exactly one consumer within a group at any time.
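These rules can be seen in a toy assignment sketch (the round-robin split below is illustrative, not Kafka's actual assignor logic):

```python
# Toy assignment: each partition goes to exactly one consumer within a
# group; a second group independently consumes all partitions itself.
def assign(partitions, consumers):
    """Spread partitions round-robin over the group's consumers."""
    return {p: consumers[i % len(consumers)] for i, p in enumerate(partitions)}

partitions = [0, 1, 2, 3, 4, 5]
group_a = assign(partitions, ["a-1", "a-2", "a-3"])
group_b = assign(partitions, ["b-1"])  # second group, same topic

# Within a group, every partition has exactly one owner...
assert set(group_a) == set(partitions)
# ...while the other group reads the same topic without interference.
assert set(group_b.values()) == {"b-1"}
```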
2. Producers
The producer is responsible for choosing which partition to send a record to, batching records for efficiency, and handling retries and acknowledgement guarantees.
Java Producer API
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;
public class OrderProducer {
private final KafkaProducer<String, String> producer;
private static final String TOPIC = "orders";
public OrderProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Reliability: wait for all ISR to acknowledge
props.put(ProducerConfig.ACKS_CONFIG, "all");
// Idempotence: exactly-once per partition (requires acks=all)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Batching: wait up to 10ms to fill a batch
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
// Max batch size in bytes (16KB default, 32KB here)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
// Compression: lz4 is fast; gzip gives best ratio
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
// Retries (idempotent producer sets this to Integer.MAX_VALUE by default)
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
// In-flight requests: must be 1 for non-idempotent ordered delivery;
// idempotent producer allows up to 5
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// Block if internal queue is full (instead of throwing exception)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60_000);
this.producer = new KafkaProducer<>(props);
}
// Fire-and-forget (async with callback)
public void sendAsync(String orderId, String payload) {
ProducerRecord<String, String> record =
new ProducerRecord<>(TOPIC, orderId, payload); // key=orderId routes to same partition
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// Log, alert, send to DLQ
System.err.println("Send failed for order " + orderId + ": " + exception.getMessage());
} else {
System.out.printf("Sent to %s-P%d @ offset %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
});
}
// Synchronous send — blocks until acknowledged or throws
public RecordMetadata sendSync(String orderId, String payload) throws Exception {
ProducerRecord<String, String> record =
new ProducerRecord<>(TOPIC, orderId, payload);
Future<RecordMetadata> future = producer.send(record);
return future.get(); // blocks — use only when you must guarantee ordering inline
}
// Send to specific partition (bypass partitioner)
public void sendToPartition(int partition, String key, String value) {
ProducerRecord<String, String> record =
new ProducerRecord<>(TOPIC, partition, key, value);
producer.send(record, (meta, ex) -> { /* handle */ });
}
public void flush() { producer.flush(); }
public void close() { producer.close(); }
}
Acknowledgement Modes
| acks value | Behavior | Durability | Latency |
|---|---|---|---|
| 0 | No ack; fire and forget | Lowest; loss on any failure | Lowest |
| 1 | Leader acks after writing to its local log | Loss if the leader fails before replication | Medium |
| all / -1 | Leader acks after all ISR replicate | Highest; survives broker failures up to RF-1 | Highest |
Partitioning Strategies
- Key-based (default): murmur2(key) % numPartitions. The same key always goes to the same partition, which guarantees ordering per key.
- Round-robin: used for null keys before 2.4. Distributes records evenly but provides no per-key ordering.
- Sticky partitioner (default for null keys since 2.4): sends to the same partition until the batch fills or linger.ms expires, then switches partitions. Better throughput than pure round-robin.
- Custom partitioner: implement the Partitioner interface.
// Custom partitioner example
public class RegionPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int numPartitions = cluster.partitionCountForTopic(topic);
if (key == null) return 0;
String region = key.toString().split(":")[0]; // "us-east:order-123"
return switch (region) {
case "us-east" -> 0 % numPartitions;
case "us-west" -> 1 % numPartitions;
case "eu" -> 2 % numPartitions;
default -> Math.abs(key.hashCode()) % numPartitions;
};
}
@Override public void close() {}
@Override public void configure(Map<String, ?> configs) {}
}
// Register in producer config:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RegionPartitioner.class.getName());
Idempotent and Transactional Producers
With enable.idempotence=true, the broker assigns each producer a Producer ID (PID) and tracks a per-partition sequence number. Duplicate retries (same PID + sequence) are silently discarded. This gives exactly-once within a single partition and session.
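The PID + sequence bookkeeping can be sketched as a toy model (an illustration only; a real broker also rejects sequence gaps with OutOfOrderSequenceException rather than only dropping duplicates):

```python
# Toy model of broker-side idempotence: the broker remembers the last
# sequence number per producer id for each partition and drops retries.
class PartitionLog:
    def __init__(self):
        self.records = []
        self.last_seq = {}  # producer id -> last appended sequence number

    def append(self, pid, seq, value):
        """Append unless this (pid, seq) was already written (a retry)."""
        if pid in self.last_seq and seq <= self.last_seq[pid]:
            return False  # duplicate retry, silently discarded
        self.records.append(value)
        self.last_seq[pid] = seq
        return True

log = PartitionLog()
assert log.append(pid=7, seq=0, value="order-1")
assert log.append(pid=7, seq=1, value="order-2")
assert not log.append(pid=7, seq=1, value="order-2")  # retried send: dropped
assert log.records == ["order-1", "order-2"]
```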
// Transactional producer — exactly-once across multiple partitions/topics
Properties props = new Properties();
// ... base config ...
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-txn-1");
// transactional.id must be unique per producer instance
KafkaProducer<String, String> txnProducer = new KafkaProducer<>(props);
txnProducer.initTransactions(); // blocks until coordinator assigns epoch
try {
txnProducer.beginTransaction();
// Send to multiple topics atomically
txnProducer.send(new ProducerRecord<>("orders", orderId, orderJson));
txnProducer.send(new ProducerRecord<>("order-audit", orderId, auditJson));
// Optionally commit consumer offsets as part of the transaction
// (enables consume-transform-produce exactly-once)
txnProducer.sendOffsetsToTransaction(offsetsMap, consumerGroupMetadata);
txnProducer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
// Fatal — cannot recover, close producer
txnProducer.close();
} catch (KafkaException e) {
txnProducer.abortTransaction();
// Retry or propagate
}
Python Producer (confluent-kafka)
from confluent_kafka import Producer

def delivery_callback(err, msg):
    if err:
        print(f"Delivery failed for {msg.key()}: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

producer = Producer({
    "bootstrap.servers": "broker1:9092,broker2:9092",
    "acks": "all",
    "enable.idempotence": True,
    "linger.ms": 10,
    "batch.size": 32768,
    "compression.type": "lz4",
    "retries": 2147483647,
    "max.in.flight.requests.per.connection": 5,
})

# Produce a message
producer.produce(
    topic="orders",
    key="order-123",            # bytes or str
    value='{"amount": 99.99}',  # bytes or str
    on_delivery=delivery_callback,
)

# Must call poll() or flush() to trigger callbacks
producer.poll(0)   # non-blocking trigger
producer.flush()   # block until all messages delivered
3. Consumers
Java Consumer API — Poll Loop
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class OrderConsumer {
private final KafkaConsumer<String, String> consumer;
private volatile boolean running = true;
public OrderConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
// What to do when no committed offset exists for this group+partition
// "earliest" = from beginning, "latest" = from now (default)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Disable auto-commit for manual control
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Max records returned per poll() call
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// Must call poll() within this window or consumer is considered dead
// and triggers rebalance. Must be > your processing time per batch.
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000); // 5 minutes
// Heartbeat thread sends heartbeats independently of poll()
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3_000);
// If no heartbeat within session.timeout.ms, consumer is removed from group
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45_000);
// Partition assignment strategy (see below)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
this.consumer = new KafkaConsumer<>(props);
}
public void run() {
consumer.subscribe(Collections.singletonList("orders"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Called before rebalance: commit offsets for revoked partitions
commitSync(partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Called after rebalance: can seek to specific offsets here
System.out.println("Assigned: " + partitions);
}
});
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
processRecord(record);
}
// Manual commit per partition after processing
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
// Commit offset+1 (next offset to fetch)
consumer.commitSync(Collections.singletonMap(
partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close(); // leaves the group and triggers a rebalance (auto-commit is off, so commit before closing)
}
}
private void commitSync(Collection<TopicPartition> partitions) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition tp : partitions) {
offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
}
consumer.commitSync(offsets);
}
private void processRecord(ConsumerRecord<String, String> record) {
System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s%n",
record.topic(), record.partition(), record.offset(), record.key());
// Your processing logic here
}
public void shutdown() { running = false; }
}
Partition Assignment Strategies
| Strategy | Description | Rebalance Type |
|---|---|---|
| RangeAssignor | Assigns contiguous partition ranges per topic. Can be uneven with multiple topics. | Eager (stop-the-world) |
| RoundRobinAssignor | Distributes partitions round-robin across consumers. Balanced but ignores current assignment. | Eager (stop-the-world) |
| StickyAssignor | Like round-robin but tries to keep existing assignments. Minimizes partition movement. | Eager (stop-the-world) |
| CooperativeStickyAssignor | Incremental rebalancing: only revokes partitions that need to move. Recommended. | Cooperative (incremental) |
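The range vs round-robin unevenness can be sketched with a toy comparison (simplified stand-ins for the real assignors; topic names and counts are made up):

```python
# Toy comparison of range vs round-robin assignment over two topics,
# each with 3 partitions, and two consumers (c0, c1).
def range_assign(topics, consumers):
    """Per topic, give each consumer a contiguous slice of partitions."""
    out = {c: [] for c in consumers}
    for t, n in topics.items():
        per = -(-n // len(consumers))  # ceil division
        for i, c in enumerate(consumers):
            out[c] += [(t, p) for p in range(i * per, min((i + 1) * per, n))]
    return out

def round_robin_assign(topics, consumers):
    """Spread all (topic, partition) pairs round-robin across consumers."""
    pairs = [(t, p) for t, n in sorted(topics.items()) for p in range(n)]
    return {c: pairs[i::len(consumers)] for i, c in enumerate(consumers)}

topics = {"orders": 3, "payments": 3}
r = range_assign(topics, ["c0", "c1"])
rr = round_robin_assign(topics, ["c0", "c1"])

# Range: c0 gets 2 partitions of EACH topic (4 total), c1 only 2: uneven
assert len(r["c0"]) == 4 and len(r["c1"]) == 2
# Round-robin: 3 and 3, balanced
assert len(rr["c0"]) == 3 and len(rr["c1"]) == 3
```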
Offset Management
Offsets are committed to the internal __consumer_offsets topic. Options:
- Auto-commit (enable.auto.commit=true, default): commits every auto.commit.interval.ms (5s). Risk: can commit offsets for unprocessed records (at-most-once) if the consumer crashes before processing.
- Manual sync commit (commitSync()): blocks until the broker confirms. Safest; use after processing each batch.
- Manual async commit (commitAsync()): non-blocking but no retry on failure. Suitable for high throughput if you can tolerate occasional duplicate processing on crash.
- Per-partition commit: commit immediately after processing each partition's records for fine-grained control.
// Async commit with callback for logging
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.warn("Async commit failed for offsets {}: {}", offsets, exception.getMessage());
// Note: do NOT retry here; a later commitAsync/commitSync will commit newer offsets anyway
}
});
// Seeking to a specific offset (e.g., replay from beginning)
consumer.seek(new TopicPartition("orders", 0), 0L);
// Seek to beginning of all assigned partitions
consumer.seekToBeginning(consumer.assignment());
// Seek to end (skip old messages)
consumer.seekToEnd(consumer.assignment());
Python Consumer (confluent-kafka)
from confluent_kafka import Consumer, KafkaError, KafkaException
import signal
consumer = Consumer({
    "bootstrap.servers": "broker1:9092",
    "group.id": "order-processing-group",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "enable.auto.offset.store": False,  # required to use store_offsets()
    "max.poll.interval.ms": 300000,
    "session.timeout.ms": 45000,
    "heartbeat.interval.ms": 3000,
})
running = True
def shutdown(signum, frame):
    global running
    running = False
signal.signal(signal.SIGINT, shutdown)
consumer.subscribe(["orders"])
try:
    while running:
        msg = consumer.poll(timeout=1.0)  # returns one message or None
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f"Reached end of partition {msg.partition()}")
            else:
                raise KafkaException(msg.error())
            continue
        # Process message
        print(f"Received: key={msg.key()} value={msg.value()} "
              f"partition={msg.partition()} offset={msg.offset()}")
        # Manual commit: store_offsets() stages, commit() flushes staged offsets
        consumer.store_offsets(msg)
        consumer.commit(asynchronous=False)
finally:
    consumer.close()
4. Message Delivery Semantics
| Semantic | Producer Config | Consumer Config | Risk |
|---|---|---|---|
| At-most-once | acks=0 or acks=1, no retries | Auto-commit before processing | Message loss on failure |
| At-least-once | acks=all, retries enabled | Commit after processing | Duplicate processing on retry/crash |
| Exactly-once | enable.idempotence=true + transactional.id | isolation.level=read_committed, offsets in transaction | Higher latency, more complex |
Exactly-Once Semantics (EOS)
True EOS requires coordination at both the producer and consumer sides:
- Idempotent producer: Deduplicates retries within a session and partition using PID + sequence number
- Transactional producer: Writes to multiple partitions/topics atomically — all visible or none
- Transactional consumer: isolation.level=read_committed reads only messages from committed transactions
- Atomic offset commits: sendOffsetsToTransaction() commits consumer offsets as part of the produce transaction
Isolation Levels
| Isolation Level | Behavior | Use Case |
|---|---|---|
| read_uncommitted (default) | Reads all messages, including those from open or aborted transactions | High throughput; transactional boundaries don't matter |
| read_committed | Only reads messages from committed transactions; waits for open transactions | EOS consumers; financial/audit pipelines |
// Consume-transform-produce exactly-once pattern
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
// isolation.level=read_committed set in consumerProps
KafkaProducer<String, String> txnProducer = new KafkaProducer<>(producerProps);
// transactional.id and enable.idempotence set in producerProps
txnProducer.initTransactions();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) continue;
txnProducer.beginTransaction();
try {
for (ConsumerRecord<String, String> record : records) {
String transformed = transform(record.value());
txnProducer.send(new ProducerRecord<>("output-topic", record.key(), transformed));
}
// Commit consumer offsets INSIDE the transaction
txnProducer.sendOffsetsToTransaction(
currentOffsets(records), // Map<TopicPartition, OffsetAndMetadata>
consumer.groupMetadata() // ConsumerGroupMetadata (Kafka 2.5+)
);
txnProducer.commitTransaction();
} catch (Exception e) {
txnProducer.abortTransaction();
}
}
5. Topics & Partitions
Creating Topics
# Create topic with 6 partitions, replication factor 3
kafka-topics.sh --bootstrap-server broker1:9092 \
--create \
--topic orders \
--partitions 6 \
--replication-factor 3 \
--config min.insync.replicas=2 \
--config retention.ms=604800000 # 7 days
# Describe topic (shows partition-leader-replica mapping)
kafka-topics.sh --bootstrap-server broker1:9092 --describe --topic orders
# Increase partition count (can only increase, never decrease)
kafka-topics.sh --bootstrap-server broker1:9092 \
--alter --topic orders --partitions 12
# Delete topic (requires delete.topic.enable=true on broker)
kafka-topics.sh --bootstrap-server broker1:9092 --delete --topic orders
Partition Count Guidelines
- Start with: max(target throughput / producer throughput per partition, target throughput / consumer throughput per partition)
- Rule of thumb: 10–50 partitions per topic for most use cases. Avoid thousands.
- More partitions = more parallelism but also more file handles, more rebalance time, more end-to-end latency on leader failover.
- Key ordering: If you need strict ordering per key, remember that adding partitions changes the key-to-partition mapping for new messages — existing and new messages for the same key may land on different partitions.
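The key-ordering caveat can be demonstrated with a stand-in hash (crc32 here for illustration; Kafka's default partitioner uses murmur2):

```python
import zlib

# Stand-in for Kafka's default partitioner (which uses murmur2, not crc32).
def partition_for(key, num_partitions):
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# The mapping is stable while the partition count is fixed:
assert partition_for("customer-42", 6) == partition_for("customer-42", 6)

# After increasing 6 -> 12 partitions, some keys hash to a new partition,
# so old and new messages for those keys live in different partitions.
moved = [k for k in (f"customer-{i}" for i in range(20))
         if partition_for(k, 6) != partition_for(k, 12)]
assert moved  # at least some of these 20 keys are remapped
```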
Retention Policies
| Policy | Config | Description |
|---|---|---|
| Time-based deletion | retention.ms (default 7 days) | Delete segments older than the retention period |
| Size-based deletion | retention.bytes (default -1, unlimited) | Delete oldest segments when total size exceeds limit |
| Log compaction | cleanup.policy=compact | Retain only the latest value per key; null-value tombstones |
| Compaction + deletion | cleanup.policy=compact,delete | Compact and also apply time/size-based deletion |
Log Compaction
Log compaction ensures that Kafka retains at least the last known value for each message key. It is ideal for changelog or event-sourced use cases (e.g., database changelogs, Kafka Streams state store changelogs).
- The compaction process runs in the background; the log is never fully compacted in real time
- A tombstone is a record with a non-null key and a null value, signaling that the key should be deleted
- Tombstones are retained for delete.retention.ms (default 24h) before being removed
- The head of the log (the active segment) is never compacted; only the tail is
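The net effect of compaction can be sketched in a few lines (a toy replay, not the broker's segment-based cleaner; tombstone retention is elided):

```python
# Toy compaction: keep only the latest value per key; a None value is a
# tombstone that removes the key once compaction runs.
def compact(log):
    latest = {}
    for key, value in log:        # replay records in offset order
        latest[key] = value       # later records win
    return {k: v for k, v in latest.items() if v is not None}

log = [
    ("user-123", '{"tier":"free"}'),
    ("user-456", '{"tier":"pro"}'),
    ("user-123", '{"tier":"pro"}'),   # upsert: supersedes the first record
    ("user-456", None),               # tombstone: delete user-456
]
assert compact(log) == {"user-123": '{"tier":"pro"}'}
```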
# Configure a topic for log compaction
kafka-configs.sh --bootstrap-server broker1:9092 \
--entity-type topics --entity-name user-profiles \
--alter \
--add-config cleanup.policy=compact,min.cleanable.dirty.ratio=0.5,delete.retention.ms=86400000
# Send a tombstone (delete key "user-123" from the compacted topic).
# The console producer cannot easily send a null value; kcat's -Z flag
# turns an empty value into NULL:
echo "user-123:" | kcat -P -b localhost:9092 -t user-profiles -K: -Z
Unclean Leader Election
When all ISR replicas are unavailable, Kafka must decide: wait for an ISR member to come back, or elect an out-of-sync replica immediately. This is controlled by unclean.leader.election.enable.
| Setting | Behavior | Trade-off |
|---|---|---|
| false (default) | Wait for an ISR replica, which may be unavailable | No data loss; partition unavailable until an ISR member returns |
| true | Elect any replica immediately | Partition available immediately; may lose messages not yet replicated |
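The trade-off can be sketched with a toy log (the logs and message names are made up; real brokers also truncate divergent followers via leader epochs):

```python
# Toy illustration of the unclean-election trade-off: the old leader held
# offsets 0-4, but a lagging follower only replicated up to offset 2.
leader_log = ["m0", "m1", "m2", "m3", "m4"]
lagging_follower_log = ["m0", "m1", "m2"]

def elect(unclean_enabled, isr_available):
    if isr_available:
        return leader_log              # clean election: no loss
    if unclean_enabled:
        return lagging_follower_log    # available now, but m3/m4 are lost
    return None                        # unavailable until an ISR member returns

# With every ISR member down:
assert elect(unclean_enabled=False, isr_available=False) is None
assert elect(unclean_enabled=True, isr_available=False) == ["m0", "m1", "m2"]
```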
6. Kafka Streams
Kafka Streams is a client library (not a separate cluster) for building stateful stream processing applications. It runs inside your JVM process alongside your application code.
KStream vs KTable
| Abstraction | Semantics | Example |
|---|---|---|
| KStream | Unbounded stream of records; each record is an independent event | Click events, page views, transactions |
| KTable | Changelog stream; each record is an upsert, the latest value per key wins | User profiles, account balances, inventory |
| GlobalKTable | Fully replicated KTable; the entire table is available on every instance. No co-partitioning required for joins. | Reference data, country codes, small lookup tables |
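The stream/table duality boils down to list-vs-dict semantics, which a toy sketch makes concrete (event names are made up):

```python
# Toy contrast: a KStream keeps every event; a KTable upserts per key.
events = [("alice", "login"), ("bob", "login"), ("alice", "logout")]

# KStream view: each record is an independent event
stream_view = list(events)
assert len(stream_view) == 3

# KTable view: a changelog, so the latest value per key wins
table_view = {}
for key, value in events:
    table_view[key] = value
assert table_view == {"alice": "logout", "bob": "login"}
```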
Stream Topology
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.time.Duration;
import java.util.Properties;
public class OrderProcessingTopology {
public static KafkaStreams build() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Exactly-once semantics within Streams
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// State store location
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
StreamsBuilder builder = new StreamsBuilder();
// Source: read from "orders" topic
KStream<String, String> ordersStream = builder.stream("orders");
// ---- Stateless transformations ----
// Filter: only process non-null values
KStream<String, String> validOrders = ordersStream
.filter((key, value) -> value != null && !value.isEmpty());
// Map: transform key and value
KStream<String, String> enriched = validOrders
.mapValues(value -> enrich(value)); // transform value only
// FlatMap: one record -> multiple records
KStream<String, String> items = ordersStream
.flatMapValues(order -> parseItems(order)); // each order -> list of items
// Branch (split into multiple streams by predicate)
Map<String, KStream<String, String>> branches = validOrders.split(Named.as("branch-"))
.branch((k, v) -> v.contains("\"priority\":\"high\""), Branched.as("high"))
.branch((k, v) -> v.contains("\"priority\":\"low\""), Branched.as("low"))
.defaultBranch(Branched.as("normal"));
KStream<String, String> highPriority = branches.get("branch-high");
KStream<String, String> lowPriority = branches.get("branch-low");
// Sink: write to output topic
enriched.to("enriched-orders");
highPriority.to("high-priority-orders");
return new KafkaStreams(builder.build(), props);
}
private static String enrich(String value) { return value; /* placeholder */ }
private static java.util.List<String> parseItems(String order) { return java.util.List.of(); }
}
Stateful Operations
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> events = builder.stream("order-events");
// GroupByKey — prerequisite for aggregations
KGroupedStream<String, String> grouped = events.groupByKey();
// Count per key, stored in a state store backed by RocksDB
KTable<String, Long> orderCounts = grouped
.count(Materialized.as("order-count-store"));
// Aggregate: sum order amounts per customer
KTable<String, Double> totalSpend = grouped
.aggregate(
() -> 0.0, // initializer
(key, value, aggregate) -> aggregate + parseAmount(value), // aggregator
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("spend-store")
.withValueSerde(Serdes.Double())
);
// Reduce: keep running max order amount per customer
KTable<String, String> maxOrder = grouped
.reduce((agg, newVal) -> largerAmount(agg, newVal));
// Expose state store for interactive queries (local queries only)
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
ReadOnlyKeyValueStore<String, Long> store =
streams.store(StoreQueryParameters.fromNameAndType(
"order-count-store", QueryableStoreTypes.keyValueStore()));
Long count = store.get("customer-123");
Windowing
| Window Type | Description | Example |
|---|---|---|
| Tumbling | Fixed-size, non-overlapping windows | Revenue per 1-hour window |
| Hopping | Fixed-size, overlapping windows (advance < size) | 5-min revenue every 1 min |
| Sliding | Windows defined by time difference between records | Records within 30s of each other |
| Session | Activity-based; gap of inactivity closes the window | User session grouping with 30-min idle gap |
// Tumbling window: order count per 1-hour window
KTable<Windowed<String>, Long> hourlyOrderCounts = events
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.count(Materialized.as("hourly-order-counts"));
// Hopping window: 5-minute windows advancing every 1 minute
KTable<Windowed<String>, Long> rollingCounts = events
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
.advanceBy(Duration.ofMinutes(1)))
.count();
// Session window: group events within 30-minute inactivity gap
KTable<Windowed<String>, Long> sessionCounts = events
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
.count(Materialized.as("session-counts"));
// Access windowed results
hourlyOrderCounts.toStream()
.map((windowedKey, count) -> {
String key = windowedKey.key();
long windowStart = windowedKey.window().start();
long windowEnd = windowedKey.window().end();
return new KeyValue<>(key, String.format("%d orders [%d-%d]", count, windowStart, windowEnd));
})
.to("hourly-summary");
Stream Joins
| Join Type | Left | Right | Requirement | Output |
|---|---|---|---|---|
| KStream-KStream | KStream | KStream | Co-partitioned, same key, within join window | KStream |
| KStream-KTable | KStream | KTable | Co-partitioned, same key | KStream |
| KStream-GlobalKTable | KStream | GlobalKTable | No co-partitioning required | KStream |
| KTable-KTable | KTable | KTable | Co-partitioned | KTable |
// KStream-KTable join: enrich order events with customer data
KStream<String, String> orders = builder.stream("orders");
KTable<String, String> customers = builder.table("customers");
KStream<String, String> enrichedOrders = orders.join(
customers,
(orderJson, customerJson) -> merge(orderJson, customerJson), // value joiner
Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
// KStream-KStream join: match orders with payments within 5 minutes
KStream<String, String> payments = builder.stream("payments");
KStream<String, String> matched = orders.join(
payments,
(order, payment) -> combine(order, payment),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
// GlobalKTable join (no co-partitioning needed)
GlobalKTable<String, String> regions = builder.globalTable("regions");
KStream<String, String> located = orders.join(
regions,
(orderKey, orderValue) -> extractRegionKey(orderValue), // key extractor for lookup
(orderValue, regionValue) -> addRegion(orderValue, regionValue)
);
7. Kafka Connect
Kafka Connect is a framework for reliably streaming data between Kafka and other systems without writing producer/consumer code. Connectors are reusable, configurable plugins.
Architecture & Key Abstractions
Class Hierarchy
// The two top-level contracts:
//
// SourceConnector → creates SourceTasks → produce records INTO Kafka
// SinkConnector → creates SinkTasks → consume records FROM Kafka
//
// Key interfaces:
//
// Connector (abstract)
// ├── version() → plugin version string
// ├── start(Map props) → receive config, validate, prepare
// ├── taskClass() → return the Task class this connector uses
// ├── taskConfigs(int maxTasks) → split work into task-level configs
// ├── stop() → cleanup
// └── config() → define accepted configuration properties
//
// Task (abstract)
// ├── version()
// ├── start(Map props) → receive task-level config from Connector
// └── stop()
//
// SourceTask extends Task
// └── poll() → return List<SourceRecord> (called in a loop)
// each SourceRecord = (topic, key, value, partition, offset)
// Connect handles offset commit automatically
//
// SinkTask extends Task
// ├── put(Collection<SinkRecord>) → receive batch of records from Kafka
// ├── flush(Map offsets) → commit external state, then Connect commits Kafka offsets
// ├── open(Collection partitions) → called on partition assignment
// └── close(Collection partitions) → called on partition revocation
How Work Gets Split: Connector → Tasks
The Connector's taskConfigs(int maxTasks) method is where you decide how to parallelize. You return a List<Map<String,String>> — one config map per task. Each task gets exactly one of these maps and works independently.
Example patterns:
- JDBC Source: Each task handles a subset of tables
- File Source: Each task handles a subset of files/directories
- Spreadsheet Source: Master config lists N spreadsheets → split across min(N, maxTasks) tasks, each task handles its assigned spreadsheets
- S3 Sink: Each task handles a subset of Kafka partitions (handled automatically by Connect for sink connectors)
// Example: how a source connector splits work across tasks
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
// Say we have 10 spreadsheets to monitor and maxTasks = 4
// Split spreadsheets across tasks: [3, 3, 2, 2]
List<List<String>> groups = partitionList(allSpreadsheetIds, maxTasks);
List<Map<String, String>> configs = new ArrayList<>();
for (List<String> group : groups) {
Map<String, String> taskConfig = new HashMap<>(this.baseConfig);
taskConfig.put("assigned.spreadsheets", String.join(",", group));
configs.add(taskConfig);
}
return configs; // Connect creates one task per config map
}
// For SINK connectors, you generally don't need to split work yourself —
// Connect automatically assigns Kafka partitions across tasks based on tasks.max
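The `partitionList` helper in the snippet above isn't part of the Connect API. A minimal pure-Java sketch of what it does — chunk a list into at most `maxTasks` balanced groups via round-robin assignment:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionListSketch {
    // Split items into at most maxGroups groups whose sizes differ by at most 1.
    // e.g. 10 items, 4 groups -> sizes [3, 3, 2, 2]
    public static <T> List<List<T>> partitionList(List<T> items, int maxGroups) {
        int numGroups = Math.min(maxGroups, items.size());
        List<List<T>> groups = new ArrayList<>();
        for (int i = 0; i < numGroups; i++) groups.add(new ArrayList<>());
        for (int i = 0; i < items.size(); i++) {
            groups.get(i % numGroups).add(items.get(i)); // round-robin
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 10; i++) ids.add("sheet-" + i);
        List<List<String>> groups = partitionList(ids, 4);
        System.out.println(groups.size());        // 4
        System.out.println(groups.get(0).size()); // 3
        System.out.println(groups.get(3).size()); // 2
    }
}
```

Round-robin keeps the groups balanced even when the item count isn't divisible by the task count, so no single task becomes a hot spot.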
Offset Management
// SOURCE connectors: Connect tracks offsets automatically via SourceRecord
// You provide a "source partition" (which external entity) and "source offset" (position)
// Connect stores these in an internal topic (_connect-offsets)
SourceRecord record = new SourceRecord(
// Source partition — identifies WHICH external entity
Collections.singletonMap("spreadsheet", "sheet-abc-123"),
// Source offset — your cursor/position within that entity
Collections.singletonMap("row", lastProcessedRow),
// Kafka topic, key schema, key, value schema, value
"spreadsheet-events", null, sheetId, valueSchema, rowData
);
// On restart, Connect passes the last committed offset back to your task via
// context.offsetStorageReader().offset(partition) — so you resume where you left off
// SINK connectors: Connect manages Kafka consumer offsets for you
// Your put() receives records, your flush() signals external commit is done,
// then Connect commits the Kafka offset. If flush() throws, offsets don't advance.
Standalone vs Distributed Mode
| Mode | Use Case | Scalability | Fault Tolerance |
|---|---|---|---|
| Standalone | Development, single-node pipelines | Single process | None |
| Distributed | Production — connectors distributed across worker pool | Add workers dynamically | Workers auto-rebalance tasks on failure |
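For orientation, a minimal distributed-mode worker configuration. Topic names, `group.id`, and paths are illustrative — adjust for your environment:

```properties
# connect-distributed.properties (sketch)
bootstrap.servers=broker1:9092,broker2:9092

# All workers sharing this group.id form one Connect cluster
group.id=connect-cluster-1

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Internal topics for connector configs, source offsets, and status.
# Connect creates them (compacted) if they don't exist.
config.storage.topic=_connect-configs
offset.storage.topic=_connect-offsets
status.storage.topic=_connect-status
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

# Where connector plugin JARs live
plugin.path=/usr/share/confluent-hub-components
```

Start it with `connect-distributed.sh connect-distributed.properties`. Standalone mode (`connect-standalone.sh worker.properties connector.properties`) instead keeps source offsets in a local file (`offset.storage.file.filename`), which is why it offers no fault tolerance.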
Connector Configuration
// Source connector: JDBC source — stream a Postgres table into Kafka
{
"name": "orders-jdbc-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://localhost:5432/mydb",
"connection.user": "kafka_user",
"connection.password": "${file:/opt/kafka/secrets.properties:db.password}",
"mode": "timestamp+incrementing",
"timestamp.column.name": "updated_at",
"incrementing.column.name": "id",
"table.whitelist": "orders",
"topic.prefix": "db-",
"poll.interval.ms": "5000",
"batch.max.rows": "1000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
// Sink connector: write Kafka topic to S3 (Confluent S3 Sink)
{
"name": "orders-s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "4",
"topics": "orders",
"s3.region": "us-east-1",
"s3.bucket.name": "my-kafka-data",
"s3.part.size": "67108864",
"flush.size": "10000",
"rotate.interval.ms": "600000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"locale": "en_US",
"timezone": "UTC",
"timestamp.extractor": "RecordField",
"timestamp.field": "created_at",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Debezium CDC Connector
Debezium captures row-level changes from database transaction logs (CDC — Change Data Capture). It reads the binlog/WAL directly, producing one Kafka message per database change.
{
"name": "postgres-cdc-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "dbz",
"database.dbname": "mydb",
"database.server.name": "mydb",
"table.include.list": "public.orders,public.customers",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "debezium_pub",
"topic.prefix": "cdc",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
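For orientation, the value of a Debezium change event for an UPDATE looks roughly like this (field values are illustrative; the exact envelope varies by connector version and converter):

```json
{
  "before": { "id": 42, "status": "pending", "updated_at": 1700000000000 },
  "after":  { "id": 42, "status": "shipped", "updated_at": 1700000050000 },
  "source": {
    "connector": "postgresql",
    "db": "mydb",
    "schema": "public",
    "table": "orders",
    "lsn": 123456789,
    "ts_ms": 1700000050000
  },
  "op": "u",
  "ts_ms": 1700000050123
}
```

`op` is `c` (create), `u` (update), `d` (delete), or `r` (snapshot read). The top-level `ts_ms` is when Debezium processed the event; `source.ts_ms` is when the database committed it. `before` is only fully populated when the table's REPLICA IDENTITY is FULL.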
Debezium Production Caveats (Postgres)
On first start, Debezium takes an initial snapshot: a full SELECT * of each table. For a 500 GB table this can take hours and flood the Kafka cluster with data. The default Kafka Connect producer settings are tuned for low-latency streaming, not bulk throughput — you must tune them for the snapshot phase.
// Production Debezium config with snapshot tuning
{
"name": "postgres-cdc-prod",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres-primary.internal",
"database.port": "5432",
"database.user": "debezium",
"database.password": "${file:/opt/secrets/debezium.properties:db.password}",
"database.dbname": "production",
"topic.prefix": "cdc",
"table.include.list": "public.orders,public.customers,public.events",
"plugin.name": "pgoutput",
"slot.name": "debezium_prod",
"publication.name": "debezium_pub",
"publication.autocreate.mode": "filtered",
// ── Snapshot tuning ─────────────────────────────────────────────
"snapshot.mode": "initial",
// "initial" = snapshot on first run, then stream WAL
// "never" = skip snapshot, stream WAL only (requires slot already at correct position)
// "when_needed" = snapshot if no offset exists or if slot is lost
// Snapshot fetch size: how many rows per SELECT batch
"snapshot.fetch.size": 10000,
// Default is 2048 — increase for large tables to reduce round trips
// ── Kafka producer overrides (critical for snapshot throughput) ──
// These override the Connect worker's default producer settings
// for THIS connector only
"producer.override.batch.size": "524288",
// Default 16KB → 512KB: larger batches = fewer network round trips
"producer.override.linger.ms": "100",
// Default 0 → 100ms: wait to fill batches instead of sending immediately
"producer.override.buffer.memory": "134217728",
// Default 32MB → 128MB: more buffer for bursty snapshot data
"producer.override.compression.type": "lz4",
// Compress snapshot data (often 3-5x compression on row data)
"producer.override.max.request.size": "10485760",
// Default 1MB → 10MB: allow larger requests for wide rows
"producer.override.acks": "1",
// During snapshot: acks=1 is safe (data is in the DB, re-snapshot if lost)
// Switch to acks=all after snapshot completes (or keep acks=all if you
// can't tolerate re-snapshotting on failure)
// ── Heartbeat (keeps the replication slot alive during long snapshots) ──
"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "INSERT INTO debezium_heartbeat (ts) VALUES (NOW()) ON CONFLICT (id) DO UPDATE SET ts = NOW()",
// ── Signal table (for ad-hoc re-snapshots without restarting) ──
"signal.data.collection": "public.debezium_signal",
// INSERT INTO debezium_signal VALUES ('snap-orders', 'execute-snapshot',
// '{"data-collections": ["public.orders"]}');
// ── WAL disk usage protection ──
"slot.drop.on.stop": "false",
// Keep the slot when connector stops — BUT monitor pg_replication_slots!
// An abandoned slot prevents WAL cleanup → disk fills → Postgres crashes
// ── Converters ──
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"tasks.max": "1"
// Debezium Postgres: ALWAYS 1 task. It reads a single WAL stream.
// Most Debezium connectors are single-task; SQL Server is the main exception
// (it can run one task per captured database).
}
}
- Replication slot WAL retention: An idle or lagging Debezium connector causes Postgres to retain WAL files indefinitely. Monitor pg_replication_slots and set max_slot_wal_keep_size (PG 13+) as a safety cap. If WAL fills the disk, Postgres goes read-only.
- Use a read replica for snapshots: The initial SELECT * can hammer the primary. Point Debezium at a replica for the snapshot, then switch to the primary's WAL stream. Debezium 2.x supports snapshot.mode=initial_only plus a separate streaming connector.
- Publication filtering: Use publication.autocreate.mode=filtered so the publication only includes tables in table.include.list. The default (all_tables) publishes every table's WAL changes, wasting I/O.
- TOAST columns: Postgres doesn't include unchanged TOAST (large) columns in WAL. Set REPLICA IDENTITY FULL on tables with TOAST columns you need, or you'll get nulls for unchanged text/jsonb fields on UPDATE.
- Schema changes: DDL changes (ALTER TABLE) cause Debezium to update its internal schema history. If a schema change is incompatible with Schema Registry's compatibility mode, the connector will fail. Test DDL changes in staging first.
- Monitoring: Track MilliSecondsBehindSource (Debezium metric) — the lag between the WAL event timestamp and when Debezium processed it. Alert if it grows.
-- Postgres setup for Debezium (run as superuser)
-- 1. Create a dedicated replication user
CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'strong_password';
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;
-- 2. Set wal_level = logical (requires restart)
ALTER SYSTEM SET wal_level = 'logical';
-- pg_ctl restart
-- 3. Safety cap on WAL retention from abandoned slots (PG 13+)
ALTER SYSTEM SET max_slot_wal_keep_size = '50GB';
-- 4. Set REPLICA IDENTITY FULL on tables with TOAST columns you care about
ALTER TABLE orders REPLICA IDENTITY FULL;
-- Without this, UPDATE on a TOAST column you didn't change shows as null
-- 5. Create heartbeat table (prevents slot from going stale during idle periods)
CREATE TABLE debezium_heartbeat (id INT PRIMARY KEY DEFAULT 1, ts TIMESTAMPTZ);
INSERT INTO debezium_heartbeat VALUES (1, NOW());
GRANT ALL ON debezium_heartbeat TO debezium;
-- 6. Create signal table (for ad-hoc re-snapshots)
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32), data VARCHAR(2048));
GRANT ALL ON debezium_signal TO debezium;
-- 7. Monitor replication slot health
SELECT slot_name, active, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;
-- lag should be small (< 1 GB). If it's growing, Debezium is lagging or dead.
MirrorMaker 2
MirrorMaker 2 (MM2) replicates topics, consumer group offsets, and ACLs between Kafka clusters. It's built on the Kafka Connect framework — each replication flow runs as a set of Connect source connectors.
- Cross-team data sharing: Team A owns a Kafka cluster with their event streams. Team B needs a subset of those topics for their own processing. MM2 mirrors the relevant topics into Team B's cluster — no direct cross-cluster consumer connections, no blast radius if Team B's consumers misbehave.
- Disaster recovery (active-passive): Mirror the primary cluster to a standby in another region. If the primary fails, consumers switch to the standby. RPO depends on replication lag.
- Active-active (multi-region): Each region produces locally and mirrors to other regions. Requires careful topic naming and consumer offset translation to avoid infinite loops.
- Cloud migration: Mirror on-prem Kafka to a cloud cluster during migration. Consumers switch over gradually. Decommission on-prem when done.
- Data ingestion from partner teams: Instead of granting direct access to your cluster, the other team runs their cluster independently and you pull their topics via MM2 into your own cluster for processing.
Architecture
# MM2 Components (all are Kafka Connect source connectors under the hood):
#
# MirrorSourceConnector — replicates topic data (records) from source → target
# MirrorCheckpointConnector — replicates consumer group offsets (translated)
# MirrorHeartbeatConnector — emits heartbeats for monitoring replication health
#
# Topic naming convention:
# Source topic "orders" on cluster "us-east" becomes "us-east.orders" on target
# This prefix prevents infinite loops in bidirectional replication
#
# Consumer offset translation:
# MM2 translates source offsets to target offsets so consumers can fail over
# Stored in the internal topic: <source-alias>.checkpoints.internal
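The DefaultReplicationPolicy prefix logic reduces to simple string handling. A simplified sketch (the real class also handles heartbeat/checkpoint topics and a configurable separator):

```java
public class ReplicationPolicySketch {
    // Target topic name: prefix with the source cluster alias.
    public static String formatRemoteTopic(String sourceAlias, String topic) {
        return sourceAlias + "." + topic;
    }

    // A topic is "remote" if it already carries a known cluster-alias prefix.
    // Bidirectional flows use this check to avoid re-mirroring mirrored topics
    // (the infinite-loop prevention mentioned above).
    public static boolean isRemote(String topic, String... knownAliases) {
        for (String alias : knownAliases) {
            if (topic.startsWith(alias + ".")) return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(formatRemoteTopic("us-east", "orders"));           // us-east.orders
        System.out.println(isRemote("us-east.orders", "us-east", "us-west")); // true
        System.out.println(isRemote("orders", "us-east", "us-west"));         // false
    }
}
```

This is why IdentityReplicationPolicy (no prefix) is dangerous in bidirectional setups: without the alias prefix, a mirrored topic is indistinguishable from a local one.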
Dedicated MM2 Cluster (Recommended)
# mm2.properties — dedicated MirrorMaker 2 configuration
# Run as: connect-mirror-maker.sh mm2.properties
# ── Cluster aliases ──
clusters = source, target
source.bootstrap.servers = source-broker1:9092,source-broker2:9092
target.bootstrap.servers = target-broker1:9092,target-broker2:9092
# ── Replication flow: source → target ──
source->target.enabled = true
source->target.topics = team-a\.events\..*,team-a\.orders
# Regex pattern — mirror all topics matching "team-a.events.*" and "team-a.orders"
# Use explicit topic list or regex. Avoid .* (mirrors internal topics too)
# Prevent reverse replication (unless active-active)
target->source.enabled = false
# ── Topic configuration ──
# Replicated topics get prefixed: "team-a.events.clicks" → "source.team-a.events.clicks"
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy
# To remove the prefix (flat mirroring), use IdentityReplicationPolicy
# WARNING: IdentityReplicationPolicy can cause infinite loops in bidirectional setups
# Keep topic configs and ACLs in sync with the source
source->target.sync.topic.configs.enabled = true
source->target.sync.topic.acls.enabled = true
# ── Consumer group offset sync ──
source->target.emit.checkpoints.enabled = true
source->target.emit.checkpoints.interval.seconds = 30
source->target.sync.group.offsets.enabled = true
# Groups to sync (regex):
source->target.groups = team-b-consumer-.*
# ── Heartbeats for monitoring ──
source->target.emit.heartbeats.enabled = true
source->target.emit.heartbeats.interval.seconds = 10
# ── Replication tuning ──
source->target.tasks.max = 8
# More tasks = more parallelism. Each task handles a subset of partitions.
# Rule of thumb: 1 task per 20-50 partitions
# Producer overrides for the target cluster (throughput tuning)
target.producer.batch.size = 524288
target.producer.linger.ms = 50
target.producer.compression.type = lz4
target.producer.buffer.memory = 67108864
# Consumer overrides for the source cluster
source.consumer.auto.offset.reset = earliest
source.consumer.fetch.max.bytes = 52428800
MM2 as Connect Connectors (Alternative)
// Deploy MM2 connectors on an existing Connect cluster
// Useful if you already have a Connect cluster and don't want to run a separate MM2 process
// 1. MirrorSourceConnector — replicates data
{
"name": "mm2-source-connector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "us-east",
"target.cluster.alias": "us-west",
"source.cluster.bootstrap.servers": "us-east-broker1:9092",
"target.cluster.bootstrap.servers": "us-west-broker1:9092",
"topics": "team-a\\.events\\..*",
"tasks.max": "4",
"replication.factor": 3,
"sync.topic.configs.enabled": true,
"producer.override.compression.type": "lz4",
"producer.override.batch.size": "524288",
"producer.override.linger.ms": "50"
}
}
// 2. MirrorCheckpointConnector — syncs consumer offsets
{
"name": "mm2-checkpoint-connector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"source.cluster.alias": "us-east",
"target.cluster.alias": "us-west",
"source.cluster.bootstrap.servers": "us-east-broker1:9092",
"target.cluster.bootstrap.servers": "us-west-broker1:9092",
"groups": ".*",
"emit.checkpoints.interval.seconds": "30",
"sync.group.offsets.enabled": true,
"tasks.max": "1"
}
}
Monitoring MM2
# 1. Check replication lag — compare source topic end offsets with mirror consumer offsets
kafka-consumer-groups.sh --bootstrap-server source-broker1:9092 \
--describe --group mm2-source-connector-0
# LAG column shows how far behind MM2 is on the source cluster
# 2. Heartbeat topic — check on the target cluster
kafka-console-consumer.sh --bootstrap-server target-broker1:9092 \
--topic us-east.heartbeats --from-beginning --max-messages 3
# Each heartbeat has a timestamp. If heartbeats stop, replication is broken.
# 3. Checkpoint topic — verify offset translation is working
kafka-console-consumer.sh --bootstrap-server target-broker1:9092 \
--topic us-east.checkpoints.internal --from-beginning --max-messages 5
# 4. Compare topic record counts (quick sanity check)
# Source:
kafka-get-offsets.sh --bootstrap-server source-broker1:9092 \
  --topic team-a.events.clicks --time -1
# Target (note the prefix):
kafka-get-offsets.sh --bootstrap-server target-broker1:9092 \
  --topic us-east.team-a.events.clicks --time -1
# Key alerts:
# - MM2 consumer lag > threshold (replication falling behind)
# - Heartbeat gap > 60s (replication may be broken)
# - Target topic partition count != source (sync.topic.configs not working)
- Topic regex is evaluated once on start. If Team A creates a new topic matching your regex, MM2 won't pick it up until you restart the connector (or set refresh.topics.interval.seconds for periodic rediscovery).
- Exactly-once is not guaranteed. MM2 provides at-least-once delivery. On failover, consumers may see duplicates from the overlap period. Design consumers to be idempotent.
- Bandwidth planning: MM2 doubles your network egress from the source cluster. If the source produces 500 MB/s, MM2 adds 500 MB/s of consumer traffic. Coordinate with the source cluster owner.
- Don't mirror internal topics. Exclude __consumer_offsets, _schemas, _connect-*, etc. Use explicit topic lists or careful regex, not topics = .*.
- Consumer failover is manual. Offset translation is available in the checkpoint topic, but consumers must be reconfigured to read from the target cluster and the prefixed topic names. Automate this in your runbook.
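The failover lookup itself is just a map from (topic, partition) to a translated offset. A plain-collections sketch — the checkpoint values here are illustrative; in practice you'd read them from `<alias>.checkpoints.internal` or call `RemoteClusterUtils.translateOffsets(...)` from the connect-mirror-client library:

```java
import java.util.HashMap;
import java.util.Map;

public class FailoverSketch {
    // Look up the translated offset for (topic, partition); fall back to 0
    // (earliest) if no checkpoint exists for that partition yet.
    public static long translatedOffset(Map<String, Long> checkpoints,
                                        String topic, int partition) {
        return checkpoints.getOrDefault(topic + "-" + partition, 0L);
    }

    public static void main(String[] args) {
        // Illustrative checkpoint data: source offsets already translated
        // to target-cluster offsets by MirrorCheckpointConnector.
        Map<String, Long> checkpoints = new HashMap<>();
        checkpoints.put("orders-0", 15230L);
        checkpoints.put("orders-1", 14988L);

        // On failover, seek each consumer on the TARGET cluster, under the
        // PREFIXED topic name ("us-east.orders"), to the translated offset.
        for (int p = 0; p < 2; p++) {
            System.out.println("seek us-east.orders-" + p + " to "
                + translatedOffset(checkpoints, "orders", p));
        }
    }
}
```

The fallback matters: a partition with no checkpoint yet means MM2 hasn't observed a committed offset for that group, so you must decide between earliest (reprocess) and latest (skip) per your duplicate tolerance.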
Single Message Transforms (SMTs)
SMTs are lightweight, stateless transformations applied to each record as it passes through Connect.
{
"transforms": "extractKey,addTimestamp,maskPII",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "id",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "kafka_ingest_time",
"transforms.maskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskPII.fields": "email,ssn",
"transforms.maskPII.replacement": "***"
}
Dead Letter Queue
{
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "orders-dlq",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": true,
"errors.log.enable": true,
"errors.log.include.messages": true
}
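With errors.deadletterqueue.context.headers.enable set, each DLQ record carries error metadata in Kafka headers (names defined by KIP-298). Inspect them with a console consumer using --property print.headers=true:

```
__connect.errors.topic                   # original topic
__connect.errors.partition               # original partition
__connect.errors.offset                  # original offset
__connect.errors.connector.name          # connector that failed
__connect.errors.task.id                 # task that failed
__connect.errors.stage                   # where it failed (e.g. VALUE_CONVERTER, TRANSFORMATION)
__connect.errors.class.name              # class executing at failure time
__connect.errors.exception.class.name    # exception type
__connect.errors.exception.message       # exception message
__connect.errors.exception.stacktrace    # full stack trace
```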
Building a Custom Connector (Java)
Example: Google Sheets Source Connector
A source connector that monitors a master spreadsheet listing other spreadsheets to ingest, polls them at a configured interval, and splits work across tasks — each task handles a subset of spreadsheets.
// ── SheetsSourceConnector.java ──────────────────────────────────────
// The Connector: validates config, discovers spreadsheets, splits work into tasks
public class SheetsSourceConnector extends SourceConnector {
private static final Logger log = LoggerFactory.getLogger(SheetsSourceConnector.class);
// Config keys
public static final String MASTER_SHEET_ID = "master.spreadsheet.id";
public static final String POLL_INTERVAL_MS = "poll.interval.ms";
public static final String TOPIC = "topic";
public static final String SERVICE_ACCOUNT_JSON = "gcp.service.account.json";
private String masterSheetId;
private String topic;
private long pollIntervalMs;
private String serviceAccountJson;
private List<String> spreadsheetIds;
@Override
public String version() { return "1.0.0"; }
@Override
public void start(Map<String, String> props) {
// Called once when the connector is created/started
masterSheetId = props.get(MASTER_SHEET_ID);
topic = props.get(TOPIC);
pollIntervalMs = Long.parseLong(props.getOrDefault(POLL_INTERVAL_MS, "60000"));
serviceAccountJson = props.get(SERVICE_ACCOUNT_JSON);
// Read the master spreadsheet to discover which sheets to ingest
SheetsClient client = new SheetsClient(serviceAccountJson);
spreadsheetIds = client.listTargetSheets(masterSheetId);
log.info("Discovered {} spreadsheets from master config", spreadsheetIds.size());
}
@Override
public Class<? extends Task> taskClass() {
return SheetsSourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
// Split spreadsheets across tasks — each task gets an independent subset
int numTasks = Math.min(maxTasks, spreadsheetIds.size());
List<Map<String, String>> configs = new ArrayList<>();
// Round-robin assignment
List<List<String>> assignments = new ArrayList<>();
for (int i = 0; i < numTasks; i++) assignments.add(new ArrayList<>());
for (int i = 0; i < spreadsheetIds.size(); i++) {
assignments.get(i % numTasks).add(spreadsheetIds.get(i));
}
for (List<String> assigned : assignments) {
Map<String, String> taskConfig = new HashMap<>();
taskConfig.put("assigned.spreadsheets", String.join(",", assigned));
taskConfig.put(TOPIC, topic);
taskConfig.put(POLL_INTERVAL_MS, String.valueOf(pollIntervalMs));
taskConfig.put(SERVICE_ACCOUNT_JSON, serviceAccountJson);
configs.add(taskConfig);
}
return configs;
}
@Override
public void stop() {
// Cleanup resources (HTTP clients, etc.)
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define(MASTER_SHEET_ID, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
"Google Sheets ID of the master configuration spreadsheet")
.define(TOPIC, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
"Kafka topic to produce records to")
.define(POLL_INTERVAL_MS, ConfigDef.Type.LONG, 60000L, ConfigDef.Importance.MEDIUM,
"How often to poll spreadsheets for new data (ms)")
.define(SERVICE_ACCOUNT_JSON, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH,
"GCP service account JSON key");
}
}
// ── SheetsSourceTask.java ───────────────────────────────────────────
// The Task: polls assigned spreadsheets, emits SourceRecords
public class SheetsSourceTask extends SourceTask {
private String topic;
private long pollIntervalMs;
private List<String> assignedSheets;
private SheetsClient client;
// Track last-read row per spreadsheet (our "offset")
private Map<String, Long> lastRow = new HashMap<>();
@Override
public String version() { return "1.0.0"; }
@Override
public void start(Map<String, String> props) {
topic = props.get("topic");
pollIntervalMs = Long.parseLong(props.get("poll.interval.ms"));
assignedSheets = Arrays.asList(props.get("assigned.spreadsheets").split(","));
client = new SheetsClient(props.get("gcp.service.account.json"));
// Restore offsets from Connect's offset storage (survives restarts)
for (String sheetId : assignedSheets) {
Map<String, Object> partition = Collections.singletonMap("spreadsheet", sheetId);
Map<String, Object> offset = context.offsetStorageReader().offset(partition);
if (offset != null && offset.containsKey("row")) {
lastRow.put(sheetId, (Long) offset.get("row"));
} else {
lastRow.put(sheetId, 0L); // start from beginning
}
}
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
// Called in a loop by the Connect worker — return records or block
List<SourceRecord> records = new ArrayList<>();
for (String sheetId : assignedSheets) {
long startRow = lastRow.get(sheetId);
List<Map<String, Object>> rows = client.readRowsAfter(sheetId, startRow);
for (Map<String, Object> row : rows) {
long rowNum = (Long) row.get("_row_number");
// Source partition: identifies which spreadsheet
Map<String, String> sourcePartition =
Collections.singletonMap("spreadsheet", sheetId);
// Source offset: row number — our cursor for resuming
Map<String, Long> sourceOffset =
Collections.singletonMap("row", rowNum);
Schema valueSchema = buildSchema(row); // dynamic schema from columns
Struct value = buildStruct(valueSchema, row);
records.add(new SourceRecord(
sourcePartition, sourceOffset,
topic, null, sheetId, // key = spreadsheet ID
valueSchema, value
));
lastRow.put(sheetId, rowNum);
}
}
if (records.isEmpty()) {
Thread.sleep(pollIntervalMs); // backoff when no new data
}
return records;
// Connect commits offsets automatically after records are acked by Kafka
}
@Override
public void stop() {
client.close();
}
}
Example: PII Masking Sink Connector
A sink connector (or SMT) that masks sensitive fields before writing to an external system. Two approaches:
// ── Approach 1: Custom SMT (recommended for simple field masking) ───
// SMTs are lightweight, reusable, and work with ANY connector
public class PiiMaskTransform<R extends ConnectRecord<R>>
implements Transformation<R> {
private List<String> fieldsToMask;
private String replacement;
@Override
public void configure(Map<String, ?> configs) {
fieldsToMask = Arrays.asList(
((String) configs.get("fields")).split(","));
replacement = (String) configs.getOrDefault("replacement", "***REDACTED***");
}
@Override
public R apply(R record) {
if (record.value() == null) return record;
Schema schema = record.valueSchema();
Struct original = (Struct) record.value();
Struct masked = new Struct(schema);
for (Field field : schema.fields()) {
Object value = original.get(field);
if (fieldsToMask.contains(field.name()) && value instanceof String) {
// Mask: keep first/last char for debugging, mask the rest
String str = (String) value;
if (str.length() > 4) {
masked.put(field, str.charAt(0)
+ replacement
+ str.charAt(str.length() - 1));
} else {
masked.put(field, replacement);
}
} else {
masked.put(field, value);
}
}
return record.newRecord(
record.topic(), record.kafkaPartition(),
record.keySchema(), record.key(),
schema, masked,
record.timestamp()
);
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define("fields", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
"Comma-separated list of fields to mask")
.define("replacement", ConfigDef.Type.STRING, "***REDACTED***",
ConfigDef.Importance.LOW, "Replacement string");
}
@Override
public void close() {}
}
// Deploy the PII mask SMT with any sink connector:
{
"name": "s3-sink-with-pii-masking",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "customer-events",
"tasks.max": "4",
"transforms": "maskPii",
"transforms.maskPii.type": "com.example.PiiMaskTransform",
"transforms.maskPii.fields": "email,ssn,phone,credit_card",
"transforms.maskPii.replacement": "***",
"s3.bucket.name": "analytics-data-lake",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat"
}
}
// ── Approach 2: Full SinkConnector with built-in masking ────────────
// Use when masking logic is complex (regex patterns, format-preserving
// encryption, field-level policies from a config service) and you need
// full control over the sink write path.
public class MaskingSinkTask extends SinkTask {
private MaskingPolicy policy; // loaded from config service
private ExternalWriter writer; // writes to target system
@Override
public void put(Collection<SinkRecord> records) {
for (SinkRecord record : records) {
Struct value = (Struct) record.value();
Struct masked = policy.apply(value); // per-field masking rules
writer.write(masked);
}
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
writer.flush(); // commit to external system
// After flush() returns, Connect commits Kafka offsets
// If flush() throws, offsets are NOT committed → records will be re-delivered
}
}
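The keep-first/last-char rule from the SMT in Approach 1 is easy to unit-test once extracted into a pure function (a hypothetical helper, not part of the Connect API):

```java
public class MaskRuleSketch {
    // Mask a value: keep the first and last character for debuggability when
    // the string is long enough, otherwise replace it entirely.
    public static String mask(String value, String replacement) {
        if (value.length() > 4) {
            return value.charAt(0) + replacement + value.charAt(value.length() - 1);
        }
        return replacement;
    }

    public static void main(String[] args) {
        System.out.println(mask("alice@example.com", "***")); // a***m
        System.out.println(mask("1234", "***"));              // ***
    }
}
```

Pulling transformation logic out of `apply()` like this keeps the Connect plumbing (schemas, Structs, record rebuilding) separate from the business rule, so the rule can be tested without a Connect runtime.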
Packaging & Deployment
Project Structure
# Standard Maven project layout for a Connect plugin
my-sheets-connector/
├── pom.xml
├── src/main/java/com/example/connect/
│ ├── SheetsSourceConnector.java
│ ├── SheetsSourceTask.java
│ ├── SheetsClient.java # Google Sheets API wrapper
│ └── PiiMaskTransform.java # bonus: package SMTs in the same plugin
├── src/main/resources/
│ └── META-INF/services/ # NOT needed — Connect uses plugin.path scanning
└── src/test/java/com/example/connect/
├── SheetsSourceConnectorTest.java
└── SheetsSourceTaskTest.java
Maven pom.xml
<project>
<groupId>com.example</groupId>
<artifactId>sheets-source-connector</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<dependencies>
<!-- Kafka Connect API — PROVIDED, already on the worker classpath -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>3.6.0</version>
<scope>provided</scope>
</dependency>
<!-- Your connector's actual dependencies -->
<dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-sheets</artifactId>
<version>v4-rev20231218-2.0.0</version>
</dependency>
<!-- Test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Shade plugin: creates uber-jar with all deps (excluding Kafka Connect API) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.1</version>
<executions>
<execution>
<phase>package</phase>
<goals><goal>shade</goal></goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Build, Package, Deploy
# 1. Build the connector
mvn clean package -DskipTests
# produces: target/sheets-source-connector-1.0.0.jar
# 2. Deploy to Connect workers — two approaches:
# ── Option A: Copy JAR to plugin.path directory ──
# Each connector lives in its own subdirectory under plugin.path
# Connect discovers it automatically on worker restart
scp target/sheets-source-connector-1.0.0.jar \
connect-host:/usr/share/confluent-hub-components/sheets-source-connector/
# Restart Connect worker to pick up new plugin
ssh connect-host "docker compose restart kafka-connect"
# ── Option B: Bake into Docker image (recommended for production) ──
# Dockerfile for a custom Connect image with your plugin pre-installed
# Dockerfile.connect
FROM confluentinc/cp-kafka-connect:7.6.0
# Install community connectors via confluent-hub
RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.4.1
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.7
# Install your custom connector
COPY target/sheets-source-connector-1.0.0.jar \
/usr/share/confluent-hub-components/sheets-source-connector/
# Build and push
# docker build -f Dockerfile.connect -t my-registry/kafka-connect:v1.2.0 .
# docker push my-registry/kafka-connect:v1.2.0
Deploy and Manage via REST API
# Verify plugin is loaded (after worker restart or new image deploy)
curl http://connect-host:8083/connector-plugins | jq '.[].class'
# Should include: "com.example.connect.SheetsSourceConnector"
# Create the connector instance
curl -X POST http://connect-host:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "google-sheets-source",
"config": {
"connector.class": "com.example.connect.SheetsSourceConnector",
"master.spreadsheet.id": "1AbC_xYz_masterSheetId",
"topic": "spreadsheet-data",
"poll.interval.ms": "300000",
"gcp.service.account.json": "${file:/opt/secrets/gcp.json:key}",
"tasks.max": "4",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}'
# Check status — "RUNNING" for connector and all tasks
curl http://connect-host:8083/connectors/google-sheets-source/status | jq .
# {
# "name": "google-sheets-source",
# "connector": {"state": "RUNNING", "worker_id": "connect-1:8083"},
# "tasks": [
# {"id": 0, "state": "RUNNING", "worker_id": "connect-1:8083"},
# {"id": 1, "state": "RUNNING", "worker_id": "connect-2:8083"},
# {"id": 2, "state": "RUNNING", "worker_id": "connect-1:8083"},
# {"id": 3, "state": "RUNNING", "worker_id": "connect-2:8083"}
# ]
# }
# Update config (zero-downtime — Connect restarts tasks automatically)
curl -X PUT http://connect-host:8083/connectors/google-sheets-source/config \
-H "Content-Type: application/json" \
-d '{ ... updated config ... }'
# Restart a single failed task
curl -X POST http://connect-host:8083/connectors/google-sheets-source/tasks/2/restart
# Delete the connector
curl -X DELETE http://connect-host:8083/connectors/google-sheets-source
Testing Connectors
// Unit test a SourceTask — mock the external system, verify SourceRecords
@Test
public void testPollReturnsNewRows() throws InterruptedException {
SheetsSourceTask task = new SheetsSourceTask();
// Mock the offset storage reader (no prior offsets)
SourceTaskContext context = mock(SourceTaskContext.class);
OffsetStorageReader offsetReader = mock(OffsetStorageReader.class);
when(context.offsetStorageReader()).thenReturn(offsetReader);
when(offsetReader.offset(any())).thenReturn(null);
task.initialize(context);
// Start with test config
Map<String, String> props = Map.of(
"assigned.spreadsheets", "sheet-1,sheet-2",
"topic", "test-topic",
"poll.interval.ms", "100",
"gcp.service.account.json", "{...test key...}"
);
task.start(props);
// Poll and verify
List<SourceRecord> records = task.poll();
assertFalse(records.isEmpty());
assertEquals("test-topic", records.get(0).topic());
assertEquals("sheet-1", records.get(0).key());
}
// Integration test: use EmbeddedConnectCluster (Kafka's test utilities)
// or Testcontainers with a real Kafka + Connect worker
// See: org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster
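A connector's `taskConfigs(int maxTasks)` is what makes `tasks.max` work: it splits the source workload (here, spreadsheet IDs) into at most `maxTasks` config maps, one per task. A pure-Java sketch of a round-robin split producing the `assigned.spreadsheets` value each task receives (helper names are illustrative, not the Connect API):

```java
import java.util.ArrayList;
import java.util.List;

public class TaskAssignment {
    // Round-robin split of spreadsheet IDs across tasks, mirroring what a
    // SourceConnector's taskConfigs(maxTasks) typically does. Returns the
    // comma-joined "assigned.spreadsheets" value for one task.
    public static String assignedTo(String[] sheets, int maxTasks, int taskId) {
        int groups = Math.min(maxTasks, sheets.length);
        List<String> mine = new ArrayList<>();
        for (int i = taskId; i < sheets.length; i += groups) {
            mine.add(sheets[i]);
        }
        return String.join(",", mine);
    }
}
```

With 5 spreadsheets and tasks.max=2, task 0 gets 3 sheets and task 1 gets 2; Connect never creates more tasks than there are units of work.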
8. Schema Registry
Schema Registry stores versioned schemas and enforces schema compatibility between producers and consumers. Without it, schema evolution is a coordination nightmare.
Formats
| Format | Binary | Human-readable | Notes |
|---|---|---|---|
| Avro | Yes | No (schema separate) | Most common; compact; strong schema evolution support |
| Protobuf | Yes | No | Excellent multi-language support; field numbers for evolution |
| JSON Schema | No | Yes | Easiest to debug; largest messages; weakest validation |
Compatibility Modes
| Mode | New schema can | Old consumers | Old producers |
|---|---|---|---|
| BACKWARD (default) | Delete fields, add optional fields | Must upgrade first | Can keep old schema |
| FORWARD | Add fields, delete optional fields | Can keep old schema (still read new data) | Must upgrade first |
| FULL | Add/delete optional fields only | Read new data | Can keep old schema |
| NONE | Anything | May break | May break |
| BACKWARD_TRANSITIVE | Compatible with ALL previous versions | Must upgrade first | Can keep old schema |
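Why BACKWARD mode insists on defaults for new fields comes down to schema resolution at read time: a consumer on the new schema fills any field absent from old data with the reader schema's default. A pure-Java sketch of that resolution rule (not the actual Avro library; the record is modeled as parallel name/value arrays for illustration):

```java
public class SchemaResolution {
    // Sketch of Avro-style schema resolution: a reader on schema v2 reads a
    // record written with schema v1. A field absent from the written record
    // is filled from the reader schema's default, which is exactly why
    // BACKWARD mode requires new fields to carry defaults.
    public static String readField(String[] names, String[] values,
                                   String field, String defaultValue) {
        for (int i = 0; i < names.length; i++) {
            if (names[i].equals(field)) return values[i];
        }
        return defaultValue;
    }
}
```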
Avro Schema Example
// Schema version 1 — initial
{
"type": "record",
"name": "Order",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "createdAt", "type": {"type": "long", "logicalType": "timestamp-millis"}}
]
}
// Schema version 2 — BACKWARD compatible additions
// Added optional field "region" with default (safe to add in BACKWARD mode)
{
"type": "record",
"name": "Order",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "createdAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "region", "type": ["null", "string"], "default": null}
]
}
Using Schema Registry in Java
// Producer with Avro + Schema Registry
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");
// Subject naming strategy: {topic}-value (default)
// props.put("value.subject.name.strategy", TopicRecordNameStrategy.class.getName());
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
Schema schema = new Schema.Parser().parse(new File("order.avsc"));
GenericRecord order = new GenericData.Record(schema);
order.put("id", "order-123");
order.put("customerId", "cust-456");
order.put("amount", 99.99);
order.put("createdAt", System.currentTimeMillis());
order.put("region", "us-east");
producer.send(new ProducerRecord<>("orders", "order-123", order));
9. Security
SSL/TLS Encryption
# Broker config (server.properties)
listeners=SSL://broker1:9093
ssl.keystore.location=/var/ssl/private/broker1.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/ssl/private/broker.truststore.jks
ssl.truststore.password=truststore-password
ssl.client.auth=required # require mutual TLS (mTLS)
ssl.endpoint.identification.algorithm=https
// Client SSL config
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststorePassword");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/client.keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keystorePassword");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "keyPassword");
SASL Authentication
| Mechanism | Use Case | Notes |
|---|---|---|
| PLAIN | Simple username/password | Credentials sent in plaintext — always use with TLS |
| SCRAM-SHA-256/512 | Username/password with challenge-response | Safer than PLAIN; credentials stored hashed in ZK/KRaft |
| GSSAPI (Kerberos) | Enterprise/Hadoop environments | Requires KDC; most complex to set up |
| OAUTHBEARER | Token-based auth (OAuth 2.0 / OIDC) | Modern cloud-native setups; supports token refresh |
# Broker: SASL_SSL listener
listeners=SASL_SSL://broker1:9094
sasl.enabled.mechanisms=SCRAM-SHA-512
ssl.keystore.location=/var/ssl/private/broker1.keystore.jks
ssl.keystore.password=keystore-password
# Create SCRAM credential for a user
kafka-configs.sh --bootstrap-server broker1:9092 \
--alter --entity-type users --entity-name alice \
--add-config 'SCRAM-SHA-512=[iterations=8192,password=alice-secret]'
// Client SASL/SCRAM config
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"alice\" password=\"alice-secret\";");
ACLs (Access Control Lists)
# Allow producer "alice" to write to "orders" topic
kafka-acls.sh --bootstrap-server broker1:9092 \
--add \
--allow-principal User:alice \
--operation Write \
--operation Describe \
--topic orders
# Allow consumer "service-account" to read "orders" and commit offsets in its group
kafka-acls.sh --bootstrap-server broker1:9092 \
--add \
--allow-principal User:service-account \
--operation Read \
--operation Describe \
--topic orders
kafka-acls.sh --bootstrap-server broker1:9092 \
--add \
--allow-principal User:service-account \
--operation Read \
--group order-processing-group
# List ACLs for a topic
kafka-acls.sh --bootstrap-server broker1:9092 --list --topic orders
# Deny all operations on all topics for a principal (DENY wins over any ALLOW)
kafka-acls.sh --bootstrap-server broker1:9092 \
--add \
--deny-principal User:bad-actor \
--operation All \
--topic '*'
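Kafka's authorizer applies these rules in a fixed order: any matching DENY wins over any ALLOW, and with no matching ACL at all the request is denied unless `allow.everyone.if.no.acl.found=true` (default false). A pure-Java sketch of that decision (the string format is illustrative, not the real `AclBinding` API):

```java
public class AclEval {
    // Sketch of Kafka's authorization decision for one resource:
    // an explicit DENY always wins, and no matching ACL means denied.
    // acls: entries like "ALLOW:alice:Write" or "DENY:bad-actor:All".
    public static boolean authorized(String[] acls, String principal, String operation) {
        boolean allowed = false;
        for (String acl : acls) {
            String[] parts = acl.split(":");
            boolean matches = parts[1].equals(principal)
                    && (parts[2].equals(operation) || parts[2].equals("All"));
            if (!matches) continue;
            if (parts[0].equals("DENY")) return false; // deny always wins
            allowed = true;
        }
        return allowed;
    }
}
```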
10. Operations
Essential CLI Commands
# --- Topics ---
# List all topics
kafka-topics.sh --bootstrap-server broker1:9092 --list
# Describe a specific topic (shows partition/replica/ISR details)
kafka-topics.sh --bootstrap-server broker1:9092 --describe --topic orders
# Describe ALL topics (useful for auditing)
kafka-topics.sh --bootstrap-server broker1:9092 --describe
# --- Producing and Consuming (for testing) ---
# Console producer
kafka-console-producer.sh --bootstrap-server broker1:9092 \
--topic orders \
--property "key.separator=:" \
--property "parse.key=true"
# Then type: order-123:{"amount":99.99}
# Console consumer — read from beginning
kafka-console-consumer.sh --bootstrap-server broker1:9092 \
--topic orders \
--from-beginning \
--property print.key=true \
--property key.separator=" | "
# Console consumer — specific partition + offset
kafka-console-consumer.sh --bootstrap-server broker1:9092 \
--topic orders \
--partition 0 \
--offset 100 \
--max-messages 10
# --- Consumer Groups ---
# List all consumer groups
kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list
# Describe group (shows lag per partition)
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
--describe --group order-processing-group
# Reset offsets to beginning (must stop consumers first)
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
--group order-processing-group \
--topic orders \
--reset-offsets --to-earliest --execute
# Reset to specific offset
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
--group order-processing-group \
--topic orders:0 \
--reset-offsets --to-offset 500 --execute
# Reset to timestamp (ISO 8601)
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
--group order-processing-group \
--topic orders \
--reset-offsets --to-datetime 2026-02-01T00:00:00.000 --execute
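`--to-datetime` resolves, per partition, to the earliest offset whose record timestamp is at or after the target time, the same lookup `KafkaConsumer.offsetsForTimes` performs. A pure-Java sketch of that search, assuming monotonically non-decreasing timestamps (true for LogAppendTime; CreateTime can be out of order, which Kafka's time index handles separately):

```java
public class OffsetForTime {
    // Binary search for the earliest offset with timestamp >= target.
    // timestamps[i] is the record timestamp at offset i for one partition.
    // Returns -1 if no record is that recent.
    public static int offsetForTimestamp(long[] timestamps, long target) {
        int lo = 0, hi = timestamps.length - 1, ans = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] >= target) { ans = mid; hi = mid - 1; }
            else lo = mid + 1;
        }
        return ans;
    }
}
```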
# Delete consumer group (must have no active members)
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
--delete --group old-group
# --- Configs ---
# Dynamically update broker config at runtime
kafka-configs.sh --bootstrap-server broker1:9092 \
--entity-type brokers --entity-name 1 \
--alter --add-config log.retention.ms=86400000
# Update topic config
kafka-configs.sh --bootstrap-server broker1:9092 \
--entity-type topics --entity-name orders \
--alter --add-config retention.ms=604800000,min.insync.replicas=2
# Describe topic configs (shows overrides)
kafka-configs.sh --bootstrap-server broker1:9092 \
--entity-type topics --entity-name orders --describe
# --- Log Dirs ---
# Show log directory info (disk usage, replicas)
kafka-log-dirs.sh --bootstrap-server broker1:9092 \
--topic-list orders --broker-list 1,2,3
# --- Partition Reassignment ---
# Generate reassignment plan
kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
--topics-to-move-json-file topics.json \
--broker-list "1,2,3" \
--generate
# Execute reassignment
kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
--reassignment-json-file reassignment.json \
--execute
# Verify reassignment progress
kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
--reassignment-json-file reassignment.json \
--verify
Monitoring: Key JMX Metrics
| Metric | MBean | Alert if |
|---|---|---|
| UnderReplicatedPartitions | kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions | > 0 |
| ActiveControllerCount | kafka.controller:type=KafkaController,name=ActiveControllerCount | != 1 |
| OfflinePartitionsCount | kafka.controller:type=KafkaController,name=OfflinePartitionsCount | > 0 |
| MessagesInPerSec | kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec | Drop unexpectedly |
| BytesInPerSec / BytesOutPerSec | kafka.server:type=BrokerTopicMetrics | Saturation near NIC limit |
| RequestHandlerAvgIdlePercent | kafka.server:type=KafkaRequestHandlerPool | < 30% |
| Consumer Lag | kafka.consumer:type=consumer-fetch-manager-metrics,client-id=X | Growing over time |
| LogFlushRateAndTimeMs | kafka.log:type=LogFlushStats | High p99 flush time |
# Check consumer lag from CLI
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
--describe --group order-processing-group
# Output columns:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST
# orders 0 5000 5010 10 ... ...
# orders 1 8000 8000 0 ... ...
# Cluster-wide lag overview with kafka-consumer-groups (all groups)
# Filter to numeric LAG values so repeated header lines don't pollute the sort
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
--describe --all-groups 2>/dev/null | awk '$6 ~ /^[0-9]+$/ {print $1, $6}' | sort -k2 -rn | head -20
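The LAG column is simple arithmetic worth internalizing: log-end offset minus committed offset, per partition; total group lag is the sum across partitions. A sketch matching the sample output above:

```java
public class ConsumerLag {
    // LAG = LOG-END-OFFSET - CURRENT-OFFSET for one partition.
    public static long partitionLag(long logEndOffset, long committedOffset) {
        return Math.max(0, logEndOffset - committedOffset);
    }

    // Total group lag: sum of per-partition lags.
    public static long totalLag(long[] logEndOffsets, long[] committedOffsets) {
        long total = 0;
        for (int i = 0; i < logEndOffsets.length; i++) {
            total += partitionLag(logEndOffsets[i], committedOffsets[i]);
        }
        return total;
    }
}
```

For the sample output above (5010 vs 5000, 8000 vs 8000), the group's total lag is 10.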
11. Production Setup
Deployment Options
| Option | Who Manages Brokers | Best For | Trade-off |
|---|---|---|---|
| Self-managed on EC2/VMs | You | Full control, cost optimization at scale, custom tuning | You own upgrades, patching, monitoring, and failover |
| Amazon MSK | AWS | AWS-native teams, serverless option available | Less config flexibility, MSK-specific quirks (e.g., storage auto-scaling limits) |
| Confluent Cloud | Confluent | Schema Registry + ksqlDB + Connect as managed services | Most expensive, vendor lock-in on Confluent-specific features |
| Kubernetes (Strimzi) | You (via operator) | Existing K8s infra, GitOps workflows | K8s networking complexity, persistent volume management |
| Aiven / Redpanda Cloud | Provider | Multi-cloud managed Kafka (Aiven) or Kafka-compatible (Redpanda) | Smaller ecosystem than Confluent, Redpanda has minor API differences |
Self-Managed on EC2 with Docker
Infrastructure Layout
# Typical 3-broker production cluster on EC2
#
# EC2 Instance Type: r6i.xlarge (4 vCPU, 32 GB RAM) — Kafka is memory + I/O heavy
# - General purpose (m6i) works for moderate throughput
# - Storage optimized (i3/d3) if you want NVMe local disks instead of EBS
#
# EBS Volume per broker:
# - Type: gp3 (3,000 IOPS baseline, burstable, cheaper than io2)
# - Size: Start with 500 GB, monitor and expand as needed
# - Mount: /data/kafka — this is your log.dirs path
# - DO NOT use the root volume for Kafka data
#
# Networking:
# - Same VPC, spread across 3 AZs for fault tolerance
# - Security group: allow 9092 (broker), 9093 (inter-broker), 8083 (Connect REST)
# - Private subnet — no public IPs on brokers
# Directory structure on each EC2 instance
/data/kafka/ # EBS volume mount — Kafka log.dirs
/opt/kafka/ # Kafka binaries (in Docker image)
/opt/kafka-connect/ # Connect worker (separate container)
/var/log/kafka/ # Application logs (stdout from Docker)
Docker Compose for a Broker Node
# docker-compose.yml — run on each EC2 instance
# Each instance runs ONE broker. Don't colocate brokers.
version: "3.8"
services:
kafka:
image: confluentinc/cp-kafka:7.6.0
container_name: kafka-broker
network_mode: host # simplifies advertised.listeners
restart: always
volumes:
- /data/kafka:/var/lib/kafka/data # EBS volume
- /var/log/kafka:/var/log/kafka
environment:
KAFKA_NODE_ID: 1 # unique per broker
KAFKA_PROCESS_ROLES: broker,controller # KRaft mode (no ZK)
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker1:9093,2@broker2:9093,3@broker3:9093
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker1.internal:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_LOG_RETENTION_HOURS: 168 # 7 days
KAFKA_LOG_RETENTION_BYTES: -1 # unlimited (rely on time)
KAFKA_NUM_PARTITIONS: 6 # default for auto-created topics
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_MIN_INSYNC_REPLICAS: 2 # acks=all needs 2/3 in sync
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" # explicit topic creation only
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "false"
KAFKA_LOG_SEGMENT_BYTES: 1073741824 # 1 GB segments
KAFKA_NUM_NETWORK_THREADS: 8
KAFKA_NUM_IO_THREADS: 16
KAFKA_HEAP_OPTS: "-Xms6g -Xmx6g" # fixed, modest heap; leave most RAM to the page cache
KAFKA_JMX_PORT: 9999 # for monitoring
KAFKA_JMX_HOSTNAME: broker1.internal
ulimits:
nofile:
soft: 65536
hard: 65536
Kafka Connect Cluster
# docker-compose.connect.yml — run on separate EC2 instances
services:
kafka-connect:
image: confluentinc/cp-kafka-connect:7.6.0
container_name: kafka-connect
network_mode: host
restart: always
environment:
CONNECT_BOOTSTRAP_SERVERS: broker1:9092,broker2:9092,broker3:9092
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: connect-cluster
CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: _connect-status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
volumes:
- ./connect-plugins:/usr/share/confluent-hub-components
# Deploy a connector via REST API
curl -X POST http://connect-host:8083/connectors -H "Content-Type: application/json" -d '{
"name": "s3-sink-events",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "events",
"s3.region": "us-east-1",
"s3.bucket.name": "my-data-lake",
"s3.part.size": 52428800,
"flush.size": 10000,
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'\''year'\''=YYYY/'\''month'\''=MM/'\''day'\''=dd",
"locale": "en-US",
"timezone": "UTC",
"timestamp.extractor": "RecordField",
"timestamp.field": "event_time",
"tasks.max": 4
}
}'
# Check connector status
curl http://connect-host:8083/connectors/s3-sink-events/status | jq .
# Pause / resume / restart a connector
curl -X PUT http://connect-host:8083/connectors/s3-sink-events/pause
curl -X PUT http://connect-host:8083/connectors/s3-sink-events/resume
curl -X POST http://connect-host:8083/connectors/s3-sink-events/restart
# List installed connector plugins
curl http://connect-host:8083/connector-plugins | jq '.[].class'
EBS Volume Best Practices
| Setting | Recommendation | Why |
|---|---|---|
| Volume type | gp3 (general), io2 (high throughput) | gp3 gives 3,000 IOPS + 125 MB/s baseline for free; io2 for sustained high I/O |
| Size | Plan for retention period × daily ingest × replication factor, plus headroom | E.g., 7 days × 50 GB/day × RF 3 ≈ 1 TB across the cluster, ~350 GB per broker on 3 brokers |
| Filesystem | XFS | Better than ext4 for Kafka's sequential write + concurrent read pattern |
| Mount options | noatime,nodiratime | Avoid unnecessary metadata writes on every read |
| Multiple volumes | Use log.dirs=/data1,/data2 with separate EBS volumes | Striping across volumes increases throughput; broker survives single volume failure (JBOD mode) |
| Snapshots | EBS snapshots for disaster recovery, NOT for backups | Kafka's replication is the primary backup mechanism. Snapshots are for catastrophic AZ-level failures |
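The sizing rule in the table is worth making explicit: retention × daily ingest × replication factor gives the cluster-wide storage footprint; divide by broker count and add headroom for the per-broker volume size. A sketch (the 40% headroom figure is illustrative, chosen so the 75% disk alert doesn't fire on day one):

```java
public class KafkaStorageSizing {
    // Cluster-wide storage footprint in GB:
    // retention days x daily ingest (GB) x replication factor.
    public static long clusterFootprintGb(int retentionDays, long dailyIngestGb,
                                          int replicationFactor) {
        return (long) retentionDays * dailyIngestGb * replicationFactor;
    }

    // Per-broker volume size: ceil(footprint / brokers) plus headroom.
    public static long perBrokerGb(long clusterFootprintGb, int brokers, int headroomPct) {
        long base = (clusterFootprintGb + brokers - 1) / brokers; // ceil
        return base + base * headroomPct / 100;
    }
}
```

For the table's example: 7 × 50 × 3 = 1050 GB cluster-wide, so roughly 490 GB per broker on a 3-broker cluster with 40% headroom, which is why 500 GB is a sensible starting volume.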
# Format and mount an EBS volume for Kafka
# 1. Attach volume in AWS console or CLI (e.g., /dev/xvdf)
# 2. Format with XFS
sudo mkfs.xfs /dev/xvdf
# 3. Mount with optimized options
sudo mkdir -p /data/kafka
sudo mount -o noatime,nodiratime /dev/xvdf /data/kafka
# 4. Persist in fstab
echo "/dev/xvdf /data/kafka xfs noatime,nodiratime 0 0" | sudo tee -a /etc/fstab
# 5. Set ownership for the Kafka container user
sudo chown -R 1000:1000 /data/kafka # cp-kafka runs as UID 1000
Amazon MSK
# Create an MSK cluster via AWS CLI
aws kafka create-cluster \
--cluster-name "prod-events" \
--kafka-version "3.6.0" \
--number-of-broker-nodes 3 \
--broker-node-group-info '{
"InstanceType": "kafka.m5.large",
"ClientSubnets": ["subnet-aaa", "subnet-bbb", "subnet-ccc"],
"SecurityGroups": ["sg-kafka"],
"StorageInfo": {
"EbsStorageInfo": {
"VolumeSize": 500,
"ProvisionedThroughput": {
"Enabled": true,
"VolumeThroughput": 250
}
}
}
}' \
--encryption-info '{
"EncryptionInTransit": {"ClientBroker": "TLS", "InCluster": true}
}' \
--enhanced-monitoring "PER_TOPIC_PER_BROKER"
# Get bootstrap brokers
aws kafka get-bootstrap-brokers --cluster-arn <arn>
# Returns: BootstrapBrokerStringTls: "b-1.prod.xxx:9094,b-2.prod.xxx:9094,..."
# MSK Serverless — zero capacity planning (pay per data)
aws kafka create-cluster-v2 \
--cluster-name "serverless-events" \
--serverless '{
"VpcConfigs": [{"SubnetIds": ["subnet-aaa","subnet-bbb"], "SecurityGroupIds": ["sg-kafka"]}],
"ClientAuthentication": {"Sasl": {"Iam": {"Enabled": true}}}
}'
Self-managed wins on: Full config control (MSK restricts some broker configs), latest Kafka versions faster, cost at scale (MSK has a per-broker-hour markup), and picking up new Kafka features (KRaft, tiered storage) on your own schedule rather than MSK's.
Kubernetes (Strimzi)
# Strimzi Kafka CRD — declarative Kafka cluster on K8s
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: prod-cluster
namespace: kafka
spec:
kafka:
version: 3.6.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
auto.create.topics.enable: false
storage:
type: persistent-claim
size: 500Gi
class: gp3-encrypted # StorageClass backed by EBS gp3
deleteClaim: false # keep PVCs on cluster deletion
resources:
requests:
memory: 8Gi
cpu: "2"
limits:
memory: 8Gi
cpu: "4"
jvmOptions:
-Xms: 4096m
-Xmx: 4096m
rack:
topologyKey: topology.kubernetes.io/zone # spread across AZs
zookeeper:
replicas: 3
storage:
type: persistent-claim
size: 50Gi
class: gp3-encrypted
entityOperator:
topicOperator: {} # manage topics via KafkaTopic CRDs
userOperator: {} # manage ACLs via KafkaUser CRDs
# Manage topics declaratively via CRD (GitOps-friendly)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: orders
labels:
strimzi.io/cluster: prod-cluster
spec:
partitions: 12
replicas: 3
config:
retention.ms: "604800000" # 7 days
min.insync.replicas: "2"
cleanup.policy: delete
Monitoring Stack
- Broker health — under-replicated partitions, offline partitions, controller status
- Consumer lag — the #1 operational metric. Growing lag = consumer can't keep up
- Disk utilization — brokers die when disk fills; must alert early
- Throughput — bytes in/out per broker, messages/sec per topic
Prometheus + Grafana (Self-Managed)
# JMX Exporter sidecar — expose Kafka JMX metrics as Prometheus endpoints
# Add to your broker's Docker Compose:
services:
kafka:
environment:
KAFKA_JMX_PORT: 9999
KAFKA_OPTS: "-javaagent:/opt/jmx-exporter/jmx_prometheus_javaagent-0.20.0.jar=7071:/opt/jmx-exporter/kafka-broker.yml"
volumes:
- ./jmx-exporter:/opt/jmx-exporter
# prometheus.yml — scrape targets
scrape_configs:
- job_name: kafka-brokers
static_configs:
- targets:
- broker1:7071
- broker2:7071
- broker3:7071
- job_name: kafka-lag
static_configs:
- targets:
- kafka-lag-exporter:8000
Consumer Lag Monitoring
# Burrow (LinkedIn) — dedicated consumer lag monitoring service
# Evaluates lag status per group: OK, WARN, ERR, STOP, STALL
# Exposes HTTP API for lag per consumer group and partition
# docker run -v /path/to/burrow.toml:/etc/burrow/burrow.toml linkedin/burrow
# Kafka Lag Exporter (Lightbend) — Prometheus-native
# Helm chart for K8s:
# helm install kafka-lag-exporter kafka-lag-exporter/kafka-lag-exporter \
# --set clusters[0].name=prod --set clusters[0].bootstrapBrokers=broker1:9092
# CLI — quick check
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
--describe --group my-consumer-group
# Look at LAG column — should be near 0 and stable
# Key lag thresholds (tune for your SLA):
# LAG < 100: OK
# LAG 100 - 10000: WARNING — consumer falling behind
# LAG > 10000: CRITICAL — something is wrong
# LAG growing: CRITICAL regardless of absolute value
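These thresholds translate directly into an alerting rule; a sketch (the numeric cutoffs are illustrative, tune them to your SLA):

```java
public class LagStatus {
    // Encode the thresholds above. Growing lag is critical regardless of
    // the absolute value; otherwise classify by magnitude.
    public static String classify(long lag, boolean growing) {
        if (growing) return "CRITICAL";
        if (lag > 10000) return "CRITICAL";
        if (lag >= 100) return "WARNING";
        return "OK";
    }
}
```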
Critical Alerts
| Alert | Condition | Action |
|---|---|---|
| Disk > 75% | EBS volume utilization > 75% | Expand EBS volume, reduce retention, or add brokers |
| Disk > 90% | EBS volume utilization > 90% | Urgent: Broker will stop accepting writes. Reduce retention immediately: kafka-configs.sh --alter --add-config retention.ms=86400000 |
| Under-replicated > 0 | Partitions with fewer in-sync replicas than expected | Check broker health, network, disk I/O. Could be a slow/dead broker |
| Offline partitions > 0 | Partitions with no available leader | Broker is down. Check logs. If broker can't recover, reassign partitions |
| Consumer lag growing | Lag increasing over 15-min window | Scale consumers, check for slow processing, verify downstream health |
| Controller count != 1 | No active controller or multiple controllers | Cluster split-brain or controller crash. Restart affected broker |
Message Inspection Tools
# Console consumer — quick peek at a topic
kafka-console-consumer.sh --bootstrap-server broker1:9092 \
--topic orders --from-beginning --max-messages 5 \
--property print.key=true --property print.timestamp=true
# kcat (formerly kafkacat) — the "Swiss army knife" for Kafka
# Install: brew install kcat (or apt-get install kafkacat)
# Consume with metadata (partition, offset, timestamp)
kcat -b broker1:9092 -t orders -C -f '%T %p %o %k %s\n' -c 5
# Produce a message
echo '{"id":1,"amount":99.99}' | kcat -b broker1:9092 -t orders -P -k "order-1"
# List topics and metadata
kcat -b broker1:9092 -L
# Consume a specific partition from a specific offset
kcat -b broker1:9092 -t orders -p 2 -o 1000 -c 10
# AKHQ (formerly KafkaHQ) — web UI for topic browsing, consumer groups, ACLs
# docker run -p 8080:8080 -e AKHQ_CONFIGURATION='...' tchiotludo/akhq
# Or Conduktor, Kafka UI (provectus/kafka-ui) — all serve the same purpose
Cluster Administration
Preferred Leader Election
# After a broker restart, it may not be the leader of its partitions anymore
# (leadership moved to another ISR member during the outage)
# Trigger preferred leader election to rebalance leadership
# Elect preferred leaders for ALL topics
kafka-leader-election.sh --bootstrap-server broker1:9092 \
--election-type PREFERRED --all-topic-partitions
# Elect for a specific topic
kafka-leader-election.sh --bootstrap-server broker1:9092 \
--election-type PREFERRED \
--topic orders --partition 0
# UNCLEAN election (data loss risk!) — only as last resort when all ISR are dead
kafka-leader-election.sh --bootstrap-server broker1:9092 \
--election-type UNCLEAN \
--topic orders --partition 0
# Verify leadership distribution
kafka-topics.sh --bootstrap-server broker1:9092 --describe --topic orders
# Leader column should be evenly spread across brokers
Adding and Removing Brokers
# Adding a broker:
# 1. Launch new EC2 instance with EBS volume + Docker setup
# 2. Configure with a new unique broker.id and join the controller quorum
# 3. Start the broker — it joins the cluster but has NO partitions yet
# 4. Reassign partitions to include the new broker
# Generate reassignment plan
cat <<EOF > topics.json
{"topics": [{"topic": "orders"}, {"topic": "events"}], "version": 1}
EOF
kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
--topics-to-move-json-file topics.json \
--broker-list "1,2,3,4" \
--generate
# Save the "Proposed partition reassignment" output to reassignment.json
# Execute the reassignment (runs in background, copies data)
kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
--reassignment-json-file reassignment.json \
--execute \
--throttle 50000000 # 50 MB/s throttle to avoid impacting production
# Monitor progress
kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
--reassignment-json-file reassignment.json \
--verify
# Remove throttles after completion (--verify clears them automatically when
# the reassignment finishes; manual cleanup shown for completeness)
kafka-configs.sh --bootstrap-server broker1:9092 \
--entity-type brokers --entity-name 1 \
--alter --delete-config leader.replication.throttled.rate
kafka-configs.sh --bootstrap-server broker1:9092 \
--entity-type brokers --entity-name 4 \
--alter --delete-config follower.replication.throttled.rate
# Removing a broker:
# 1. Reassign all partitions OFF the broker (use --broker-list without the broker)
# 2. Wait for reassignment to complete (--verify)
# 3. Stop the broker
# 4. Terminate the EC2 instance
Scaling: When Brokers Run Out of Disk
# Immediate actions when disk is filling up:
# 1. Reduce retention on high-volume topics (buys time)
kafka-configs.sh --bootstrap-server broker1:9092 \
--entity-type topics --entity-name high-volume-events \
--alter --add-config retention.ms=86400000 # 1 day instead of 7
# 2. Delete old consumer offsets for abandoned groups
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
--delete --group abandoned-consumer-group
# 3. Expand the EBS volume (online, no downtime)
aws ec2 modify-volume --volume-id vol-abc123 --size 1000 # grow to 1 TB
# Then resize the filesystem on the instance:
sudo xfs_growfs /data/kafka # XFS supports online resize
# 4. Long-term: add brokers and rebalance partitions
# (see "Adding and Removing Brokers" above)
# 5. Long-term: enable tiered storage (Kafka 3.6+)
# Offload cold log segments to S3 while keeping hot data on local disk
# server.properties:
# remote.log.storage.system.enable=true
# remote.log.storage.manager.class.name=... # S3 plugin
Rolling Restarts (Zero Downtime)
# Safe rolling restart procedure for config changes or upgrades:
# 1. Ensure min.insync.replicas < replication.factor (e.g., 2 of 3)
# so the cluster can tolerate one broker being down
for BROKER_HOST in broker1 broker2 broker3; do
echo "=== Restarting $BROKER_HOST ==="
# Restart the broker
ssh $BROKER_HOST "docker compose restart kafka"
# Wait for ISR recovery: all partitions must be fully replicated before
# touching the next broker. (Check AFTER the restart; while a broker is
# down its partitions are under-replicated, so the count would never reach 0.)
echo "Waiting for under-replicated partitions to reach 0..."
while true; do
URP=$(kafka-topics.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --describe \
--under-replicated-partitions 2>/dev/null | wc -l)
if [ "$URP" -eq 0 ]; then break; fi
echo " Still $URP under-replicated partitions, waiting..."
sleep 10
done
echo "=== $BROKER_HOST restarted and back in ISR ==="
done
# After all restarts, trigger preferred leader election
kafka-leader-election.sh --bootstrap-server broker1:9092 \
--election-type PREFERRED --all-topic-partitions
Production Readiness Checklist
| Category | Check |
|---|---|
| Replication | replication.factor=3, min.insync.replicas=2 for all important topics. Producers use acks=all |
| Durability | unclean.leader.election.enable=false (prevents data loss). auto.create.topics.enable=false |
| Storage | Dedicated EBS volumes (gp3+), XFS filesystem, noatime mount. Disk alert at 75% |
| Networking | Brokers in private subnet across 3 AZs. Security groups restrict ports. TLS for inter-broker and client traffic |
| Monitoring | Prometheus + Grafana (or CloudWatch). Alerts for: under-replicated partitions, disk usage, consumer lag, controller count |
| Consumer lag | Dedicated lag monitor (Burrow, Kafka Lag Exporter). Alert on growing lag, not just absolute value |
| Authentication | SASL/SCRAM or mTLS for client auth. ACLs to restrict topic access per service |
| Backpressure | Producer buffer.memory and max.block.ms tuned. Consumer max.poll.records set to sustainable batch size |
| Schema | Schema Registry enforcing BACKWARD compatibility. All topics use Avro/Protobuf, not raw JSON |
| Upgrades | Rolling restart procedure documented and tested. Broker inter.broker.protocol.version set for safe upgrades |
| Disaster recovery | MirrorMaker 2 or Confluent Replicator for cross-region replication if RPO matters |
Data Contracts
Schema Registry stops producers from making breaking structural changes — but structural changes are only one of three ways upstream teams silently break downstream analytics.
The Problem: Three Failure Modes
| Failure Mode | Example | Detectable By |
|---|---|---|
| Field rename | user_id → userId | Schema Registry (structural — breaks compatibility check) |
| Field drop | Remove amount from payload | Schema Registry (structural — breaks compatibility check) |
| Field repurpose | amount changes from dollars to cents; status gains 40 new values no consumer knows about | Nothing technical — schema is still valid. Downstream gets silently wrong numbers. |
The third failure mode — semantic drift — is the hardest to catch and the one most likely to corrupt production analytics quietly for days before anyone notices. A complete data contract strategy needs four layers.
Layer 1: Schema Registry Enforcement
Use Confluent Schema Registry (or AWS Glue Schema Registry) with FULL compatibility mode. FULL means new schemas must be both backward-compatible (consumers using the new schema can still read data written with the old schema) and forward-compatible (consumers still on the old schema can read data written with the new schema). This catches field drops and renames at producer deploy time, before a single message is published.
Use Avro or Protobuf — not raw JSON. JSON has no enforced schema at the wire level. If you must use JSON payloads (e.g., third-party producers you don't control), add a validation layer at ingestion: parse incoming records against a registered JSON Schema and route non-conforming records to a dead-letter topic rather than letting them flow through.
# Register a schema (Avro)
curl -X POST http://schema-registry:8081/subjects/user-events-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{\"type\":\"record\",\"name\":\"UserEvent\",\"fields\":[{\"name\":\"user_id\",\"type\":\"string\"},{\"name\":\"event_type\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":[\"null\",\"double\"],\"default\":null}]}"}'
# Check compatibility of a proposed schema change before registering
curl -X POST http://schema-registry:8081/compatibility/subjects/user-events-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "..."}'
# Response: {"is_compatible": false} ← producer CI fails here, not in production
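As a mental model for what that compatibility check is doing, here is a deliberately simplified Python sketch. It models a schema as a map of field name → has-default, ignoring types, aliases, and the rest of real Avro resolution — the point is only to show why drops and renames fail FULL while adding an optional field passes.

```python
# Toy model of FULL compatibility: backward AND forward.
# In this simplification, a reader can decode a writer's record iff every
# reader field missing from the writer's schema has a default value.

def can_read(reader: dict, writer: dict) -> bool:
    """reader/writer map field name -> has_default (bool)."""
    return all(
        name in writer or has_default
        for name, has_default in reader.items()
    )

def is_full_compatible(old: dict, new: dict) -> bool:
    backward = can_read(new, old)  # consumers on the new schema read old data
    forward = can_read(old, new)   # consumers on the old schema read new data
    return backward and forward

old = {"user_id": False, "event_type": False}

# Adding an optional field (has a default): FULL-compatible
assert is_full_compatible(old, {**old, "amount": True})

# Renaming user_id -> userId: looks like a drop plus a new required field
assert not is_full_compatible(old, {"userId": False, "event_type": False})

# Dropping a required field: old consumers can't resolve event_type
assert not is_full_compatible(old, {"user_id": False})
```

Note the rename case: to the compatibility checker there is no such thing as a rename, only a deletion and an addition — which is exactly why Avro treats renames as breaking.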
Layer 2: Consumer-Driven Contracts
Borrowed from microservices contract testing (Pact). Each downstream consumer declares which fields they depend on and what invariants they expect. Declarations live in a shared repo. The upstream team's CI runs contract validation before any deployment — if a proposed schema change would break a declared consumer contract, the producer's pipeline fails.
# contracts/analytics-platform.yaml
consumer: analytics-platform
depends_on:
- topic: user-events
fields:
- name: user_id
type: string
required: true
- name: event_type
type: string
required: true
allowed_values: [signup, login, purchase, churn]
- name: amount
type: number
constraints: { min: 0 }
- topic: order-events
fields:
- name: order_id
type: string
required: true
- name: total_usd
type: number
required: true
constraints: { min: 0, max: 1000000 }
The upstream CI loads all contract files from the shared repo and runs a validator against the proposed schema and any annotated invariants. If any consumer contract fails, the deployment is blocked. This forces schema negotiation to happen before production — not after.
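A minimal version of that validator might look like the following Python sketch. The contract structure mirrors the YAML example above, but the function name, the dict-based contract, and the type-string comparison are all illustrative — this is a sketch of the idea, not a real contract-testing tool.

```python
# CI-side contract check: given a consumer's declared field dependencies
# and a proposed schema (field name -> type string), report violations.

def validate_contract(contract: dict, proposed_schema: dict) -> list[str]:
    violations = []
    for field in contract["fields"]:
        name = field["name"]
        if name not in proposed_schema:
            if field.get("required", False):
                violations.append(
                    f"{contract['consumer']}: required field '{name}' missing"
                )
        elif proposed_schema[name] != field["type"]:
            violations.append(
                f"{contract['consumer']}: field '{name}' changed type "
                f"to {proposed_schema[name]}"
            )
    return violations

contract = {
    "consumer": "analytics-platform",
    "fields": [
        {"name": "user_id", "type": "string", "required": True},
        {"name": "amount", "type": "number"},
    ],
}

# Proposed schema renames user_id -> userId: the contract check fails
proposed = {"userId": "string", "event_type": "string", "amount": "number"}
problems = validate_contract(contract, proposed)
assert problems  # CI would block the producer's deployment here
```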
Layer 3: Statistical Distribution Monitoring
The only defense against semantic drift. Track value distributions for important fields over time: cardinality, null rate, min/max, value frequency histograms. Use Population Stability Index (PSI) or a simple chi-square test to detect when a distribution shifts significantly between time windows. A PSI > 0.2 on a field signals a meaningful change.
If a field called status suddenly goes from 5 distinct values to 50, or amount shifts from averaging $45 to averaging $4500, something has changed semantically — even though the schema is structurally valid.
# Daily distribution check — run as a Spark job or scheduled Python script
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def alert(msg: str) -> None:
    print(msg)  # stand-in — wire this to Slack/PagerDuty in production

spark = SparkSession.builder.appName("distribution-monitor").getOrCreate()

# Load yesterday's and today's snapshots from your data lake
yesterday = spark.read.parquet("s3://lake/user-events/date=2024-01-14/")
today = spark.read.parquet("s3://lake/user-events/date=2024-01-15/")

def field_profile(df, field: str) -> dict:
    agg = df.agg(
        F.count(field).alias("count"),
        F.countDistinct(field).alias("cardinality"),
        F.sum(F.when(F.col(field).isNull(), 1).otherwise(0)).alias("null_count"),
        F.min(field).alias("min_val"),
        F.max(field).alias("max_val"),
    ).collect()[0]
    return agg.asDict()

for field in ["event_type", "amount", "status"]:
    prev = field_profile(yesterday, field)
    curr = field_profile(today, field)
    cardinality_change = abs(curr["cardinality"] - prev["cardinality"]) / max(prev["cardinality"], 1)
    null_rate_prev = prev["null_count"] / max(prev["count"], 1)
    null_rate_curr = curr["null_count"] / max(curr["count"], 1)
    if cardinality_change > 0.5:  # cardinality jumped >50%
        alert(f"WARN: {field} cardinality changed {prev['cardinality']} → {curr['cardinality']}")
    if abs(null_rate_curr - null_rate_prev) > 0.05:  # null rate shifted >5pp
        alert(f"WARN: {field} null rate changed {null_rate_prev:.1%} → {null_rate_curr:.1%}")
# For numeric fields, compute PSI over bucketed distributions
import math

def compute_psi(expected_dist: list[float], actual_dist: list[float]) -> float:
    psi = 0.0
    for e, a in zip(expected_dist, actual_dist):
        e = max(e, 1e-10)  # floor to avoid log(0) and division by zero
        a = max(a, 1e-10)
        psi += (a - e) * math.log(a / e)  # PSI term: (actual - expected) * ln(actual / expected)
    return psi
# PSI < 0.1: no significant change
# PSI 0.1–0.2: moderate shift, investigate
# PSI > 0.2: major shift — alert and escalate
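To make those thresholds concrete, here is a self-contained sketch that buckets two numeric samples into normalized frequency distributions and applies the PSI formula. The bucket edges and sample values are invented for illustration — in practice you would bucket yesterday's and today's values of a field like amount.

```python
import math

def to_distribution(values: list[float], edges: list[float]) -> list[float]:
    """Normalized frequency per bucket; edges are bucket upper bounds."""
    counts = [0] * len(edges)
    for v in values:
        for i, upper in enumerate(edges):
            if v <= upper:
                counts[i] += 1
                break
    total = max(sum(counts), 1)
    return [c / total for c in counts]

def compute_psi(expected: list[float], actual: list[float]) -> float:
    psi = 0.0
    for e, a in zip(expected, actual):
        e, a = max(e, 1e-10), max(a, 1e-10)  # avoid log(0)
        psi += (a - e) * math.log(a / e)
    return psi

edges = [10, 50, 100, float("inf")]        # illustrative bucket bounds
baseline = [5, 20, 45, 48, 52, 60, 90]     # e.g., last week's `amount` values
shifted = [450, 480, 520, 600, 900, 4500]  # dollars silently became cents?

psi = compute_psi(to_distribution(baseline, edges),
                  to_distribution(shifted, edges))
assert psi > 0.2  # major shift — this is the alert-and-escalate case
```

Identical distributions produce PSI of 0; the entire population jumping into a new bucket (as above) produces a PSI far above the 0.2 threshold.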
Layer 4: Organizational Policy
Technology alone cannot enforce data contracts. The organizational layer closes the gaps:
| Policy | Implementation |
|---|---|
| Deprecation window | Fields must be marked deprecated (and announced) at least 4 weeks before removal. Producers keep emitting the field during the window. |
| Breaking change channel | A dedicated Slack channel (e.g., #data-schema-changes) where all breaking or potentially breaking changes are announced with a migration guide. |
| Topic ownership | Every topic has a declared owner in the schema registry or a central catalog. Ownership is enforced — no topic without an owner merges to production. |
| Dependency registry | Every consumer team registers which topics and fields they depend on. This powers the consumer-driven contract tests (Layer 2) and gives producers a blast radius estimate before any change. |
| Data lineage tooling | Tools like DataHub or OpenMetadata visualize which pipelines consume which topics, what transformations they apply, and which dashboards or models are downstream. Makes the blast radius of a schema change visible before it happens. |
12. Configuration Reference
Broker Configs
| Config | Default | Description |
|---|---|---|
| broker.id | -1 (auto) | Unique broker identifier |
| listeners | PLAINTEXT://:9092 | Listener addresses (host:port per protocol) |
| log.dirs | /tmp/kafka-logs | Directories for log data (use multiple for striping) |
| num.partitions | 1 | Default partition count for auto-created topics |
| default.replication.factor | 1 | Default replication factor for auto-created topics |
| min.insync.replicas | 1 | Minimum ISR required for a write to succeed (with acks=all) |
| log.retention.ms | 604800000 (7d) | Time to retain log segments |
| log.retention.bytes | -1 (unlimited) | Max bytes per partition before deleting old segments |
| log.segment.bytes | 1073741824 (1GB) | Max size of a single log segment |
| log.roll.ms | 604800000 (7d) | Force roll of the active segment after this time |
| replica.lag.time.max.ms | 30000 | Max time a follower can lag before being removed from ISR |
| num.network.threads | 3 | Threads for network I/O |
| num.io.threads | 8 | Threads for disk I/O |
| socket.send.buffer.bytes | 102400 | TCP send buffer size |
| message.max.bytes | 1048588 (~1MB) | Max record batch size accepted by the broker (keep consumer fetch.max.bytes aligned) |
| auto.create.topics.enable | true | Set false in production to prevent accidental topic creation |
| delete.topic.enable | true | Allow topic deletion via admin API |
| unclean.leader.election.enable | false | Allow out-of-sync replica to become leader (data loss risk) |
| compression.type | producer | Broker-level compression override ('producer' = use producer's setting) |
Producer Configs
| Config | Default | Description |
|---|---|---|
| acks | all | Required acknowledgements (0, 1, all) |
| retries | 2147483647 | Number of retries on transient failures |
| retry.backoff.ms | 100 | Wait between retry attempts |
| delivery.timeout.ms | 120000 (2m) | Total time budget for a send() call including retries |
| batch.size | 16384 (16KB) | Max bytes in a batch per partition |
| linger.ms | 0 | Time to wait for batch to fill before sending |
| buffer.memory | 33554432 (32MB) | Total memory for buffering records before sending |
| max.block.ms | 60000 (1m) | Time to block send() if buffer is full |
| compression.type | none | Compression: none, gzip, snappy, lz4, zstd |
| enable.idempotence | true | Idempotent producer (prevents duplicates on retry) |
| max.in.flight.requests.per.connection | 5 | Max unacknowledged sends (must be 1 for strict order without idempotence) |
| request.timeout.ms | 30000 | Time to wait for broker response before retry |
| transactional.id | null | Unique ID for transactional producer; enables exactly-once |
| transaction.timeout.ms | 60000 (1m) | Max time for a transaction before broker aborts it |
Consumer Configs
| Config | Default | Description |
|---|---|---|
| group.id | null | Consumer group name (required for group management) |
| auto.offset.reset | latest | What to do when no committed offset: earliest, latest, none |
| enable.auto.commit | true | Auto-commit offsets periodically |
| auto.commit.interval.ms | 5000 | Frequency of auto-commit |
| max.poll.records | 500 | Max records returned per poll() call |
| max.poll.interval.ms | 300000 (5m) | Max time between poll() calls before consumer is kicked |
| session.timeout.ms | 45000 | Heartbeat timeout for group membership |
| heartbeat.interval.ms | 3000 | Heartbeat frequency (should be 1/3 of session.timeout.ms) |
| fetch.min.bytes | 1 | Minimum bytes to fetch before returning (batching) |
| fetch.max.bytes | 52428800 (50MB) | Max bytes per fetch response |
| fetch.max.wait.ms | 500 | Max wait when fetch.min.bytes not met |
| isolation.level | read_uncommitted | Transaction visibility: read_uncommitted or read_committed |
| partition.assignment.strategy | RangeAssignor, CooperativeStickyAssignor | Partition assignment algorithms |
13. Common Pitfalls
Consumer Lag Growth
Causes:
- Processing is too slow for the incoming message rate
- Too few consumer instances for the partition count
- A slow downstream system (database, HTTP call) blocks the poll loop
- GC pauses or OOM causing consumer to fall behind
Fix: Profile processing time, scale out consumers (add instances up to partition count), offload slow work to async threads (carefully — offset management gets complex).
Rebalancing Storms
Causes:
- Processing takes longer than max.poll.interval.ms — consumer appears dead
- Consumer crashes or restarts frequently (bad deployment, OOM)
- Too many partitions with eager assignment strategy — rebalance takes too long
Fix:
- Increase max.poll.interval.ms to match realistic processing time
- Reduce max.poll.records to process smaller batches faster
- Switch to CooperativeStickyAssignor — minimizes disruption
- Fix the root cause of slow processing or crashes
Partition Count Mistakes
Too few partitions:
- Consumer parallelism is capped — you cannot scale a consumer group beyond the partition count
- Hotspot risk if one partition key dominates
Too many partitions:
- Each partition requires a file handle, memory for producer batches, and a controller thread slot
- Leader election time is O(partitions) — cluster recovery takes longer
- End-to-end latency increases (more partition metadata to propagate)
- Rule of thumb: < 10,000 partitions per broker; < 200,000 per cluster in ZK mode (KRaft supports substantially more)
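A common sizing heuristic (a rule of thumb, not doctrine) is to take the larger of the producer-side and consumer-side partition requirements, then add headroom. The function name and the headroom factor below are illustrative; the per-partition throughput numbers are inputs you benchmark on your own hardware.

```python
import math

def suggest_partitions(target_mb_s: float,
                       per_partition_produce_mb_s: float,
                       per_consumer_mb_s: float,
                       headroom: float = 1.5) -> int:
    """Rule-of-thumb partition count: enough partitions that both the
    producers and the consumer group can keep up with target throughput."""
    by_produce = target_mb_s / per_partition_produce_mb_s
    by_consume = target_mb_s / per_consumer_mb_s   # consumers cap at 1 per partition
    return math.ceil(max(by_produce, by_consume) * headroom)

# 100 MB/s target; partitions absorb ~10 MB/s; each consumer handles ~5 MB/s
print(suggest_partitions(100, 10, 5))  # → 30
```

Consumption is usually the binding constraint: here 20 consumers are needed to keep up, so at least 20 partitions (30 with headroom) are required for them to all get work.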
Message Ordering Guarantees
- Within a partition: strict order guaranteed
- Across partitions: no ordering guarantee
- If you need global ordering across a topic: use a single partition (sacrifices parallelism)
- If you need ordering per entity (e.g., per user): use the entity ID as the message key — all messages for that key land on the same partition
- Retries with max.in.flight.requests.per.connection > 1 and no idempotence can reorder messages — always enable idempotence
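The per-key guarantee rests on deterministic key hashing. Kafka's default partitioner applies murmur2 to the key bytes; the sketch below uses CRC32 as a stand-in (not Kafka's actual algorithm) purely to illustrate the invariant: same key plus same partition count always yields the same partition.

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Stand-in for Kafka's default partitioner (which uses murmur2).
    # The invariant, not the hash function, is the point here.
    return zlib.crc32(key) % num_partitions

p1 = partition_for(b"user-42", 12)
p2 = partition_for(b"user-42", 12)
assert p1 == p2  # all of user-42's events land on one partition, in order

# Caveat: changing the partition count remaps keys, so per-key ordering
# only holds within a single partition-count regime.
```

This is also why increasing partitions on a keyed topic is a breaking change for consumers that rely on key locality: old and new messages for the same key may end up on different partitions.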
Large Messages
Large messages cause memory pressure, replication delays, and consumer fetch timeouts. Three configs must align:
- Broker: message.max.bytes
- Topic: max.message.bytes
- Consumer: fetch.max.bytes and max.partition.fetch.bytes
Better solution: Store large payloads (images, blobs) in object storage (S3, GCS) and send a reference URL in the Kafka message. Keep messages small (< 100KB).
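This reference-passing approach is known as the claim-check pattern. A Python sketch, using an in-memory dict in place of S3 and plain dicts in place of real producer/consumer calls (all names here are illustrative):

```python
import json
import uuid

object_store: dict[str, bytes] = {}  # stand-in for S3/GCS

def produce_large(payload: bytes, bucket: str = "media-bucket") -> dict:
    """Claim check: store the blob, publish only a small reference."""
    key = f"{bucket}/{uuid.uuid4()}"
    object_store[key] = payload  # s3.put_object(...) in real life
    return {"blob_ref": key, "size": len(payload)}  # this is the Kafka message

def consume_large(message: dict) -> bytes:
    """Consumer dereferences the claim check to fetch the real payload."""
    return object_store[message["blob_ref"]]  # s3.get_object(...) in real life

blob = b"\x00" * 5_000_000  # 5 MB image — too big for a Kafka message
msg = produce_large(blob)
assert len(json.dumps(msg)) < 200  # what actually crosses Kafka is tiny
assert consume_large(msg) == blob
```

One design note: the blob must be written to object storage before the reference is published, and blob lifecycle (deletion, retention) must outlast the topic's retention, or consumers will dereference dangling claim checks.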
Offset Reset Gotchas
- If a consumer group has committed offsets, auto.offset.reset is ignored entirely
- To replay: explicitly reset offsets using kafka-consumer-groups.sh --reset-offsets
- Always stop consumers before resetting — the tool refuses to reset offsets for an active group, and a running consumer would immediately re-commit its own position anyway
- auto.offset.reset=latest means you miss messages produced before the consumer first started — use earliest for new consumers joining an existing topic
Producer Timeouts and Retries
// Common misconfiguration: delivery.timeout.ms < retries * retry.backoff.ms
// This causes the producer to give up before exhausting retries
// Fix: delivery.timeout.ms should be >> (retries * retry.backoff.ms)
// Example of correct config for high-durability producer:
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 200);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000); // 2 min total budget
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000); // per-request timeout
// delivery.timeout >= request.timeout + linger.ms
// TimeoutException vs NotLeaderOrFollowerException:
// TimeoutException = broker didn't respond within request.timeout.ms (retry)
// NotLeaderOrFollowerException = sent to stale leader (retry with metadata refresh)
// Both are retried automatically with enable.idempotence=true
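The relationships in the comments above can be encoded as a pre-deploy sanity check. In this Python sketch, the first rule (delivery.timeout.ms >= linger.ms + request.timeout.ms) is the one the Java producer itself validates at startup; the second rule and its factor of 10 are an illustrative heuristic, not a Kafka requirement.

```python
def check_producer_timeouts(cfg: dict) -> list[str]:
    """Sanity-check producer timeout relationships before deploy."""
    problems = []
    # Rule enforced by the Java producer at startup:
    if cfg["delivery.timeout.ms"] < cfg["request.timeout.ms"] + cfg.get("linger.ms", 0):
        problems.append("delivery.timeout.ms < request.timeout.ms + linger.ms")
    # Illustrative guideline: leave room for a meaningful number of retries
    if cfg["delivery.timeout.ms"] < cfg["retry.backoff.ms"] * 10:
        problems.append("delivery.timeout.ms leaves room for very few retries")
    return problems

good = {"delivery.timeout.ms": 120_000, "request.timeout.ms": 30_000,
        "linger.ms": 5, "retry.backoff.ms": 200}
assert check_producer_timeouts(good) == []

bad = {"delivery.timeout.ms": 1_000, "request.timeout.ms": 30_000,
       "retry.backoff.ms": 200}
assert check_producer_timeouts(bad)  # would give up before a single retry
```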
Offset Commit Semantics Pitfalls
// WRONG: Commit before processing — at-most-once
// If process() throws, the offset is already committed — message is lost
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
consumer.commitSync(); // committed before processing!
for (ConsumerRecord r : records) {
process(r); // if this throws, the record is skipped forever
}
}
// WRONG: Auto-commit with slow async processing
// The auto-commit timer fires on the poll() thread, independent of your processing
// If you hand off records to another thread, the offset may be committed before
// those threads finish — causing silent message loss on crash
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// CORRECT: Manual commit after processing
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord r : records) {
process(r); // process first
}
consumer.commitSync(); // then commit
}
Schema Evolution Pitfalls
- Renaming a field is always a breaking change in Avro — to consumers it looks like the old field was deleted and a new field was added
- Changing a field type (e.g., int to long) is breaking in most formats
- Adding a required field without a default breaks BACKWARD compatibility — old data doesn't have this field
- Fix: Always add fields as optional (union with null + default null in Avro), never remove fields used by downstream consumers
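Concretely, a safe additive change in Avro looks like this (field names are illustrative). Note that null must be the first branch of the union for default: null to validate:

```json
{
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "coupon_code", "type": ["null", "string"], "default": null}
  ]
}
```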
Zombie Consumers / Fencing
A consumer that experiences a long GC pause may exceed session.timeout.ms, be removed from the group, and have its partitions reassigned. When the paused consumer resumes, it's now a "zombie" — it will try to commit offsets for partitions it no longer owns. Kafka rejects these commits with a CommitFailedException. The correct response is to stop processing and rejoin the group.
The transactional producer uses epoch fencing — when a new instance with the same transactional.id calls initTransactions(), it bumps the epoch, causing the old (zombie) producer's transactions to be rejected with ProducerFencedException.