kafka-expert
Deep expertise in Apache Kafka architecture, partitioning, consumer groups, exactly-once semantics, Schema Registry, Kafka Streams, retention, and production operations. Trigger phrases: when working with Kafka, Kafka partitioning strategy, consumer group rebalancing.
Kafka Expert
Kafka is a distributed, replicated, ordered log — not a message queue in the traditional sense. Understanding that it stores every event durably (not just until consumed) and that consumers move their own pointers through the log unlocks why it scales so differently from RabbitMQ or SQS. Most Kafka production problems trace to: under-partitioned topics, auto-commit hiding data loss bugs, and not understanding consumer group rebalancing.
Core Mental Model
A Kafka topic is a partitioned, replicated log. Producers append to the end; consumers read at their own pace using offsets. A consumer group is a set of consumers that collectively process all partitions of a topic — each partition is assigned to exactly one consumer in the group at any time. Throughput scales by adding partitions (more parallelism) and consumers (more processing power). Retention is time/size based, not consumption based — messages stay until the retention window expires, regardless of whether they've been consumed.
Topic Partitioning Strategy
Choosing partition count:
Start with: max(desired_consumer_parallelism, desired_producer_throughput / per_partition_throughput)
Per partition: ~10 MB/s write throughput (rule of thumb)
Example:
100 MB/s target throughput → 100 / 10 = 10 partitions minimum
Need 20 consumers → 20 partitions minimum
→ Choose 20 partitions
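The sizing rule above can be sketched as a small helper. The 10 MB/s per-partition figure is the rule of thumb from this section, not a measured constant; benchmark your own cluster before committing:

```java
// Sketch of the partition-sizing rule of thumb:
// max(desired consumer parallelism, throughput-derived minimum).
public class PartitionSizing {
    static final double PER_PARTITION_MBPS = 10.0; // rule-of-thumb write throughput

    public static int partitionCount(int desiredConsumers, double targetMBps) {
        // Minimum partitions needed to sustain the target write throughput
        int forThroughput = (int) Math.ceil(targetMBps / PER_PARTITION_MBPS);
        // Also need at least one partition per consumer in the group
        return Math.max(desiredConsumers, forThroughput);
    }
}
```

With the worked example above, `partitionCount(20, 100.0)` returns 20: the consumer-parallelism requirement dominates the throughput-derived minimum of 10.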
Kafka adds overhead per partition:
Each partition replica = open file handle + memory for replication
> 4000 partitions per broker → leadership election slowness
> 200,000 partitions per cluster → ZooKeeper coordination limits (KRaft raises this ceiling)
Partition key selection:
Goal: even distribution + related events to same partition (ordering)
✅ Good keys:
user_id → events per user stay ordered, even distribution if UUIDs
tenant_id + entity_id → multi-tenant isolation
❌ Bad keys:
country (low cardinality → hot partitions)
timestamp (monotonic → at any instant nearly all events share the same key, creating a moving hot partition)
null key → round-robin (loses ordering guarantees)
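Why low-cardinality keys create hot partitions follows directly from hash routing: a key always maps to the same partition, so a key space smaller than the partition count leaves partitions idle. A simplified illustration — Kafka's real default partitioner hashes the serialized key with murmur2, `String.hashCode()` here is just a stand-in:

```java
// Simplified stand-in for Kafka's default keyed partitioner:
// same key -> same partition, always.
public class KeyRouting {
    public static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the modulo result is non-negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

With a key like `country` (~200 distinct values), at most 200 partitions can ever receive data regardless of how many you provision, and skewed key frequency (most traffic from a few countries) concentrates load further.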
// Producer with explicit partition key
ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",            // topic
    userId.toString(),   // key (determines partition)
    orderJson            // value
);
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // On failure, metadata may be null or carry offset -1 — log the exception itself
        log.error("Failed to send to topic={}", record.topic(), exception);
    }
});
Producer Configuration
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
// Reliability settings
props.put("acks", "all"); // wait for all ISR replicas
props.put("enable.idempotence", "true"); // exactly-once delivery (deduplication)
props.put("max.in.flight.requests.per.connection", "5"); // ordering + idempotence
// Performance settings
props.put("compression.type", "lz4"); // lz4 or zstd for throughput
props.put("linger.ms", "5"); // batch for 5ms before sending
props.put("batch.size", "65536"); // 64KB batch size
// Retry settings (idempotence makes retries safe)
props.put("retries", Integer.MAX_VALUE);
props.put("delivery.timeout.ms", "120000"); // 2 minutes total
props.put("request.timeout.ms", "30000");
// Schema Registry (Confluent)
props.put("schema.registry.url", "http://schema-registry:8081");
Consumer Group Mechanics
// Consumer with manual offset commit (production-safe)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "order-processors");
props.put("auto.offset.reset", "earliest"); // start from beginning on new group
props.put("enable.auto.commit", "false"); // NEVER true in production
props.put("max.poll.records", "500");
props.put("max.poll.interval.ms", "300000"); // 5 min — time between poll() calls
KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
try {
    while (true) {
        ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, Order> record : records) {
            try {
                processOrder(record.value());
            } catch (RetryableException e) {
                // Send to retry topic
                producer.send(new ProducerRecord<>("orders.retry", record.key(), record.value()));
            } catch (NonRetryableException e) {
                // Send to DLQ
                producer.send(new ProducerRecord<>("orders.dlq", record.key(), record.value()));
            }
        }
        // Commit only after all records processed
        consumer.commitSync();
    }
} finally {
    consumer.close();
}
// Static membership: prevents rebalancing when consumer restarts quickly
props.put("group.instance.id", "worker-" + hostname); // unique stable ID
props.put("session.timeout.ms", "60000"); // longer timeout for static members
Cooperative Incremental Rebalancing
// Default rebalancing (eager): ALL partitions revoked from ALL consumers, then reassigned
// Problem: processing stops entirely during rebalance (seconds to minutes in large groups)
// Solution: Cooperative Incremental Rebalancing (Kafka 2.4+)
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// Only partitions that NEED to move are revoked — others keep processing
Exactly-Once Semantics (EOS)
Three levels of delivery:
At-most-once: Producer sends, doesn't retry. Consumer commits before processing.
→ Possible message loss, never duplicates
At-least-once: Producer retries. Consumer commits after processing.
→ No message loss, possible duplicates (most common setup)
Exactly-once: Idempotent producer + transactional API + read_committed consumer
→ No loss, no duplicates. 2-3x performance cost.
When exactly-once is worth it:
✅ Financial transactions, billing, inventory deduction
❌ Analytics, logging (at-least-once + idempotent consumer is fine)
// Exactly-once producer (Kafka Streams or transactional API)
props.put("enable.idempotence", "true");
props.put("transactional.id", "payment-processor-" + instanceId); // unique per instance
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("ledger", payment.userId, payment));
    producer.send(new ProducerRecord<>("notifications", payment.userId, notif));
    // Include consumer offsets in transaction (consume-transform-produce).
    // Pass consumer.groupMetadata(), not just the group id, so generation-based
    // fencing protects against zombie instances.
    producer.sendOffsetsToTransaction(
        getOffsetMap(records),
        consumer.groupMetadata()
    );
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Another instance took over this transactional.id — do not retry, just close
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction();
    throw e;
}
// Consumer reading transactionally (read_committed: skips aborted transactions)
props.put("isolation.level", "read_committed");
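For the cases where exactly-once is not worth its cost, the usual alternative is at-least-once delivery plus an idempotent consumer: duplicates are absorbed by deduplicating on an event ID. A minimal sketch, assuming events carry a unique ID (a hypothetical `eventId` field) and that dedupe state fits in memory — production versions back this with Redis, a database unique constraint, or a TTL'd store:

```java
import java.util.HashSet;
import java.util.Set;

// Idempotent consumer sketch: at-least-once delivery + dedupe by event ID
// gives effectively-once processing without the transactional API.
public class IdempotentHandler {
    private final Set<String> seen = new HashSet<>(); // in prod: Redis/DB with TTL

    // Returns true if processed, false if skipped as a duplicate redelivery.
    public boolean handle(String eventId, Runnable processor) {
        if (!seen.add(eventId)) {
            return false; // already processed this event
        }
        processor.run();
        return true;
    }
}
```

Note the remaining gap: if the process crashes after `processor.run()` but before the offset commit, the event is redelivered and the in-memory set is gone, so durable dedupe state (or a naturally idempotent side effect, like an upsert) is what actually closes the loop.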
Dead Letter Queue Pattern
// DLQ routing in consumer
public void processWithDLQ(ConsumerRecord<String, Event> record) {
    int attempt = getAttemptCount(record);
    try {
        processEvent(record.value());
    } catch (RetryableException e) {
        if (attempt < 3) {
            // Send to retry topic with attempt metadata in headers
            Headers headers = new RecordHeaders()
                .add("retry.attempt", String.valueOf(attempt + 1).getBytes())
                .add("retry.reason", String.valueOf(e.getMessage()).getBytes())
                .add("original.topic", record.topic().getBytes());
            producer.send(new ProducerRecord<>(
                "events.retry." + (attempt + 1),  // events.retry.1, .2, .3
                null, record.key(), record.value(), headers
            ));
        } else {
            sendToDLQ(record, e);
        }
    } catch (NonRetryableException e) {
        sendToDLQ(record, e);
    }
}

private void sendToDLQ(ConsumerRecord<String, Event> record, Exception e) {
    // String.valueOf guards against a null exception message
    Headers headers = new RecordHeaders()
        .add("dlq.reason", String.valueOf(e.getMessage()).getBytes())
        .add("dlq.timestamp", Instant.now().toString().getBytes())
        .add("original.topic", record.topic().getBytes())
        .add("original.partition", String.valueOf(record.partition()).getBytes())
        .add("original.offset", String.valueOf(record.offset()).getBytes());
    producer.send(new ProducerRecord<>(
        "events.dlq", null, record.key(), record.value(), headers
    ));
}
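The `getAttemptCount` helper referenced above is not shown; a minimal sketch of the parsing step, assuming the `retry.attempt` header holds a UTF-8 integer as written by the retry path:

```java
import java.nio.charset.StandardCharsets;

// Parses the retry attempt from the raw "retry.attempt" header value.
// A record consumed from the original topic has no such header -> attempt 0.
public class RetryHeaders {
    public static int attemptFrom(byte[] headerValue) {
        if (headerValue == null) {
            return 0; // first delivery, never retried
        }
        try {
            return Integer.parseInt(new String(headerValue, StandardCharsets.UTF_8));
        } catch (NumberFormatException e) {
            return 0; // malformed header — treat as first attempt
        }
    }
}
```

In the consumer, the raw bytes come from `record.headers().lastHeader("retry.attempt")`, which returns null when the header is absent.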
Schema Registry and Avro
Schema evolution compatibility types:
BACKWARD: New schema can read old data. Add optional fields with defaults.
Consumers upgrade first, then producers.
FORWARD: Old schema can read new data. Remove fields carefully.
Producers upgrade first, then consumers.
FULL: Both backward and forward compatible.
Most restrictive — safest for production.
NONE: No compatibility check.
Rules for BACKWARD compatible changes:
✅ Add optional field with a default
✅ Remove a field (new readers simply ignore it)
❌ Add a required field with no default (new readers can't fill it from old data)
❌ Change field type
❌ Rename field (use aliases instead)
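The core of these rules can be expressed as a small check. A sketch modeling only field presence and defaults — a real checker like Avro's schema resolution also handles type promotion, aliases, and unions:

```java
import java.util.Map;

// BACKWARD compatibility: a consumer on the NEW schema must be able to read
// data written with the OLD schema. Removals are always fine; every field
// added in the new schema must carry a default.
public class BackwardCheck {
    // Maps fieldName -> hasDefault for each schema version.
    public static boolean isBackwardCompatible(Map<String, Boolean> oldFields,
                                               Map<String, Boolean> newFields) {
        for (Map.Entry<String, Boolean> field : newFields.entrySet()) {
            boolean addedInNew = !oldFields.containsKey(field.getKey());
            if (addedInNew && !field.getValue()) {
                return false; // new required field — old data can't supply it
            }
        }
        return true; // fields removed in the new schema never break backward reads
    }
}
```

FORWARD compatibility is the mirror image (swap the roles of old and new), which is why FULL reduces to: only add or remove fields that have defaults.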
// Avro schema example
{
"type": "record",
"name": "OrderEvent",
"namespace": "com.moltbotden.events",
"doc": "Event fired when an order is placed or updated",
"fields": [
{ "name": "order_id", "type": "string" },
{ "name": "customer_id", "type": "string" },
{ "name": "status", "type": { "type": "enum", "name": "OrderStatus",
"symbols": ["NEW","PAID","SHIPPED","CANCELLED"] } },
{ "name": "total", "type": "double" },
{ "name": "created_at", "type": { "type": "long", "logicalType": "timestamp-millis" } },
{ "name": "metadata", "type": ["null", { "type": "map", "values": "string" }],
"default": null,
"doc": "Added in v2 — backward compatible" }
]
}
Kafka Streams
// Stateful stream processing with Kafka Streams
StreamsBuilder builder = new StreamsBuilder();

// Source stream
KStream<String, OrderEvent> orders = builder.stream(
    "orders",
    Consumed.with(Serdes.String(), orderEventSerde)
);

// Stateful aggregation: order count + revenue per customer per hour
KTable<Windowed<String>, CustomerStats> customerStats = orders
    .filter((key, order) -> order.getStatus() == OrderStatus.PAID)
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .aggregate(
        CustomerStats::new,
        (key, order, stats) -> stats.add(order),
        Materialized.<String, CustomerStats, WindowStore<Bytes, byte[]>>as("customer-hourly-stats")
            .withValueSerde(customerStatsSerde)
    );

// Stream-Table join: enrich orders with customer profile
KTable<String, Customer> customers = builder.table(
    "customers",
    Consumed.with(Serdes.String(), customerSerde)
);
KStream<String, EnrichedOrder> enrichedOrders = orders
    .join(customers,
        (order, customer) -> new EnrichedOrder(order, customer),
        Joined.with(Serdes.String(), orderEventSerde, customerSerde));
enrichedOrders.to("enriched-orders");

// Build and start
KafkaStreams streams = new KafkaStreams(builder.build(), config);
// Shut down the whole client on unhandled exceptions (StreamsUncaughtExceptionHandler,
// Kafka 2.8+; the Thread-based overload is deprecated)
streams.setUncaughtExceptionHandler(e ->
    StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Compacted Topics
Compacted topic: Kafka retains the LATEST value per key indefinitely.
Old values are deleted during compaction (log cleaner runs in background).
Use cases:
✅ Changelog / CDC (latest state per entity)
✅ Configuration store (consumer always gets current config on startup)
✅ Reference data (product catalog, user profile)
✅ Materialized view source
Setting up a compacted topic:
kafka-topics.sh --create \
  --topic user-profiles \
  --partitions 12 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.compaction.lag.ms=3600000 \
  --config delete.retention.ms=86400000 \
  --config segment.bytes=10485760

# min.compaction.lag.ms=3600000 → 1h minimum before a record is eligible for compaction
# delete.retention.ms=86400000  → tombstones kept 24h (so slow consumers still see deletes)
# segment.bytes=10485760        → 10MB segments (smaller = more frequent compaction)
# Tombstone: publish null value to delete a key
producer.send(new ProducerRecord<>("user-profiles", userId, null));
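Compaction semantics can be mirrored locally: replaying a compacted topic from offset 0 and applying each record to a map — where a null value deletes the key — reconstructs current state, which is exactly how a configuration-store consumer bootstraps. A sketch of that apply step:

```java
import java.util.HashMap;
import java.util.Map;

// Rebuilds "latest value per key" state from a compacted topic's records,
// applying tombstones (null values) as deletes — the same rule the log
// cleaner enforces on the broker side.
public class StateRebuilder {
    private final Map<String, String> state = new HashMap<>();

    public void apply(String key, String value) {
        if (value == null) {
            state.remove(key);      // tombstone: key deleted
        } else {
            state.put(key, value);  // latest value wins
        }
    }

    public Map<String, String> snapshot() {
        return Map.copyOf(state);
    }
}
```

In a real consumer this runs inside the poll loop over `record.key()` / `record.value()` after seeking to the beginning of each assigned partition.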
Consumer Lag Monitoring
# Check consumer group lag
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--describe \
--group order-processors
# Output:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
# orders 0 1234567 1234890 323 worker-1
# orders 1 987654 987700 46 worker-2
# Lag alert: if any partition lag > threshold for > 5 minutes → page on-call
# Typical thresholds:
# Warning: lag > 10,000 messages
# Critical: lag > 100,000 messages or growing faster than consumption rate
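Per-partition lag is simply log-end offset minus committed offset, and the thresholds above can be encoded directly. A sketch — the 10K/100K cutoffs are this section's rule-of-thumb defaults, not universal values, and the offsets would come from `kafka-consumer-groups.sh` output or the AdminClient's `listConsumerGroupOffsets` / `listOffsets` calls:

```java
// Classifies consumer lag against the warning/critical thresholds above.
public class LagMonitor {
    public enum Level { OK, WARNING, CRITICAL }

    public static Level classify(long logEndOffset, long committedOffset) {
        long lag = logEndOffset - committedOffset; // messages not yet processed
        if (lag > 100_000) return Level.CRITICAL;
        if (lag > 10_000)  return Level.WARNING;
        return Level.OK;
    }
}
```

A fuller monitor also tracks lag over time: a steady 20K lag on a high-throughput topic may be healthy, while a lag that grows faster than the consumption rate is critical at any absolute size.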
Anti-Patterns
❌ enable.auto.commit = true in consumer
Auto-commit happens on a timer, not after processing
If consumer crashes between commit and processing: data loss
✅ Always disable auto-commit; commit after successful processing
❌ One partition for entire topic
Limits parallelism to 1 consumer in the group
✅ Partition count ≥ max expected consumer parallelism
❌ Consumer doing heavy synchronous I/O inside poll loop
max.poll.interval.ms exceeded → consumer kicked from group → rebalance
✅ Offload heavy work to thread pool; use pause()/resume() or async processing
❌ Storing large messages in Kafka (> 1 MB)
Increases broker memory pressure, slows replication
✅ Store payload in S3/GCS, put URL/reference in Kafka message (claim-check pattern)
❌ Using Kafka as a database / for request-response
No random access by key, no low-latency point lookup
✅ Use compacted topics for changelog only; query state from Kafka Streams state stores
❌ Not monitoring consumer lag
Silent data loss or growing backlog goes undetected
✅ Alert on lag > threshold and lag growth rate
Quick Reference
Partition Count:
max( desired_consumers, throughput_MB/s / 10 )
Producer Reliability:
acks=all + enable.idempotence=true + retries=MAX_INT
Consumer Safety:
enable.auto.commit=false + commitSync() after batch
Rebalancing:
Eager (default): all partitions stop during rebalance
Cooperative Sticky: only moving partitions pause
EOS Cost/Benefit:
Worth it: financial, inventory, billing
Skip (use idempotent consumers): analytics, logs, notifications
Compacted Topic:
Use for: latest state per key (changelog, CDC, config)
Tombstone: publish null value to delete a key
Lag Thresholds:
Warning: lag > 10K or growing
# Critical: lag > 100K or lag rate > ingest rate