Azure Event Hubs for Python: Real-Time Event Processing at Scale
Azure Event Hubs ingests and processes massive event streams, millions of events per second, for real-time analytics, monitoring, and data pipelines. The Python SDK provides Pythonic APIs for producing and consuming event streams, with automatic checkpointing, partition management, and fault tolerance.
This skill enables AI assistants to integrate event streaming into Python applications, from data science workflows processing sensor data to web applications capturing user events for analytics.
What This Skill Does
The Azure Event Hubs SDK for Python provides producer clients for sending event streams, an event processor for distributed consumption, checkpoint management via Azure Blob Storage, async/await support for non-blocking I/O, and Kafka compatibility for ecosystem integration.
Key capabilities include batch sending for throughput, partition keys for ordering, consumer groups for parallel processing, automatic partition load balancing, checkpoint persistence for fault tolerance, and seamless integration with data science libraries.
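Event payloads are just bytes, so sensor readings from a data science workflow can be serialized to JSON before being wrapped in `EventData`. A minimal sketch of that serialization step (`to_payload` is a hypothetical helper, not part of the SDK):

```python
import json

def to_payload(reading: dict) -> bytes:
    # Serialize a sensor reading to UTF-8 JSON bytes; the result can
    # be wrapped directly as an event body: EventData(to_payload(reading)).
    return json.dumps(reading).encode("utf-8")

payload = to_payload({"device": "sensor-42", "temp_c": 21.5})
```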
Getting Started
Install the Event Hubs SDK:
```shell
pip install azure-eventhub azure-eventhub-checkpointstoreblob azure-identity
```
Send events in batches:
```python
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential

producer = EventHubProducerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="telemetry",
    credential=DefaultAzureCredential(),
)

# create_batch() sizes the batch to the hub's limits; add() raises
# ValueError if an event would not fit.
batch = producer.create_batch()
batch.add(EventData("Event 1"))
batch.add(EventData("Event 2"))
producer.send_batch(batch)
producer.close()
```
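When a stream does not fit in a single batch, the usual SDK pattern is to fill a batch until `add()` raises `ValueError`, send it, and continue with a fresh batch from `create_batch()`. A stdlib-only sketch of that size-splitting logic (`chunk_payloads` is a hypothetical helper; the real limit also counts AMQP framing overhead, not just payload bytes):

```python
def chunk_payloads(payloads, max_batch_bytes):
    # Split payloads into size-bounded groups, mirroring the SDK
    # pattern where EventDataBatch.add raises ValueError once the
    # batch is full and a new batch must be started.
    batches, current, size = [], [], 0
    for p in payloads:
        if current and size + len(p) > max_batch_bytes:
            batches.append(current)
            current, size = [], 0
        current.append(p)
        size += len(p)
    if current:
        batches.append(current)
    return batches

batches = chunk_payloads([b"x" * 40, b"x" * 40, b"x" * 40], max_batch_bytes=100)
```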
Process events with checkpointing:
```python
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential

# Checkpoints are persisted to a blob container so consumers can
# resume from the last recorded position after a restart.
checkpoint_store = BlobCheckpointStore.from_connection_string(...)

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="telemetry",
    consumer_group="$Default",
    checkpoint_store=checkpoint_store,
    credential=DefaultAzureCredential(),
)

def on_event(partition_context, event):
    print(f"Event: {event.body_as_str()}")
    # Record progress; on failover, processing resumes from here.
    partition_context.update_checkpoint(event)

with consumer:
    consumer.receive(on_event=on_event)
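Checkpointing after every event issues one Blob Storage write per event, which can dominate processing cost. A common mitigation is to checkpoint every N events and accept reprocessing of up to N-1 events after a failover. A sketch of that policy (`CheckpointPolicy` is a hypothetical helper; inside `on_event` you would call `partition_context.update_checkpoint(event)` only when it returns True):

```python
class CheckpointPolicy:
    # Decide whether to checkpoint, based on a simple event counter.
    def __init__(self, every: int):
        self.every = every
        self.seen = 0

    def should_checkpoint(self) -> bool:
        self.seen += 1
        return self.seen % self.every == 0

policy = CheckpointPolicy(every=100)
decisions = [policy.should_checkpoint() for _ in range(250)]
```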
Key Features
- Batch Sending optimizes throughput by reducing network overhead.
- Event Processor provides distributed consumption with automatic load balancing.
- Async Support enables non-blocking I/O in async frameworks like FastAPI.
- Checkpoint Storage persists consumer progress for fault tolerance.
- Partition Keys maintain event ordering within logical groups.
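The async client (available under `azure.eventhub.aio`) invokes an awaitable `on_event` callback, so slow I/O in one partition's handler does not block the others. A stdlib-only sketch of that callback shape, with `asyncio.sleep(0)` standing in for an awaited I/O call (the handler and partition IDs here are illustrative, not the SDK's API):

```python
import asyncio

async def on_event(partition_id: str, body: str, results: list) -> None:
    # Awaiting inside the handler yields control, so events on other
    # partitions keep being processed concurrently.
    await asyncio.sleep(0)  # stand-in for an awaited I/O call
    results.append((partition_id, body))

async def main() -> list:
    results: list = []
    # Two partitions handled concurrently, as the async consumer would.
    await asyncio.gather(
        on_event("0", "event-a", results),
        on_event("1", "event-b", results),
    )
    return results

processed = asyncio.run(main())
```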
When to Use
Use for IoT sensor data ingestion, application log streaming, user activity tracking, financial market data processing, and real-time analytics pipelines. Avoid for simple pub/sub (use Event Grid) or transactional messaging (use Service Bus).
Related Skills
- azure-identity-py - Authentication
- azure-storage-blob-py - Checkpoint storage
- pandas - Data analysis on event streams
Source
Maintained by Microsoft. View on GitHub