Azure Queue Storage SDK for Python: Simple, Reliable Message Queuing
When you need to decouple application components, distribute tasks across workers, or implement background processing, message queues are your answer. Azure Queue Storage provides a simple, cost-effective queuing service that's well suited to asynchronous communication patterns; a single queue can hold millions of messages, up to the capacity limit of the storage account.
What This Skill Does
The azure-storage-queue-py skill provides a Python SDK for Azure Queue Storage, a message queue service built on Azure Storage. Unlike more complex message brokers, Queue Storage is refreshingly simple: send messages, receive messages, process them, delete them. That's the entire workflow.
The SDK offers two client types: QueueServiceClient for account-level operations like creating and listing queues, and QueueClient for queue-specific operations like sending and receiving messages. Message bodies are text, so structured data is typically sent as a JSON string. Messages support visibility timeouts (they stay hidden while a worker processes them) and configurable time-to-live values, and the clients retry failed requests with exponential backoff by default.
The service provides at-least-once delivery but does not deduplicate messages, so a message can be delivered more than once and handlers should be idempotent. It supports peeking at messages without removing them and updating messages in place (to extend processing time or change content). It's designed for simplicity and cost-effectiveness, making it ideal for task queues and work distribution.
Getting Started
Install the Queue Storage SDK:
pip install azure-storage-queue azure-identity
Set your storage account URL:
export AZURE_STORAGE_ACCOUNT_URL="https://mystorageaccount.queue.core.windows.net"
Here's the basic pattern—send a message, receive it, process it, delete it:
import os
from azure.identity import DefaultAzureCredential
from azure.storage.queue import QueueServiceClient
# Connect to storage account (URL comes from the variable exported above)
credential = DefaultAzureCredential()
account_url = os.environ["AZURE_STORAGE_ACCOUNT_URL"]
service = QueueServiceClient(account_url=account_url, credential=credential)
# Create queue
service.create_queue("tasks")
# Get queue client
queue_client = service.get_queue_client("tasks")
# Send message
queue_client.send_message("Process user signup for user123")
# Receive and process message
messages = queue_client.receive_messages()
for message in messages:
    print(f"Processing: {message.content}")
    # Do work...
    # Delete message after successful processing
    queue_client.delete_message(message)
    print("Message processed and deleted")
That's the fundamental workflow. Messages become invisible when you receive them (so other workers don't grab the same message), you process them, then you delete them. If you don't delete within the visibility timeout, the message becomes visible again for retry.
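To make the receive, hide, and reappear cycle concrete, here is a toy in-memory model of it. This is a sketch for building intuition only: `ToyQueue` and `ToyMessage` are hypothetical names, not part of the SDK, and time is passed in explicitly as `now` so the behavior is easy to follow.

```python
import itertools
from dataclasses import dataclass


@dataclass
class ToyMessage:
    id: int
    content: str
    visible_at: float = 0.0  # earliest time the message may be received
    dequeue_count: int = 0


class ToyQueue:
    """In-memory stand-in that mimics the receive/delete visibility rules."""

    def __init__(self):
        self._messages = {}
        self._ids = itertools.count(1)

    def send(self, content, now=0.0):
        msg = ToyMessage(id=next(self._ids), content=content, visible_at=now)
        self._messages[msg.id] = msg
        return msg.id

    def receive(self, now, visibility_timeout=30):
        """Return the first visible message and hide it until now + timeout."""
        for msg in self._messages.values():
            if msg.visible_at <= now:
                msg.visible_at = now + visibility_timeout
                msg.dequeue_count += 1
                return msg
        return None

    def delete(self, message_id):
        self._messages.pop(message_id, None)
```

A message received at time 0 with a 30-second timeout cannot be received again at time 10, but it can at time 30 unless it was deleted in between, and its dequeue count goes up on each receive.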
Key Features
Visibility Timeout: When you receive a message, it becomes invisible to other consumers for a configurable period (default 30 seconds). This prevents multiple workers from processing the same message. If processing fails and you don't delete the message, it automatically becomes visible again.
Time-to-Live: Set how long messages remain in the queue before automatic deletion. This prevents stale messages from accumulating. The default is 7 days; pass time_to_live=-1 to keep a message until it is explicitly deleted.
Dequeue Count: Each message tracks how many times it's been dequeued. Use this to detect poison messages (messages that repeatedly fail processing) and move them to a dead-letter queue.
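Peek Operations: View messages without removing them or affecting visibility. A peek returns up to 32 messages from the front of the queue, which makes it useful for inspecting content without disrupting workers; for total depth, read the queue's approximate message count.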
Message Updates: Extend visibility timeout if processing takes longer than expected, or update message content during processing. This enables flexible workflow patterns.
Batch Operations: Receive up to 32 messages in a single call to improve throughput and reduce API calls. Sending is one message per call; there is no batch send.
Async Support: A complete async API for high-performance worker applications that process messages concurrently.
Usage Examples
Send Messages with Options:
import json
queue_client = service.get_queue_client("tasks")
# Send simple text message
queue_client.send_message("Simple task")
# Send with visibility timeout (message invisible for 60 seconds)
queue_client.send_message(
    "Delayed task",
    visibility_timeout=60
)
# Send with time-to-live (expires in 1 hour)
queue_client.send_message(
    "Expiring task",
    time_to_live=3600
)
# Send JSON data
task_data = {
    "task_type": "email",
    "user_id": 12345,
    "template": "welcome"
}
queue_client.send_message(json.dumps(task_data))
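Because the queue carries plain text, both producer and consumer benefit from agreeing on the JSON shape. The sketch below shows one way to do that with a pair of helpers; `encode_task`, `decode_task`, and the `REQUIRED_FIELDS` schema are hypothetical and chosen only to match the example task above.

```python
import json

# Assumed schema for illustration; adjust to your own task format.
REQUIRED_FIELDS = ("task_type", "user_id")


def encode_task(task: dict) -> str:
    """Serialize a task to a JSON string, rejecting incomplete tasks."""
    missing = [field for field in REQUIRED_FIELDS if field not in task]
    if missing:
        raise ValueError(f"Task is missing fields: {missing}")
    return json.dumps(task, separators=(",", ":"))


def decode_task(content: str):
    """Return the task dict, or None if the body is not a valid task."""
    try:
        task = json.loads(content)
    except json.JSONDecodeError:
        return None
    if not isinstance(task, dict):
        return None
    if any(field not in task for field in REQUIRED_FIELDS):
        return None
    return task
```

A producer would call `queue_client.send_message(encode_task(task_data))`, and a worker can treat a `None` result from `decode_task(message.content)` as a malformed message rather than letting a parse error escape.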
Process Messages with Error Handling:
import json
import time
def process_task(task_data):
    """Simulate task processing."""
    print(f"Processing: {task_data}")
    time.sleep(2)
    # Return True on success, False on failure
    return True
# Worker loop
while True:
    messages = queue_client.receive_messages(
        messages_per_page=10,
        visibility_timeout=30  # 30 seconds to process
    )
    for message in messages:
        try:
            # Parse the JSON body (non-JSON content raises and is handled below)
            data = json.loads(message.content)
            # Process the task
            success = process_task(data)
            if success:
                # Delete on success
                queue_client.delete_message(message)
                print(f"✓ Message {message.id} processed successfully")
            else:
                # Check dequeue count for poison messages
                if message.dequeue_count > 5:
                    print(f"⚠️ Poison message detected (tried {message.dequeue_count} times)")
                    # Move to dead-letter queue or log
                    queue_client.delete_message(message)
                else:
                    # Let it retry (becomes visible again after timeout)
                    print("✗ Processing failed, will retry")
        except Exception as e:
            print(f"Error processing message: {e}")
            # Message becomes visible again after timeout for retry
    # Sleep before next poll
    time.sleep(5)
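Queue Storage has no built-in dead-letter queue, so moving a poison message aside is something the worker does itself. The following is a minimal sketch of that step, assuming a second queue client (for example one for a hypothetical "tasks-poison" queue) is passed in as `dead_letter_client`; the function name and the threshold are illustrative.

```python
def settle_failed_message(message, queue_client, dead_letter_client, max_attempts=5):
    """Dead-letter a failed message once it has used up its attempts.

    Returns True if the message was moved, False if it was left to retry.
    """
    if message.dequeue_count < max_attempts:
        # Leave it alone; it reappears after the visibility timeout.
        return False
    # Copy first, delete second: a crash in between leaves a duplicate
    # in the dead-letter queue rather than losing the message.
    dead_letter_client.send_message(message.content)
    queue_client.delete_message(message)
    return True
```

Calling this from the failure branch of a worker keeps the original content available for inspection instead of discarding it.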
Extend Visibility Timeout for Long Processing:
import time
messages = queue_client.receive_messages(visibility_timeout=30)
for message in messages:
    # Start processing
    print(f"Processing {message.content}...")
    # Realize we need more time. update_message returns the message with a
    # new pop receipt, so keep the returned object for the later delete.
    message = queue_client.update_message(
        message,
        visibility_timeout=60  # Invisible for 60 seconds from now
    )
    # Continue processing...
    time.sleep(45)
    # Delete when done
    queue_client.delete_message(message)
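When the work splits into several steps, the same idea can be applied before each step. This is a sketch under the assumption that `update_message` returns a message object carrying the current pop receipt; `run_steps_with_renewal` and the `steps` callables are hypothetical names.

```python
def run_steps_with_renewal(queue_client, message, steps, visibility_timeout=60):
    """Run each step, renewing the message's invisibility before it.

    Returns the most recent message object, whose pop receipt is the one
    that was used for the final delete.
    """
    for step in steps:
        # Keep the returned message: it carries the fresh pop receipt.
        message = queue_client.update_message(
            message, visibility_timeout=visibility_timeout
        )
        step(message.content)
    queue_client.delete_message(message)
    return message
```

Each step then has up to `visibility_timeout` seconds to finish, and the delete at the end uses the receipt from the last renewal rather than the one from the original receive.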
Peek at Queue Without Processing:
# Monitor queue depth without affecting messages
messages = queue_client.peek_messages(max_messages=10)
print("Queue status:")
for message in messages:
    print(f" - {message.content}")
# Get queue properties for total count
properties = queue_client.get_queue_properties()
print(f"Approximate message count: {properties.approximate_message_count}")
Async Worker for High Throughput:
# The async clients need an async HTTP transport: pip install aiohttp
import asyncio
from azure.identity.aio import DefaultAzureCredential
from azure.storage.queue.aio import QueueClient
async def process_message_async(message_content):
    """Async task processing."""
    await asyncio.sleep(1)  # Simulate async work
    return True
async def worker():
    credential = DefaultAzureCredential()
    # Close the credential and the client when the worker exits
    async with credential, QueueClient(
        account_url="https://mystorageaccount.queue.core.windows.net",
        queue_name="tasks",
        credential=credential
    ) as client:
        while True:
            # Receive messages
            async for message in client.receive_messages(messages_per_page=32):
                # Each worker handles its own messages one at a time
                success = await process_message_async(message.content)
                if success:
                    await client.delete_message(message)
            await asyncio.sleep(1)
async def main():
    # Run multiple workers concurrently
    await asyncio.gather(worker(), worker(), worker())
# asyncio.run needs a coroutine, so gather is wrapped in main()
asyncio.run(main())
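Within a single worker, messages can also be handled concurrently rather than one after another. The sketch below shows one way to do that with a semaphore to cap the number of in-flight tasks; `process_batch`, `handler`, and `max_concurrency` are hypothetical names, and `client` is assumed to expose an awaitable `delete_message` like the async QueueClient.

```python
import asyncio


async def process_batch(client, messages, handler, max_concurrency=8):
    """Process messages concurrently, deleting each one that succeeds.

    Returns the number of messages deleted.
    """
    limit = asyncio.Semaphore(max_concurrency)

    async def handle_one(message):
        async with limit:
            if await handler(message.content):
                await client.delete_message(message)
                return True
            return False

    results = await asyncio.gather(
        *(handle_one(m) for m in messages), return_exceptions=True
    )
    # Exceptions count as failures; those messages reappear after the timeout.
    return sum(1 for result in results if result is True)
```

The `messages` argument is a list collected up front, so the caller decides how many messages are pulled before processing starts. Keep the batch small enough that every message can finish within its visibility timeout.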
Handle Binary Data:
from azure.storage.queue import (
    QueueClient,
    BinaryBase64EncodePolicy,
    BinaryBase64DecodePolicy,
)
# Create client with binary encoding
queue_client = QueueClient(
    account_url=account_url,
    queue_name="binary-queue",
    credential=credential,
    message_encode_policy=BinaryBase64EncodePolicy(),
    message_decode_policy=BinaryBase64DecodePolicy()
)
# Send binary data
binary_data = b"\x89PNG\r\n\x1a\n..." # Image bytes
queue_client.send_message(binary_data)
# Receive binary data
messages = queue_client.receive_messages()
for message in messages:
    # message.content is already decoded to bytes
    with open("received-image.png", "wb") as f:
        f.write(message.content)
    queue_client.delete_message(message)
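Base64 encoding grows the payload by about a third, which matters because a queue message is limited to 64 KiB. The standalone sketch below mirrors what a binary encode policy does, using only the standard library, to show where the effective 48 KiB ceiling for raw bytes comes from. `to_queue_text` and `from_queue_text` are hypothetical helpers, and the size check is a client-side approximation rather than the service's own validation.

```python
import base64

MAX_ENCODED_BYTES = 64 * 1024  # per-message limit, applied to the encoded text


def to_queue_text(data: bytes) -> str:
    """Base64-encode bytes and reject payloads that would exceed the limit."""
    encoded = base64.b64encode(data).decode("ascii")
    if len(encoded) > MAX_ENCODED_BYTES:
        raise ValueError(
            f"Encoded payload is {len(encoded)} bytes; limit is {MAX_ENCODED_BYTES}"
        )
    return encoded


def from_queue_text(text: str) -> bytes:
    """Decode a Base64 message body back to the original bytes."""
    return base64.b64decode(text)
```

For anything larger, such as a full image, a common pattern is to store the bytes in Blob Storage and put only a reference to the blob on the queue.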
Best Practices
Always Delete Successfully Processed Messages: This is critical. If you don't delete a message after processing, it becomes visible again and is reprocessed. Delete only after the work succeeds, so that a failure leaves the message in the queue for retry.
Set Appropriate Visibility Timeouts: Match the timeout to your actual processing time. Too short and messages become visible while still being processed (duplicate work). Too long and failed messages take forever to retry.
Handle Poison Messages: Check message.dequeue_count and move messages to a dead-letter queue after a threshold (e.g., 5 attempts). This prevents infinite retry loops on permanently broken messages.
Use Time-to-Live for Temporary Tasks: Set time_to_live to automatically clean up stale messages. For example, set 1-hour TTL for time-sensitive notifications that become irrelevant if not processed quickly.
Peek for Monitoring: Use peek_messages() to check queue health without affecting message visibility. This is perfect for dashboards and monitoring systems.
Use Async for High Volume: If you're processing thousands of messages per minute, use the async client with multiple workers. Most of a worker's time is spent waiting on the network, so overlapping those waits raises throughput.
Consider Service Bus for Complex Needs: Queue Storage is simple and cheap, but lacks features like topics, sessions, and guaranteed ordering. If you need those, use Azure Service Bus instead.
Monitor Queue Depth: Set up Azure Monitor alerts for queue depth. Growing queues indicate workers aren't keeping up with message production.
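For a quick check from code, the approximate message count can be mapped to a status level. This is a minimal sketch: `backlog_level` and both thresholds are hypothetical, and the count reported by the service is an approximation, not an exact figure.

```python
def backlog_level(queue_client, warn_at=1_000, critical_at=10_000):
    """Return (level, count), where level is 'ok', 'warn', or 'critical'."""
    count = queue_client.get_queue_properties().approximate_message_count
    if count >= critical_at:
        return "critical", count
    if count >= warn_at:
        return "warn", count
    return "ok", count
```

Sampling this on a schedule and comparing successive counts shows whether the backlog is growing or draining, which tells you more than any single reading.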
When to Use This Skill
Perfect for:
- Task distribution across multiple workers
- Background job processing (email sending, image processing)
- Decoupling microservices for resilience
- Work queues for long-running operations
- Simple pub-sub patterns (one queue per subscriber)
- Cost-effective message queuing at scale
- Buffering requests during traffic spikes
Use Service Bus instead for:
- Guaranteed FIFO ordering within sessions
- Topics and subscriptions (true pub-sub)
- Message sessions for stateful processing
- Dead-letter queues with automatic retry policies
- Complex routing and filtering
- Transactions across multiple queues
Use Event Grid instead for:
- Event-driven architectures
- Reacting to Azure service events
- Low-latency event delivery
- Fan-out to many subscribers
Queue Storage is the simplest, cheapest message queue in Azure. Use it when simplicity and cost matter more than advanced features.
Related Skills
Explore the full Azure Queue Storage SDK skill: /ai-assistant/azure-storage-queue-py
Source
This skill is provided by Microsoft as part of the Azure SDK for Python (package: azure-storage-queue).
Need reliable message queuing without the complexity? Azure Queue Storage delivers simplicity and cost-effectiveness at scale.