azure-eventhub-dotnet
Azure Event Hubs SDK for .NET. Use for high-throughput event streaming: sending events (EventHubProducerClient, EventHubBufferedProducerClient), receiving events (EventProcessorClient with checkpointing), partition management, and real-time data ingestion. Triggers: "Event Hubs", "event streaming", "EventHubProducerClient", "EventProcessorClient", "send events", "receive events", "checkpointing", "partition".
Azure.Messaging.EventHubs (.NET)
High-throughput event streaming SDK for sending and receiving events via Azure Event Hubs.
Installation
# Core package (sending and simple receiving)
dotnet add package Azure.Messaging.EventHubs
# Processor package (production receiving with checkpointing)
dotnet add package Azure.Messaging.EventHubs.Processor
# Authentication
dotnet add package Azure.Identity
# For checkpointing (required by EventProcessorClient)
dotnet add package Azure.Storage.Blobs
Current Versions: Azure.Messaging.EventHubs v5.12.2, Azure.Messaging.EventHubs.Processor v5.12.2
Environment Variables
EVENTHUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=<event-hub-name>
# For checkpointing (EventProcessorClient)
BLOB_STORAGE_CONNECTION_STRING=<storage-connection-string>
BLOB_CONTAINER_NAME=<checkpoint-container>
# Alternative: Connection string auth (not recommended for production)
EVENTHUB_CONNECTION_STRING=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...
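The settings above are read with `Environment.GetEnvironmentVariable`, which returns `null` when a variable is missing. A minimal sketch of a fail-fast lookup follows; `RequireEnv` is a hypothetical helper name, not part of the SDK.

```csharp
using System;

// Hypothetical helper (not part of the Event Hubs SDK): fail fast with a
// clear message instead of passing null into a client constructor.
static string RequireEnv(string name)
{
    var value = Environment.GetEnvironmentVariable(name);
    if (string.IsNullOrWhiteSpace(value))
    {
        throw new InvalidOperationException(
            $"Environment variable '{name}' is not set.");
    }
    return value;
}
```

For example, `RequireEnv("EVENTHUB_NAME")` either returns the configured hub name or throws at startup, before any client is created.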
Authentication
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
// Always use DefaultAzureCredential for production
var credential = new DefaultAzureCredential();
var fullyQualifiedNamespace = Environment.GetEnvironmentVariable("EVENTHUB_FULLY_QUALIFIED_NAMESPACE");
var eventHubName = Environment.GetEnvironmentVariable("EVENTHUB_NAME");
var producer = new EventHubProducerClient(
fullyQualifiedNamespace,
eventHubName,
credential);
Required RBAC Roles:
- Sending: Azure Event Hubs Data Sender
- Receiving: Azure Event Hubs Data Receiver
- Both: Azure Event Hubs Data Owner
Client Types
| Client | Purpose | When to Use |
| EventHubProducerClient | Send events immediately in batches | Real-time sending, full control over batching |
| EventHubBufferedProducerClient | Automatic batching with background sending | High-volume, fire-and-forget scenarios |
| EventHubConsumerClient | Simple event reading | Prototyping only, NOT for production |
| EventProcessorClient | Production event processing | Always use this for receiving in production |
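The table lists `EventHubConsumerClient` for prototyping. A minimal sketch of that usage follows, assuming the environment variables described earlier are set; the 30-second limit is an arbitrary choice for illustration.

```csharp
using System;
using System.Threading;
using Azure.Identity;
using Azure.Messaging.EventHubs.Consumer;

var fullyQualifiedNamespace =
    Environment.GetEnvironmentVariable("EVENTHUB_FULLY_QUALIFIED_NAMESPACE")
    ?? throw new InvalidOperationException("EVENTHUB_FULLY_QUALIFIED_NAMESPACE is not set.");
var eventHubName =
    Environment.GetEnvironmentVariable("EVENTHUB_NAME")
    ?? throw new InvalidOperationException("EVENTHUB_NAME is not set.");

await using var consumer = new EventHubConsumerClient(
    EventHubConsumerClient.DefaultConsumerGroupName,
    fullyQualifiedNamespace,
    eventHubName,
    new DefaultAzureCredential());

// Stop reading after 30 seconds (arbitrary limit for a prototype)
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

try
{
    // Reads from all partitions; no checkpointing and no load balancing
    await foreach (PartitionEvent partitionEvent in consumer.ReadEventsAsync(cts.Token))
    {
        Console.WriteLine(
            $"Partition {partitionEvent.Partition.PartitionId}: {partitionEvent.Data.EventBody}");
    }
}
catch (OperationCanceledException)
{
    // Expected when the time limit is reached
}
```

This reader keeps no record of its position, so every run starts over; that is the main reason the table reserves it for prototyping.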
Core Workflow
1. Send Events (Batch)
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
await using var producer = new EventHubProducerClient(
fullyQualifiedNamespace,
eventHubName,
new DefaultAzureCredential());
// Events to send
var events = new[]
{
    new EventData(BinaryData.FromString("{\"id\": 1, \"message\": \"Hello\"}")),
    new EventData(BinaryData.FromString("{\"id\": 2, \"message\": \"World\"}"))
};
// Create a batch (respects size limits automatically).
// Declared without `using` because the variable is reassigned when a batch fills up.
EventDataBatch batch = await producer.CreateBatchAsync();
try
{
    foreach (var eventData in events)
    {
        if (!batch.TryAdd(eventData))
        {
            // Batch is full - send it, dispose it, and create a new one
            await producer.SendAsync(batch);
            batch.Dispose();
            batch = await producer.CreateBatchAsync();
            if (!batch.TryAdd(eventData))
            {
                throw new InvalidOperationException("Event too large for empty batch");
            }
        }
    }
    // Send remaining events
    if (batch.Count > 0)
    {
        await producer.SendAsync(batch);
    }
}
finally
{
    batch.Dispose();
}
2. Send Events (Buffered - High Volume)
using Azure.Messaging.EventHubs.Producer;
var options = new EventHubBufferedProducerClientOptions
{
MaximumWaitTime = TimeSpan.FromSeconds(1)
};
await using var producer = new EventHubBufferedProducerClient(
fullyQualifiedNamespace,
eventHubName,
new DefaultAzureCredential(),
options);
// Handle send success/failure
producer.SendEventBatchSucceededAsync += args =>
{
Console.WriteLine($"Batch sent: {args.EventBatch.Count} events");
return Task.CompletedTask;
};
producer.SendEventBatchFailedAsync += args =>
{
Console.WriteLine($"Batch failed: {args.Exception.Message}");
return Task.CompletedTask;
};
// Enqueue events (sent automatically in background)
for (int i = 0; i < 1000; i++)
{
await producer.EnqueueEventAsync(new EventData($"Event {i}"));
}
// Flush remaining events before disposing
await producer.FlushAsync();
3. Receive Events (Production - EventProcessorClient)
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
// Blob container for checkpointing
var blobClient = new BlobContainerClient(
Environment.GetEnvironmentVariable("BLOB_STORAGE_CONNECTION_STRING"),
Environment.GetEnvironmentVariable("BLOB_CONTAINER_NAME"));
await blobClient.CreateIfNotExistsAsync();
// Create processor
var processor = new EventProcessorClient(
blobClient,
EventHubConsumerClient.DefaultConsumerGroupName,
fullyQualifiedNamespace,
eventHubName,
new DefaultAzureCredential());
// Handle events
processor.ProcessEventAsync += async args =>
{
Console.WriteLine($"Partition: {args.Partition.PartitionId}");
Console.WriteLine($"Data: {args.Data.EventBody}");
// Checkpoint after processing (or batch checkpoints)
await args.UpdateCheckpointAsync();
};
// Handle errors
processor.ProcessErrorAsync += args =>
{
Console.WriteLine($"Error: {args.Exception.Message}");
Console.WriteLine($"Partition: {args.PartitionId}");
return Task.CompletedTask;
};
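// Start processing
await processor.StartProcessingAsync();
try
{
    // Run until cancelled; `cancellationToken` is supplied by the host application
    await Task.Delay(Timeout.Infinite, cancellationToken);
}
catch (TaskCanceledException)
{
    // Expected when cancellation is requested
}
finally
{
    // Stop gracefully
    await processor.StopProcessingAsync();
}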
4. Partition Operations
// Get partition IDs
string[] partitionIds = await producer.GetPartitionIdsAsync();
// Send to specific partition (use sparingly)
var options = new SendEventOptions
{
PartitionId = "0"
};
await producer.SendAsync(events, options);
// Use partition key (recommended for ordering)
var batchOptions = new CreateBatchOptions
{
PartitionKey = "customer-123" // Events with same key go to same partition
};
using var batch = await producer.CreateBatchAsync(batchOptions);
EventPosition Options
Control where to start reading:
// Start from beginning
EventPosition.Earliest
// Start from end (new events only)
EventPosition.Latest
// Start from specific offset
EventPosition.FromOffset(12345)
// Start from specific sequence number
EventPosition.FromSequenceNumber(100)
// Start from specific time
EventPosition.FromEnqueuedTime(DateTimeOffset.UtcNow.AddHours(-1))
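With `EventProcessorClient`, one place to apply an `EventPosition` is the `PartitionInitializingAsync` event. The sketch below wraps that in a hypothetical method, `StartFromLatestWhenNoCheckpoint`, and assumes a processor that has not been started yet.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;

// Hypothetical helper name; call it before StartProcessingAsync.
static void StartFromLatestWhenNoCheckpoint(EventProcessorClient processor)
{
    processor.PartitionInitializingAsync += args =>
    {
        // Used only when the partition has no checkpoint yet;
        // an existing checkpoint takes precedence.
        args.DefaultStartingPosition = EventPosition.Latest;
        return Task.CompletedTask;
    };
}
```

Without a handler like this, a partition with no checkpoint is read from the earliest available event.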
ASP.NET Core Integration
// Program.cs (requires the Microsoft.Extensions.Azure package)
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Extensions.Azure;
builder.Services.AddAzureClients(clientBuilder =>
{
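// The two-string AddEventHubProducerClient overload expects a connection string;
// use the WithNamespace variant for namespace + credential authentication
clientBuilder.AddEventHubProducerClientWithNamespace(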
builder.Configuration["EventHub:FullyQualifiedNamespace"],
builder.Configuration["EventHub:Name"]);
clientBuilder.UseCredential(new DefaultAzureCredential());
});
// Inject in controller/service
public class EventService
{
private readonly EventHubProducerClient _producer;
public EventService(EventHubProducerClient producer)
{
_producer = producer;
}
public async Task SendAsync(string message)
{
using var batch = await _producer.CreateBatchAsync();
if (!batch.TryAdd(new EventData(message)))
{
    throw new InvalidOperationException("Event is too large for a batch");
}
await _producer.SendAsync(batch);
}
}
Best Practices
- Use EventProcessorClient for receiving: Never use EventHubConsumerClient in production
- Checkpoint strategically: After N events or time interval, not every event
- Use partition keys: For ordering guarantees within a partition
- Reuse clients: Create once, use as singleton (thread-safe)
- Use await using: Ensures proper disposal
- Handle ProcessErrorAsync: Always register error handler
- Batch events: Use CreateBatchAsync() to respect size limits
- Use buffered producer: For high-volume scenarios with automatic batching
Error Handling
using Azure.Messaging.EventHubs;
try
{
await producer.SendAsync(batch);
}
catch (EventHubsException ex) when (ex.Reason == EventHubsException.FailureReason.ServiceBusy)
{
// Service is throttling - back off here, then let the caller retry the send
await Task.Delay(TimeSpan.FromSeconds(5));
}
catch (EventHubsException ex) when (ex.IsTransient)
{
// Transient error - safe to retry
Console.WriteLine($"Transient error: {ex.Message}");
}
catch (EventHubsException ex)
{
// Non-transient error
Console.WriteLine($"Error: {ex.Reason} - {ex.Message}");
}
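The client retries transient failures on its own before an exception reaches the `catch` blocks above, according to its `EventHubsRetryOptions`. A minimal sketch of tuning that policy follows; the specific values are illustrative assumptions, not recommendations from the source.

```csharp
using System;
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

var fullyQualifiedNamespace =
    Environment.GetEnvironmentVariable("EVENTHUB_FULLY_QUALIFIED_NAMESPACE")
    ?? throw new InvalidOperationException("EVENTHUB_FULLY_QUALIFIED_NAMESPACE is not set.");
var eventHubName =
    Environment.GetEnvironmentVariable("EVENTHUB_NAME")
    ?? throw new InvalidOperationException("EVENTHUB_NAME is not set.");

var producerOptions = new EventHubProducerClientOptions
{
    RetryOptions = new EventHubsRetryOptions
    {
        Mode = EventHubsRetryMode.Exponential,   // back off progressively
        MaximumRetries = 5,                      // illustrative value
        Delay = TimeSpan.FromSeconds(1),         // illustrative value
        MaximumDelay = TimeSpan.FromSeconds(30)  // illustrative value
    }
};

await using var producer = new EventHubProducerClient(
    fullyQualifiedNamespace,
    eventHubName,
    new DefaultAzureCredential(),
    producerOptions);
```

Configuring retries on the client keeps application-level `catch` blocks for failures that outlast the policy, rather than for every transient fault.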
Checkpointing Strategies
| Strategy | When to Use |
| Every event | Low volume, critical data |
| Every N events | Balanced throughput/reliability |
| Time-based | Consistent checkpoint intervals |
| Batch completion | After processing a logical batch |
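// Checkpoint every 100 events, counted per partition.
// Handlers for different partitions can run concurrently, and a checkpoint
// only covers the partition of the event it is called on.
var eventCounts = new ConcurrentDictionary<string, int>(); // using System.Collections.Concurrent;
processor.ProcessEventAsync += async args =>
{
    // Process event...
    var partitionId = args.Partition.PartitionId;
    var count = eventCounts.AddOrUpdate(partitionId, 1, (_, current) => current + 1);
    if (count >= 100)
    {
        await args.UpdateCheckpointAsync();
        eventCounts[partitionId] = 0;
    }
};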
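The "Every N events" and "Time-based" strategies from the table can be combined. The sketch below is one possible way to do that with per-partition state; `ShouldCheckpoint` and both thresholds are hypothetical names and values chosen for illustration.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical thresholds; tune them for your workload.
const int maxEventsBetweenCheckpoints = 100;
TimeSpan maxTimeBetweenCheckpoints = TimeSpan.FromSeconds(30);

// Per-partition state: events seen since the last checkpoint, and when it happened.
var state = new ConcurrentDictionary<string, (int Count, DateTimeOffset LastCheckpoint)>();

// Returns true when a checkpoint is due for this partition and resets its state.
bool ShouldCheckpoint(string partitionId, DateTimeOffset now)
{
    var current = state.GetOrAdd(partitionId, _ => (0, now));
    int count = current.Count + 1;
    bool due = count >= maxEventsBetweenCheckpoints
        || now - current.LastCheckpoint >= maxTimeBetweenCheckpoints;
    state[partitionId] = due ? (0, now) : (count, current.LastCheckpoint);
    return due;
}
```

Inside a `ProcessEventAsync` handler, this would be called as `ShouldCheckpoint(args.Partition.PartitionId, DateTimeOffset.UtcNow)`, followed by `await args.UpdateCheckpointAsync()` when it returns true. The time check only runs when an event arrives, so an idle partition is not checkpointed by this sketch.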
Related SDKs
| SDK | Purpose | Install |
| Azure.Messaging.EventHubs | Core sending/receiving | dotnet add package Azure.Messaging.EventHubs |
| Azure.Messaging.EventHubs.Processor | Production processing | dotnet add package Azure.Messaging.EventHubs.Processor |
| Azure.ResourceManager.EventHubs | Management plane (create hubs) | dotnet add package Azure.ResourceManager.EventHubs |
| Microsoft.Azure.WebJobs.Extensions.EventHubs | Azure Functions binding | dotnet add package Microsoft.Azure.WebJobs.Extensions.EventHubs |
Skill Information
- Source: Microsoft
- Category: Cloud & Azure