Stream Processing Expert
Stream processing is fundamentally about answering the question "What has happened recently?" But "recently" is surprisingly complex when data arrives out of order, systems fail, and late events must still produce correct results. The two hardest problems in streaming are handling time correctly (event time vs processing time) and maintaining correct state across failures. Everything else is engineering tradeoffs.
Core Mental Model
The central insight is the distinction between event time (when something actually happened) and processing time (when the system processed it). Using processing time is simple but produces incorrect results when events arrive late. Using event time is correct but requires you to define how long you'll wait for late data — that's what watermarks do. State in streaming is just a fault-tolerant distributed hash table (checkpoint it regularly). Exactly-once semantics come at 2-5x throughput cost — evaluate whether your use case needs it.
Batch vs Micro-batch vs Real-time
Batch processing:
How it works: Collect data for a period, process all at once
Latency: Minutes to hours
Throughput: Excellent (I/O amortized over large batches)
State: Simple (start fresh each run, or use checkpoints)
Complexity: Low
When to use:
✅ End-of-day reporting
✅ ML model training
✅ Data warehouse ETL
✅ Large-scale transformations (TB+ scale)
✅ When latency > 5 minutes is acceptable
Micro-batch (Spark Structured Streaming, AWS Kinesis Analytics):
How it works: Process small batches every N seconds
Latency: Seconds to low minutes (typically 1-60s)
Throughput: Good
State: Checkpoint-based
When to use:
✅ Near-real-time dashboards
✅ Fraud detection with latency tolerance
✅ You're already using Spark
✅ Complex joins and aggregations
Real-time streaming (Apache Flink, Apache Kafka Streams):
How it works: Process each event individually as it arrives
Latency: Milliseconds to low seconds
State: Continuous incremental checkpoints
Complexity: High (time, state, fault tolerance all explicit)
When to use:
✅ Real-time personalization / recommendations
✅ Fraud detection (latency must be < 100ms)
✅ IoT sensor processing
✅ Live scoreboards, trading, alerting
✅ Event-driven microservices
Event Time vs Processing Time
Processing time: when the system receives and processes the event
Simple: no watermarks needed
Problem: late-arriving events cause incorrect results
Use for: monitoring, alerting on current system behavior
Event time: when the event actually occurred (embedded in the event)
Correct: accurate even with out-of-order, delayed events
Complex: requires watermarks to know when to close a window
Use for: business analytics, billing, reporting
Ingestion time: when the event was ingested into the stream (Kafka timestamp)
Compromise between the two
Use for: when event time isn't available in the payload
Illustration:
Event happens at 10:00:00
Mobile device offline, sends at 10:05:00
Kafka receives at 10:05:01 (processing time)
Event timestamp in payload: 10:00:00 (event time)
If you aggregate by minute using processing time: counted in 10:05 bucket (wrong)
If you aggregate by minute using event time: counted in 10:00 bucket (correct)
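The same bucketing logic can be sketched in plain Python (framework-agnostic; the timestamps mirror the timeline above):

```python
from datetime import datetime

def minute_bucket(ts: datetime) -> str:
    """Truncate a timestamp to its per-minute aggregation bucket."""
    return ts.strftime("%H:%M")

# The event from the timeline above
occurred_at = datetime(2024, 1, 1, 10, 0, 0)   # event time (embedded in payload)
processed_at = datetime(2024, 1, 1, 10, 5, 1)  # processing time (system clock)

assert minute_bucket(processed_at) == "10:05"  # wrong bucket for business metrics
assert minute_bucket(occurred_at) == "10:00"   # correct bucket
```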
Watermarks for Late Data
# Flink: explicit watermark strategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import ProcessWindowFunction, AggregateFunction
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.common.time import Duration, Time
from pyflink.datastream.window import TumblingEventTimeWindows

env = StreamExecutionEnvironment.get_execution_environment()

# PyFlink expects a TimestampAssigner subclass (a bare lambda is not accepted here)
class EventTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, event, record_timestamp):
        return event.timestamp_ms

# Source with watermark: tolerate 30 seconds of out-of-order events
stream = env.add_source(kafka_source) \
    .assign_timestamps_and_watermarks(
        WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(30))
        .with_timestamp_assigner(EventTimestampAssigner())
    )
# Tumbling 1-hour window on event time
# A window closes when the watermark passes window_end_time
# Events arriving > 30 seconds late after window closes are dropped (or sent to side output)
result = stream \
    .key_by(lambda event: event.user_id) \
    .window(TumblingEventTimeWindows.of(Time.hours(1))) \
    .aggregate(
        OrderCountAggregator(),
        RevenueWindowFunction()
    )
# Side output for late events (optional: collect what was dropped)
# allowed_lateness: accept events up to 5 extra minutes after the watermark
late_events_tag = OutputTag("late-events")
result_with_late = stream \
    .key_by(lambda event: event.user_id) \
    .window(TumblingEventTimeWindows.of(Time.hours(1))) \
    .allowed_lateness(Time.minutes(5)) \
    .side_output_late_data(late_events_tag) \
    .aggregate(OrderCountAggregator())
late_stream = result_with_late.get_side_output(late_events_tag)
late_stream.add_sink(late_events_kafka_sink)  # process late events separately
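Watermark mechanics themselves are simple enough to sketch in plain Python. This is an illustrative model of bounded-out-of-orderness watermarking, not Flink's implementation:

```python
class BoundedOutOfOrdernessWatermarks:
    """Sketch: watermark = max event time seen so far - allowed disorder."""

    def __init__(self, max_out_of_orderness_ms: int):
        self.delay_ms = max_out_of_orderness_ms
        self.max_ts_ms = float("-inf")  # no events seen yet

    def on_event(self, event_ts_ms: int) -> float:
        # Only the maximum timestamp advances the watermark
        self.max_ts_ms = max(self.max_ts_ms, event_ts_ms)
        return self.current_watermark()

    def current_watermark(self) -> float:
        return self.max_ts_ms - self.delay_ms

wm = BoundedOutOfOrdernessWatermarks(30_000)  # tolerate 30s of disorder
wm.on_event(100_000)  # watermark advances to 70_000
wm.on_event(90_000)   # out-of-order event: accepted, watermark unchanged
assert wm.current_watermark() == 70_000
# A window ending at 60_000 is now closed (watermark 70_000 > 60_000);
# an event with timestamp 55_000 arriving after this point is "late"
```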
Windowing Patterns
# Tumbling window: non-overlapping, fixed size
# Use for: per-minute/hourly counts, non-overlapping time buckets
# "How many orders per hour?"
# [00:00-01:00] [01:00-02:00] [02:00-03:00] ...
.window(TumblingEventTimeWindows.of(Time.hours(1)))
# Sliding window: overlapping, fixed size, configurable slide
# Use for: moving averages, "recent N minutes" metrics
# "What's the 5-minute rolling average, updated every minute?"
# [00:00-00:05], [00:01-00:06], [00:02-00:07] ...
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
# Output: one result per minute, each representing the last 5 minutes
# Session window: activity-based, gaps trigger window close
# Use for: user sessions, device activity bursts
# "Group user activity with < 30 minute inactivity gaps"
# User active 10:00-10:20 (gap) 11:30-11:45 → two sessions
.window(EventTimeSessionWindows.with_gap(Time.minutes(30)))
# Each session closes when no events for 30 minutes
# Global window: one window per key, never closes
# Use for: stateful counters, aggregations without time bounds
# Must provide a custom trigger or it never fires
.window(GlobalWindows.create()) \
.trigger(CountTrigger.of(1000)) # fire every 1000 events
# Window comparison:
# Event pattern: |--A--B-C---|D|----E---F-G-H--|I|
# Tumbling(5min): [-----][-----][-----]
# Sliding(5,1): each minute has a 5-minute-looking-back window
# Session(2min): [A-B-C][D][E-F-G-H][I]
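The window-assignment math behind tumbling and sliding windows is a few lines of plain Python. This sketch mirrors the assignment logic most engines use; the actual implementations differ in detail:

```python
def tumbling_window(ts_ms: int, size_ms: int) -> tuple:
    """Tumbling: each timestamp belongs to exactly one window [start, end)."""
    start = ts_ms - (ts_ms % size_ms)
    return (start, start + size_ms)

def sliding_windows(ts_ms: int, size_ms: int, slide_ms: int) -> list:
    """Sliding: each timestamp belongs to size/slide overlapping windows."""
    last_start = ts_ms - (ts_ms % slide_ms)
    return [(s, s + size_ms) for s in range(last_start, ts_ms - size_ms, -slide_ms)]

# An event at 01:01:40 (3_700_000 ms) falls in the 01:00-02:00 tumbling hour
assert tumbling_window(3_700_000, 3_600_000) == (3_600_000, 7_200_000)
# A 5-minute window sliding every minute: each event lands in 5 windows
assert len(sliding_windows(130_000, 300_000, 60_000)) == 5
```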
Stateful Stream Processing
// Flink: keyed state (per-key state, distributed across operators)
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
    // State: maintained per user_id, checkpointed to fault-tolerant storage
    private ValueState<Double> lastTransactionAmount;
    private ValueState<Long> lastTransactionTime;
    private ListState<Transaction> recentTransactions;

    @Override
    public void open(Configuration parameters) {
        lastTransactionAmount = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-amount", Double.class)
        );
        lastTransactionTime = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-time", Long.class)
        );
        recentTransactions = getRuntimeContext().getListState(
            new ListStateDescriptor<>("recent-txns", Transaction.class)
        );
    }

    @Override
    public void processElement(Transaction txn, Context ctx, Collector<Alert> out) throws Exception {
        Double lastAmount = lastTransactionAmount.value();
        Long lastTime = lastTransactionTime.value();
        if (lastAmount != null && lastTime != null) {
            boolean largeAmount = txn.amount > 10 * lastAmount;
            boolean tooFast = txn.timestamp - lastTime < 60_000; // 1 minute
            if (largeAmount && tooFast) {
                out.collect(new Alert(txn.userId, "Suspicious: large rapid transaction", txn));
            }
        }
        lastTransactionAmount.update(txn.amount);
        lastTransactionTime.update(txn.timestamp);
        // Register a cleanup timer on every element (not just on alerts),
        // so state for idle keys is eventually cleared
        ctx.timerService().registerEventTimeTimer(
            txn.timestamp + TimeUnit.HOURS.toMillis(1)
        );
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
        // Clear state after inactivity (prevent unbounded state growth)
        lastTransactionAmount.clear();
        lastTransactionTime.clear();
        recentTransactions.clear();
    }
}
// Checkpointing configuration (critical for exactly-once state recovery)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // min 30s between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(120_000); // fail checkpoint after 2 minutes
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Checkpoint storage (state persisted to fault-tolerant storage)
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/flink-checkpoints/");
Exactly-Once vs At-Least-Once
At-most-once: Message processed 0 or 1 times. Possible loss.
Use when: logging, metrics, some monitoring
At-least-once: Message processed 1+ times. Possible duplicates.
Your application must be idempotent to handle duplicates.
Use when: most analytics, aggregations, materialized views
Implementation: retry + idempotent consumer
Exactly-once: Message processed exactly 1 time. No loss, no duplicates.
Cost: 2-5x throughput reduction
Implementation: distributed transactions + checkpointing
When exactly-once is worth it:
✅ Financial transactions, billing, payment ledger
✅ Inventory deduction
✅ Writes that can't be deduplicated downstream
When at-least-once + idempotent is sufficient:
✅ Analytics aggregations (usually idempotent with UPSERT)
✅ Search index updates
✅ Notification counts (approximate is OK)
✅ Most event-driven microservices
EOS in practice:
Kafka → Flink → Kafka: end-to-end EOS with Flink transactions
Kafka → Spark → Delta: EOS with Delta Lake ACID writes
Kafka → Kafka Streams: built-in EOS with idempotent producer + transactions
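A minimal sketch of why at-least-once plus idempotence works: if the write is keyed by a deterministic event ID, redelivery is a no-op. Plain Python, with an in-memory dict standing in for an UPSERT-capable store:

```python
# Idempotent consumer: duplicates from at-least-once delivery are harmless
store = {}

def process(event: dict) -> None:
    # Upsert keyed by event_id: applying the same event twice yields the same state
    store[event["event_id"]] = event["amount"]

def total() -> float:
    return sum(store.values())

process({"event_id": "e1", "amount": 100})
process({"event_id": "e2", "amount": 50})
process({"event_id": "e1", "amount": 100})  # redelivered duplicate: no double count
assert total() == 150
```

Contrast with a naive `total += event["amount"]` counter, which would return 250 here and is exactly the kind of non-idempotent write that needs exactly-once.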
Kafka Streams vs Apache Flink vs Spark Structured Streaming
Kafka Streams:
Deployment: Library (runs in your application)
State: RocksDB (local, replicated to Kafka changelog topic)
Latency: Low (milliseconds)
Throughput: Good
Exactly-once: Yes
Windowing: Yes (tumbling, hopping, session)
Complex ops: Limited (joins require co-partitioning)
Best for:
✅ Microservice-style stream processing
✅ Already using Kafka, don't want separate cluster
✅ Simple-medium complexity transformations
✅ Team doesn't want to manage Flink/Spark cluster
Apache Flink:
Deployment: Cluster (Kubernetes, YARN, AWS managed)
State: RocksDB or heap (checkpointed to S3/HDFS)
Latency: Very low (milliseconds, native event-time)
Throughput: Excellent
Exactly-once: Yes (end-to-end with supported sinks)
Windowing: Full (all types + custom)
Complex ops: Full (complex joins, CEP, ML)
Best for:
✅ Mission-critical low-latency processing
✅ Complex stateful logic (fraud, CEP)
✅ High throughput (billions of events/day)
✅ You need native event-time processing
Spark Structured Streaming:
Deployment: Spark cluster (same as batch)
State: Spark state store (checkpointed to S3/HDFS)
Latency: Higher (seconds, micro-batch model)
Throughput: Excellent
Exactly-once: Yes (with idempotent sinks)
Windowing: Yes (with watermarks)
Best for:
✅ Already using Spark for batch (code reuse)
✅ Unified batch + streaming (Lambda architecture in one system)
✅ Latency tolerance of 1-60 seconds
✅ Delta Lake integration (ACID streaming writes)
Backpressure Handling
// Flink: automatic backpressure — operators slow down producers when downstream is slow
// Monitor: Flink UI shows "backpressure" indicator per operator (high/low/OK)
// Kafka Streams: backpressure via consumer poll rate
// If processing is slow, consumer reads less frequently → Kafka accumulates lag
// Patterns to handle slow consumers:
// 1. Increase parallelism
env.setParallelism(8); // 8 parallel operators
// 2. Async I/O for external calls (don't block the operator thread)
AsyncDataStream.unorderedWait(
    stream,
    new DatabaseAsyncLookup(),  // async lookup function
    1000,                       // timeout ms
    TimeUnit.MILLISECONDS,
    50                          // max concurrent async requests
);
// 3. Buffer configuration for bursty sources
env.setBufferTimeout(100); // batch micro-results for 100ms before sending downstream
// 4. Checkpoint tuning: if checkpoints are slow, they block processing
// Enable: state.backend.incremental: true (upload only changed state per checkpoint)
// Tune: checkpoint interval (longer interval = less overhead, more to replay on recovery)
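Pattern 2 (async I/O with bounded concurrency) can be sketched with asyncio. The lookup function is hypothetical; `asyncio.sleep(0)` stands in for the real external call, and the semaphore plays the role of AsyncDataStream's capacity limit:

```python
import asyncio

async def enrich(event: dict, sem: asyncio.Semaphore) -> dict:
    """Hypothetical async external lookup with capped in-flight requests."""
    async with sem:
        await asyncio.sleep(0)  # stand-in for the real DB/HTTP round trip
        return {**event, "enriched": True}

async def run(events, capacity=50):
    # Bound concurrency: the 51st request waits until a slot frees up
    sem = asyncio.Semaphore(capacity)
    return await asyncio.gather(*(enrich(e, sem) for e in events))

results = asyncio.run(run([{"id": i} for i in range(100)]))
assert len(results) == 100 and all(r["enriched"] for r in results)
```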
Lambda vs Kappa Architecture
Lambda Architecture:
Speed layer: Stream processing for near-real-time views
Batch layer: Recompute complete views from raw data periodically
Serving layer: Merge speed + batch views at query time
Pros:
✅ Batch layer provides "source of truth" correctness
✅ Speed layer tolerates approximate results
✅ Well-understood, battle-tested
Cons:
❌ Two codebases to maintain (batch + stream, often different languages)
❌ Divergence between batch and stream logic (bugs)
❌ Complex serving layer merge logic
Kappa Architecture:
Single stream processing layer for everything
"Replay" the event stream from the beginning for historical recomputes
Same code handles real-time and historical
Pros:
✅ One codebase, one paradigm
✅ Simpler architecture
✅ Replay enables "what-if" reprocessing
Cons:
❌ Replaying large history is slow and expensive
❌ Streaming SQL is less expressive than batch SQL for complex analytics
❌ Requires long event retention (Kafka compaction/retention costs)
Modern recommendation:
Use Kappa with Delta Lake / Apache Iceberg:
- Stream to Delta Lake (micro-batch or Flink → Delta)
- Delta Lake acts as both the streaming sink and batch source
- Time travel for replay
- One storage layer, no serving merge needed
CQRS with Event Streams
// CQRS: Command Query Responsibility Segregation
// Commands mutate state (write side), emitting events to Kafka
// Queries read materialized views updated by event consumers
// Write side: command handler
@CommandHandler
public void handle(PlaceOrderCommand cmd) {
    // Validate
    if (!inventoryService.hasStock(cmd.productId, cmd.quantity)) {
        throw new InsufficientStockException();
    }
    // Emit event (don't mutate query model directly)
    eventBus.publish(new OrderPlacedEvent(
        UUID.randomUUID().toString(),  // orderId
        cmd.customerId,
        cmd.productId,
        cmd.quantity,
        cmd.price,
        Instant.now()                  // timestamp
    ));
}
// Read side: projection (Kafka consumer rebuilding query model)
@KafkaListener(topics = "order-events")
public void project(OrderPlacedEvent event) {
    // Update denormalized read model optimized for queries
    orderRepository.upsert(OrderSummary.builder()
        .orderId(event.orderId)
        .customerId(event.customerId)
        .total(event.price * event.quantity)
        .status("placed")
        .createdAt(event.timestamp)
        .build());
    // Update customer aggregate stats
    customerStatsRepository.incrementOrderCount(event.customerId);
    customerStatsRepository.addRevenue(event.customerId, event.price * event.quantity);
}
// Query side: reads optimized projections (not the event log)
public OrderSummaryDTO getOrder(String orderId) {
    return orderRepository.findById(orderId); // fast read from projection
}
// Replay: rebuild projection from scratch (Kafka consumer from offset 0)
// This is the superpower of event sourcing:
// Change the projection logic → replay events → correct view
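A replay can be sketched in a few lines of plain Python, with a list standing in for the Kafka topic and a dict for the projection store:

```python
# Event log (illustrative): the durable source of truth
events = [
    {"order_id": "o1", "price": 10.0, "quantity": 2},
    {"order_id": "o2", "price": 5.0, "quantity": 1},
]

def replay(event_log, projection):
    """Rebuild a read model from scratch by consuming the log from offset 0."""
    view = {}
    for event in event_log:
        projection(view, event)
    return view

def order_totals(view, event):
    view[event["order_id"]] = event["price"] * event["quantity"]

assert replay(events, order_totals) == {"o1": 20.0, "o2": 5.0}
# Change order_totals, replay the same log, and get a corrected view
```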
Anti-Patterns
❌ Using processing time for business logic
"Orders per day" computed on processing time = wrong when events arrive late
✅ Always use event time for business metrics, set appropriate watermarks
❌ Unbounded state accumulation
Keyed state growing forever → OOM in Flink/Streams
✅ Set state TTL, use timers to clear idle state
❌ Watermarks that are too tight (drop too much late data)
Watermark delay = 0 → every late-by-1ms event is dropped
✅ Set watermark delay based on measured 95th-99th percentile event lateness
❌ Watermarks that are too loose (delay output unnecessarily)
Watermark delay = 1 hour → output delayed by 1 hour
✅ Measure actual lateness distribution, set watermark at 99th percentile
❌ Not checkpointing stateful jobs
Operator fails → state lost → wrong aggregate results
✅ Checkpoint to durable storage (S3/HDFS) every 30-60 seconds
❌ Exactly-once everywhere for throughput-sensitive workloads
At-least-once + idempotent consumer is often sufficient and 2-5x faster
✅ Use EOS only for genuinely non-idempotent operations
❌ Lambda architecture maintained as two separate codebases
Drift between batch and streaming logic creates subtle bugs
✅ Kappa with Delta Lake, or unified batch/streaming framework
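To pick a watermark delay that is neither too tight nor too loose, measure per-event lateness (processing time minus event time) and take a high percentile. A plain-Python sketch with illustrative numbers:

```python
def percentile(values, p):
    """Nearest-rank percentile: good enough for sizing a watermark delay."""
    ordered = sorted(values)
    idx = min(len(ordered) - 1, round(p / 100 * (len(ordered) - 1)))
    return ordered[idx]

# Measured lateness per event (processing_time - event_time) in ms, illustrative
lateness_ms = [0, 5, 10, 20, 50, 120, 300, 900, 2_500, 28_000]
delay = percentile(lateness_ms, 99)
assert delay == 28_000  # a ~28s delay covers 99% of observed events
```

With this distribution, a 30-second watermark (as in the Flink example above) drops under 1% of events while keeping output timely; a 1-hour delay would add no correctness and an hour of latency.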
Quick Reference
Processing Model:
Latency > 5 minutes OK → batch (simplest, most efficient)
Latency 1-60 seconds → micro-batch (Spark, Kinesis)
Latency < 1 second → real-time streaming (Flink, Kafka Streams)
Time Semantics:
Business metrics → always event time
System monitoring → processing time (fine for current system state)
Event time requires → watermarks to bound late data
Window Selection:
Non-overlapping buckets → Tumbling
Rolling averages → Sliding
User session grouping → Session
Custom triggers → Global + custom trigger
State Management:
Per-key counters → ValueState
Per-key lists → ListState
Per-key maps → MapState
State TTL → ALWAYS set to prevent unbounded growth
Delivery Semantics:
Worth exactly-once: financial, billing, inventory
At-least-once + idempotent: analytics, search, notifications
Framework Selection:
Simple, microservice-style → Kafka Streams
Low-latency, complex state → Apache Flink
Spark ecosystem, unified → Spark Structured Streaming
Architecture:
Real-time + historical → Kappa + Delta Lake (unified)
Legacy/complex analytics → Lambda (two codebases, avoid if possible)