Azure Service Bus SDK for Python
Enterprise messaging for reliable cloud communication with queues and pub/sub topics.
Installation
pip install azure-servicebus azure-identity
Environment Variables
SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
SERVICEBUS_QUEUE_NAME=myqueue
SERVICEBUS_TOPIC_NAME=mytopic
SERVICEBUS_SUBSCRIPTION_NAME=mysubscription
Authentication
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient
credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
client = ServiceBusClient(
fully_qualified_namespace=namespace,
credential=credential
)
Client Types
| Client | Purpose | Get From |
ServiceBusClient | Connection management | Direct instantiation |
ServiceBusSender | Send messages | client.get_queue_sender() / get_topic_sender() |
ServiceBusReceiver | Receive messages | client.get_queue_receiver() / get_subscription_receiver() |
Send Messages (Async)
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential
async def send_messages():
credential = DefaultAzureCredential()
async with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=credential
) as client:
sender = client.get_queue_sender(queue_name="myqueue")
async with sender:
# Single message
message = ServiceBusMessage("Hello, Service Bus!")
await sender.send_messages(message)
# Batch of messages
messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
await sender.send_messages(messages)
# Message batch (for size control)
batch = await sender.create_message_batch()
for i in range(100):
try:
batch.add_message(ServiceBusMessage(f"Batch message {i}"))
except ValueError: # Batch full
await sender.send_messages(batch)
batch = await sender.create_message_batch()
batch.add_message(ServiceBusMessage(f"Batch message {i}"))
await sender.send_messages(batch)
asyncio.run(send_messages())
Receive Messages (Async)
async def receive_messages():
credential = DefaultAzureCredential()
async with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=credential
) as client:
receiver = client.get_queue_receiver(queue_name="myqueue")
async with receiver:
# Receive batch
messages = await receiver.receive_messages(
max_message_count=10,
max_wait_time=5 # seconds
)
for msg in messages:
print(f"Received: {str(msg)}")
await receiver.complete_message(msg) # Remove from queue
asyncio.run(receive_messages())
Receive Modes
| Mode | Behavior | Use Case |
PEEK_LOCK (default) | Message locked, must complete/abandon | Reliable processing |
RECEIVE_AND_DELETE | Removed immediately on receive | At-most-once delivery |
from azure.servicebus import ServiceBusReceiveMode
receiver = client.get_queue_receiver(
queue_name="myqueue",
receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE
)
Message Settlement
async with receiver:
messages = await receiver.receive_messages(max_message_count=1)
for msg in messages:
try:
# Process message...
await receiver.complete_message(msg) # Success - remove from queue
except ProcessingError:
await receiver.abandon_message(msg) # Retry later
except PermanentError:
await receiver.dead_letter_message(
msg,
reason="ProcessingFailed",
error_description="Could not process"
)
| Action | Effect |
complete_message() | Remove from queue (success) |
abandon_message() | Release lock, retry immediately |
dead_letter_message() | Move to dead-letter queue |
defer_message() | Set aside, receive by sequence number |
Topics and Subscriptions
# Send to topic
sender = client.get_topic_sender(topic_name="mytopic")
async with sender:
await sender.send_messages(ServiceBusMessage("Topic message"))
# Receive from subscription
receiver = client.get_subscription_receiver(
topic_name="mytopic",
subscription_name="mysubscription"
)
async with receiver:
messages = await receiver.receive_messages(max_message_count=10)
Sessions (FIFO)
# Send with session
message = ServiceBusMessage("Session message")
message.session_id = "order-123"
await sender.send_messages(message)
# Receive from specific session
receiver = client.get_queue_receiver(
queue_name="session-queue",
session_id="order-123"
)
# Receive from next available session
from azure.servicebus import NEXT_AVAILABLE_SESSION
receiver = client.get_queue_receiver(
queue_name="session-queue",
session_id=NEXT_AVAILABLE_SESSION
)
Scheduled Messages
from datetime import datetime, timedelta, timezone
message = ServiceBusMessage("Scheduled message")
scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=10)
# Schedule message
sequence_number = await sender.schedule_messages(message, scheduled_time)
# Cancel scheduled message
await sender.cancel_scheduled_messages(sequence_number)
Dead-Letter Queue
from azure.servicebus import ServiceBusSubQueue
# Receive from dead-letter queue
dlq_receiver = client.get_queue_receiver(
queue_name="myqueue",
sub_queue=ServiceBusSubQueue.DEAD_LETTER
)
async with dlq_receiver:
messages = await dlq_receiver.receive_messages(max_message_count=10)
for msg in messages:
print(f"Dead-lettered: {msg.dead_letter_reason}")
await dlq_receiver.complete_message(msg)
Sync Client (for simple scripts)
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential
with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=DefaultAzureCredential()
) as client:
with client.get_queue_sender("myqueue") as sender:
sender.send_messages(ServiceBusMessage("Sync message"))
with client.get_queue_receiver("myqueue") as receiver:
for msg in receiver:
print(str(msg))
receiver.complete_message(msg)
Best Practices
- Use async client for production workloads
- Use context managers (
async with) for proper cleanup - Complete messages after successful processing
- Use dead-letter queue for poison messages
- Use sessions for ordered, FIFO processing
- Use message batches for high-throughput scenarios
- Set
max_wait_timeto avoid infinite blocking
Reference Files
| File | Contents |
| references/patterns.md | Competing consumers, sessions, retry patterns, request-response, transactions |
| references/dead-letter.md | DLQ handling, poison messages, reprocessing strategies |
| scripts/setup_servicebus.py | CLI for queue/topic/subscription management and DLQ monitoring |
Skill Information
- Source
- Microsoft
- Category
- Cloud & Azure
- Repository
- View on GitHub
Related Skills
agent-framework-azure-ai-py
Build Azure AI Foundry agents using the Microsoft Agent Framework Python SDK (agent-framework-azure-ai). Use when creating persistent agents with AzureAIAgentsProvider, using hosted tools (code interpreter, file search, web search), integrating MCP servers, managing conversation threads, or implementing streaming responses. Covers function tools, structured outputs, and multi-tool agents.
Microsoftazd-deployment
Deploy containerized applications to Azure Container Apps using Azure Developer CLI (azd). Use when setting up azd projects, writing azure.yaml configuration, creating Bicep infrastructure for Container Apps, configuring remote builds with ACR, implementing idempotent deployments, managing environment variables across local/.azure/Bicep, or troubleshooting azd up failures. Triggers on requests for azd configuration, Container Apps deployment, multi-service deployments, and infrastructure-as-code with Bicep.
Microsoftazure-ai-agents-persistent-dotnet
Azure AI Agents Persistent SDK for .NET. Low-level SDK for creating and managing AI agents with threads, messages, runs, and tools. Use for agent CRUD, conversation threads, streaming responses, function calling, file search, and code interpreter. Triggers: "PersistentAgentsClient", "persistent agents", "agent threads", "agent runs", "streaming agents", "function calling agents .NET".
Microsoftazure-ai-agents-persistent-java
Azure AI Agents Persistent SDK for Java. Low-level SDK for creating and managing AI agents with threads, messages, runs, and tools. Triggers: "PersistentAgentsClient", "persistent agents java", "agent threads java", "agent runs java", "streaming agents java".
Microsoftazure-ai-anomalydetector-java
Build anomaly detection applications with Azure AI Anomaly Detector SDK for Java. Use when implementing univariate/multivariate anomaly detection, time-series analysis, or AI-powered monitoring.
Microsoft