I learned about resilience the hard way—at 3am, staring at logs of a multi-agent system that had been running fine for weeks until it suddenly wasn't. One agent crashed. Then another. Then the whole system collapsed like dominoes. That night taught me more about building resilient systems than any textbook ever did.
This article is what I wish I'd known before that production incident.
Why Resilience Matters More Than You Think
A single-agent system usually fails in one place: it crashes, you restart it, life goes on. A multi-agent system can fail catastrophically. One agent's failure cascades through the system, triggering failures in the agents that depend on it, which in turn trigger failures in the agents that depend on them.
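To make the cascade concrete, here is a toy model. The agent names and the dependency map are invented for illustration. Each agent lists the agents it depends on, and any agent whose dependency has failed fails too, because nothing contains the failure. A breadth-first walk over the reverse dependencies finds everything a single crash takes down.

```python
from collections import deque
from typing import Dict, List, Set


def cascade(depends_on: Dict[str, List[str]], first_failure: str) -> Set[str]:
    """Return every agent that fails if `first_failure` crashes and nothing contains it."""
    # Invert the graph: for each agent, which agents depend on it?
    dependents: Dict[str, List[str]] = {}
    for agent, deps in depends_on.items():
        for dep in deps:
            dependents.setdefault(dep, []).append(agent)

    # Breadth-first walk: each failed agent breaks everything that depends on it
    failed = {first_failure}
    frontier = deque([first_failure])
    while frontier:
        broken = frontier.popleft()
        for agent in dependents.get(broken, []):
            if agent not in failed:
                failed.add(agent)
                frontier.append(agent)
    return failed


# Hypothetical system: a planner feeds two workers, which both feed a reporter
system = {
    "planner": [],
    "researcher": ["planner"],
    "writer": ["planner"],
    "reporter": ["researcher", "writer"],
}
print(sorted(cascade(system, "planner")))  # ['planner', 'reporter', 'researcher', 'writer']
print(sorted(cascade(system, "writer")))   # ['reporter', 'writer']
```

One crash at the top takes down every agent downstream of it. The circuit breakers, fallbacks, and health checks later in this article are all ways of cutting those edges.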
The brutal truth: If your multi-agent system hasn't failed spectacularly yet, you haven't run it long enough.
Resilience isn't about preventing failures—that's impossible. It's about containing them, recovering quickly, and maintaining partial functionality when things go sideways.
Architecture Patterns: Choosing Your Foundation
Hierarchical Architecture
Like a corporate org chart. Manager agents delegate to worker agents. Clear command structure, easy to reason about.
```python
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from enum import Enum
import asyncio
import uuid


class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Task:
    id: str
    type: str
    payload: Dict[str, Any]
    status: TaskStatus = TaskStatus.PENDING
    assigned_to: Optional[str] = None
    result: Optional[Any] = None
    error: Optional[str] = None


class WorkerAgent:
    def __init__(self, agent_id: str, capabilities: List[str]):
        self.id = agent_id
        self.capabilities = capabilities
        self.current_task: Optional[Task] = None
        self.is_healthy = True

    async def execute(self, task: Task) -> Task:
        """Execute a task and return it with its result or error filled in"""
        self.current_task = task
        task.status = TaskStatus.IN_PROGRESS
        task.assigned_to = self.id
        try:
            # Simulate work
            await asyncio.sleep(0.5)
            if not self.is_healthy:
                raise RuntimeError(f"Agent {self.id} is unhealthy")
            task.result = f"Processed by {self.id}"
            task.status = TaskStatus.COMPLETED
        except Exception as e:
            task.error = str(e)
            task.status = TaskStatus.FAILED
        finally:
            self.current_task = None
        return task

    def can_handle(self, task_type: str) -> bool:
        return task_type in self.capabilities


class ManagerAgent:
    def __init__(self, manager_id: str):
        self.id = manager_id
        self.workers: Dict[str, WorkerAgent] = {}
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.completed_tasks: List[Task] = []
        self.failed_tasks: List[Task] = []

    def add_worker(self, worker: WorkerAgent):
        """Add a worker to this manager's pool"""
        self.workers[worker.id] = worker

    async def submit_task(self, task_type: str, payload: Dict[str, Any]) -> Task:
        """Submit a task for execution"""
        task = Task(
            id=str(uuid.uuid4()),
            type=task_type,
            payload=payload,
        )
        await self.task_queue.put(task)
        return task

    async def process_tasks(self):
        """Main task processing loop (awaits each task in turn)"""
        while True:
            try:
                task = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue

            # Find a capable worker
            worker = self.find_worker(task.type)
            if not worker:
                task.status = TaskStatus.FAILED
                task.error = "No capable worker available"
                self.failed_tasks.append(task)
                continue

            # Execute the task
            try:
                result = await worker.execute(task)
                if result.status == TaskStatus.COMPLETED:
                    self.completed_tasks.append(result)
                else:
                    self.failed_tasks.append(result)
            except Exception as e:
                task.status = TaskStatus.FAILED
                task.error = str(e)
                self.failed_tasks.append(task)

    def find_worker(self, task_type: str) -> Optional[WorkerAgent]:
        """Find an available, healthy worker for this task type"""
        for worker in self.workers.values():
            if (worker.can_handle(task_type)
                    and worker.is_healthy
                    and worker.current_task is None):
                return worker
        return None

    def get_stats(self) -> Dict[str, int]:
        return {
            "completed": len(self.completed_tasks),
            "failed": len(self.failed_tasks),
            "workers": len(self.workers),
            "healthy_workers": sum(1 for w in self.workers.values() if w.is_healthy),
        }
```
Pros: Clear responsibility, easy to debug, natural work distribution.
Cons: Manager becomes a bottleneck, single point of failure if manager dies.
When I use it: When tasks have clear ownership and hierarchy makes sense (data processing pipelines, research workflows).
Flat Architecture
Peer-to-peer. All agents are equal. They coordinate through shared state or message passing.
```python
import asyncio
from typing import Any, Dict, Set


class PeerAgent:
    def __init__(self, agent_id: str, peers: Set[str]):
        self.id = agent_id
        self.peers = peers
        self.state: Dict[str, Any] = {}
        self.message_queue: asyncio.Queue = asyncio.Queue()

    async def broadcast(self, message_type: str, data: Any):
        """Broadcast a message to all peers"""
        message = {
            "from": self.id,
            "type": message_type,
            "data": data,
        }
        # In a real implementation, send over a network or message bus
        for peer_id in self.peers:
            await self.send_to_peer(peer_id, message)

    async def send_to_peer(self, peer_id: str, message: Dict):
        """Send a message to a specific peer"""
        # Simulated network send
        print(f"{self.id} -> {peer_id}: {message['type']}")

    async def receive_messages(self):
        """Process incoming messages"""
        while True:
            try:
                message = await asyncio.wait_for(self.message_queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            # Handled outside the try, so a timeout inside a handler isn't swallowed
            await self.handle_message(message)

    async def handle_message(self, message: Dict):
        """Handle an incoming message"""
        msg_type = message.get("type")
        data = message.get("data")
        if msg_type == "state_update":
            self.state.update(data)
        elif msg_type == "task_offer":
            await self.evaluate_task_offer(data)

    async def evaluate_task_offer(self, task_data: Dict):
        """Decide whether to accept a task offer"""
        # Simple logic: accept if not busy
        if not self.state.get("busy", False):
            await self.broadcast("task_accept", {"task": task_data})
```
Pros: No single point of failure, scales horizontally, agents are autonomous.
Cons: Coordination is harder, potential for conflicts, complexity in debugging.
When I use it: Distributed systems, agent swarms, when no natural hierarchy exists.
Hybrid Architecture
The pragmatic choice. Hierarchy for structure, peer communication for efficiency.
```python
from typing import Dict, Optional, Set


class HybridAgent:
    """Agent that can act as both manager and peer"""

    def __init__(self, agent_id: str, role: str = "worker"):
        self.id = agent_id
        self.role = role  # "manager", "worker", or "coordinator"
        self.manager_id: Optional[str] = None
        self.peers: Set[str] = set()
        self.workers: Dict[str, "HybridAgent"] = {}

    async def route_message(self, message: Dict) -> str:
        """Return the destination for a message based on its type"""
        msg_type = message.get("type")
        # Management messages go up the hierarchy
        if msg_type in ["escalate", "report"]:
            if self.manager_id:
                return self.manager_id
        # Coordination messages go sideways, to peers
        elif msg_type in ["sync", "coordinate"]:
            return "broadcast_peers"
        # Task messages go down the hierarchy, to an appropriate worker
        elif msg_type in ["assign_task", "delegate"]:
            return self.select_worker(message)
        return "unknown"

    def select_worker(self, message: Dict) -> str:
        """Pick a worker for a task (placeholder policy: first registered worker)"""
        # A real policy would match capabilities and current load
        return next(iter(self.workers), "unknown")
```
This is what I use in production most often. Managers handle task distribution, but workers can talk directly to each other for coordination without going through the manager.
Communication Topologies
Star Topology
Everything goes through a central hub. Simple, but the hub is both a bottleneck and a single point of failure.
```python
import asyncio
from typing import Any, Dict, List


class MessageHub:
    """Central message routing hub"""

    def __init__(self):
        # Any object with an async receive(message) method
        self.agents: Dict[str, Any] = {}
        self.message_log: List[Dict] = []

    def register(self, agent_id: str, agent: Any):
        """Make an agent reachable through the hub"""
        self.agents[agent_id] = agent

    async def route(self, from_agent: str, to_agent: str, message: Dict):
        """Route a message from one agent to another, logging it centrally"""
        self.message_log.append({
            "from": from_agent,
            "to": to_agent,
            "message": message,
            "timestamp": asyncio.get_running_loop().time(),
        })
        target = self.agents.get(to_agent)
        # Messages to unregistered agents are silently dropped here
        if target:
            await target.receive(message)

    async def broadcast(self, from_agent: str, message: Dict):
        """Broadcast to all agents except the sender"""
        for agent_id, agent in self.agents.items():
            if agent_id != from_agent:
                await agent.receive(message)
```
When to use: Small systems (<10 agents), when you need central logging/monitoring, prototyping.
Mesh Topology
Agents talk directly to each other. Scales better but harder to monitor.
```python
import asyncio
from typing import Dict


class MeshAgent:
    def __init__(self, agent_id: str):
        self.id = agent_id
        self.connections: Dict[str, "MeshAgent"] = {}
        self.inbox: asyncio.Queue = asyncio.Queue()

    def connect_to(self, other: "MeshAgent"):
        """Establish a bidirectional connection"""
        self.connections[other.id] = other
        other.connections[self.id] = self

    async def send_direct(self, target_id: str, message: Dict):
        """Send a message directly to a peer"""
        target = self.connections.get(target_id)
        if target is None:
            raise ConnectionError(f"No connection to {target_id}")
        await target.receive(message)

    async def receive(self, message: Dict):
        """Queue an incoming message for processing"""
        await self.inbox.put(message)
```
When to use: Large systems, when agents need low-latency communication, distributed deployments.
Pub/Sub Topology
My favorite for production. Agents publish to topics, subscribe to topics they care about. Decoupled and scalable.
```python
import asyncio
from collections import defaultdict
from typing import Any, Dict, Set


class PubSubBroker:
    def __init__(self):
        self.subscriptions: Dict[str, Set[str]] = defaultdict(set)
        self.agents: Dict[str, "PubSubAgent"] = {}

    def subscribe(self, agent_id: str, topic: str):
        """Agent subscribes to a topic"""
        self.subscriptions[topic].add(agent_id)

    def unsubscribe(self, agent_id: str, topic: str):
        """Agent unsubscribes from a topic"""
        self.subscriptions[topic].discard(agent_id)

    async def publish(self, topic: str, message: Dict):
        """Publish a message to all subscribers concurrently"""
        subscribers = list(self.subscriptions.get(topic, ()))
        # return_exceptions=True: one failing subscriber doesn't raise into the publisher
        results = await asyncio.gather(
            *(self.deliver(agent_id, topic, message) for agent_id in subscribers),
            return_exceptions=True,
        )
        for agent_id, result in zip(subscribers, results):
            if isinstance(result, Exception):
                print(f"Delivery to {agent_id} on {topic} failed: {result}")

    async def deliver(self, agent_id: str, topic: str, message: Dict):
        """Deliver a message to a specific agent"""
        agent = self.agents.get(agent_id)
        if agent:
            await agent.on_message(topic, message)


class PubSubAgent:
    def __init__(self, agent_id: str, broker: PubSubBroker):
        self.id = agent_id
        self.broker = broker
        broker.agents[agent_id] = self

    def subscribe(self, topic: str):
        self.broker.subscribe(self.id, topic)

    async def publish(self, topic: str, data: Any):
        await self.broker.publish(topic, {
            "from": self.id,
            "data": data,
        })

    async def on_message(self, topic: str, message: Dict):
        """Handle a received message"""
        print(f"{self.id} received on {topic}: {message}")
```
When to use: Always. Unless you have a really good reason not to.
State Management
Shared Memory (Fast but Fragile)
```python
from threading import Lock
from typing import Any, Dict


class SharedMemory:
    def __init__(self):
        self._data: Dict[str, Any] = {}
        self._lock = Lock()

    def read(self, key: str) -> Any:
        with self._lock:
            return self._data.get(key)

    def write(self, key: str, value: Any):
        with self._lock:
            self._data[key] = value

    def transaction(self, updates: Dict[str, Any]):
        """Atomically update several keys (readers never see a partial update)"""
        with self._lock:
            self._data.update(updates)
```
Pros: Fast, simple for single-process systems.
Cons: Doesn't scale across processes or machines, and is prone to race conditions: the lock protects each individual read or write, but not a read followed by a write.
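Here is what that race looks like in practice. This sketch uses asyncio instead of threads so the interleaving is deterministic. The same hazard applies to the thread-based `SharedMemory` above. The `Counter` class and its `increment_unsafe`/`update` methods are made up for illustration. Two tasks each read a counter, yield, and write back, so one increment is lost. Doing the whole read-modify-write inside a single critical section fixes it.

```python
import asyncio
from typing import Any, Callable, Dict, Tuple


class Counter:
    def __init__(self):
        self._data: Dict[str, Any] = {"count": 0}
        self._lock = asyncio.Lock()

    async def increment_unsafe(self, key: str):
        value = self._data[key]          # read...
        await asyncio.sleep(0)           # ...another task runs here...
        self._data[key] = value + 1      # ...then a stale value is written back

    async def update(self, key: str, fn: Callable[[Any], Any]):
        # Read, compute, and write while holding the lock for the whole sequence
        async with self._lock:
            value = self._data[key]
            await asyncio.sleep(0)
            self._data[key] = fn(value)


async def demo() -> Tuple[int, int]:
    unsafe = Counter()
    await asyncio.gather(unsafe.increment_unsafe("count"), unsafe.increment_unsafe("count"))
    safe = Counter()
    await asyncio.gather(safe.update("count", lambda v: v + 1),
                         safe.update("count", lambda v: v + 1))
    return unsafe._data["count"], safe._data["count"]


print(asyncio.run(demo()))  # (1, 2)
```

The unsafe version ends at 1 after two increments. An `update(key, fn)` style API is one way to give callers atomic read-modify-write without handing them the lock.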
Message Passing (Reliable but Slower)
```python
import asyncio
from typing import Any, Dict


class MessagePassingState:
    """State owned by a single consumer and changed only through messages.

    handle_messages() processes one message at a time, so no lock is needed.
    It must be running (e.g. via asyncio.create_task) or read() waits forever.
    """

    def __init__(self):
        self.state: Dict[str, Any] = {}
        self.inbox: asyncio.Queue = asyncio.Queue()

    async def handle_messages(self):
        """Process state messages one at a time"""
        while True:
            msg = await self.inbox.get()
            if msg["type"] == "read":
                # Send the current value back on the caller's private reply queue
                await msg["reply_queue"].put(self.state.get(msg["key"]))
            elif msg["type"] == "write":
                # Update state
                self.state[msg["key"]] = msg["value"]

    async def read(self, key: str) -> Any:
        reply_queue: asyncio.Queue = asyncio.Queue()
        await self.inbox.put({
            "type": "read",
            "key": key,
            "reply_queue": reply_queue,
        })
        return await reply_queue.get()

    async def write(self, key: str, value: Any):
        await self.inbox.put({
            "type": "write",
            "key": key,
            "value": value,
        })
```
Pros: Works across processes, explicit state changes.
Cons: Slower than shared memory, more complex.
Event Sourcing (My Pick for Production)
Store events, not state. Rebuild state by replaying events. This gives you a complete audit trail and time-travel debugging.
```python
import copy
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class Event:
    type: str
    data: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)
    agent_id: str = ""


class EventStore:
    def __init__(self):
        self.events: List[Event] = []

    def append(self, event: Event):
        """Append an event (events are never modified or deleted)"""
        self.events.append(event)

    def get_events(self, agent_id: Optional[str] = None) -> List[Event]:
        """Get events, optionally filtered by agent"""
        if agent_id:
            return [e for e in self.events if e.agent_id == agent_id]
        return list(self.events)  # a copy, so callers can't rewrite history

    def replay(self, initial_state: Dict[str, Any]) -> Dict[str, Any]:
        """Rebuild state by replaying all events"""
        # Deep copy so replay never mutates lists inside the caller's initial_state
        state = copy.deepcopy(initial_state)
        for event in self.events:
            if event.type == "state_updated":
                state.update(event.data)
            elif event.type == "task_completed":
                state.setdefault("completed_tasks", []).append(event.data)
        return state


class EventSourcedAgent:
    def __init__(self, agent_id: str, event_store: EventStore):
        self.id = agent_id
        self.event_store = event_store
        # Fast local view; the event store remains the source of truth
        self._cache: Dict[str, Any] = {}

    async def update_state(self, key: str, value: Any):
        """Update state by appending an event"""
        event = Event(
            type="state_updated",
            data={key: value},
            agent_id=self.id,
        )
        self.event_store.append(event)
        self._cache[key] = value

    def get_current_state(self) -> Dict[str, Any]:
        """Rebuild this agent's state from its own events"""
        state: Dict[str, Any] = {}
        for event in self.event_store.get_events(self.id):
            if event.type == "state_updated":
                state.update(event.data)
        return state
```
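The time-travel part falls out of the same replay loop: stop replaying early, and you get the state as it was at that point. This is a minimal sketch. It assumes events are plain `(type, data)` tuples and treats a position in the log as "time". A real store would more likely filter on timestamps or sequence numbers. The `state_at` helper is hypothetical.

```python
from typing import Any, Dict, List, Tuple

Event = Tuple[str, Dict[str, Any]]


def state_at(events: List[Event], upto: int) -> Dict[str, Any]:
    """Rebuild state from the first `upto` events only."""
    state: Dict[str, Any] = {}
    for event_type, data in events[:upto]:
        if event_type == "state_updated":
            state.update(data)
        elif event_type == "task_completed":
            state.setdefault("completed_tasks", []).append(data)
    return state


log: List[Event] = [
    ("state_updated", {"status": "idle"}),
    ("state_updated", {"status": "busy"}),
    ("task_completed", {"task": "t1"}),
    ("state_updated", {"status": "idle"}),
]

# Step through history to see exactly when something changed
for i in range(len(log) + 1):
    print(i, state_at(log, i))
```

When a bug report says "the agent was stuck in `busy`", you can bisect the log this way instead of guessing from the final state.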
Fault Tolerance: The Survival Kit
Circuit Breakers
I showed this in the coordination article, but it's worth emphasizing. Circuit breakers prevent one failing agent from taking down the whole system.
```python
import time
from enum import Enum
from typing import Any, Awaitable, Callable


class CircuitState(Enum):
    CLOSED = "closed"        # normal operation: calls pass through
    OPEN = "open"            # failing fast: calls are rejected
    HALF_OPEN = "half_open"  # probing: trial calls decide whether to close again


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open"""


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        success_threshold: int = 2,
        timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0.0
        self.state = CircuitState.CLOSED

    async def execute(self, func: Callable[..., Awaitable[Any]], *args, **kwargs) -> Any:
        """Execute an async function through the circuit breaker"""
        if self.state == CircuitState.OPEN:
            # monotonic() is immune to wall-clock adjustments
            if time.monotonic() - self.last_failure_time >= self.timeout:
                print("Circuit breaker moving to HALF_OPEN")
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError("Circuit breaker is OPEN")
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.on_failure()
            raise
        self.on_success()
        return result

    def on_success(self):
        """Handle a successful call"""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                print("Circuit breaker closing after successful tests")
                self.state = CircuitState.CLOSED
                self.success_count = 0

    def on_failure(self):
        """Handle a failed call"""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        self.success_count = 0
        # Any failure while probing reopens immediately; otherwise open
        # once consecutive failures reach the threshold
        if (self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold):
            print(f"Circuit breaker opening after {self.failure_count} failures")
            self.state = CircuitState.OPEN
```
Retry with Exponential Backoff
Don't retry immediately. Wait progressively longer between attempts.
```python
import asyncio
import random
from typing import Any, Awaitable, Callable


class RetryPolicy:
    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
    ):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base

    async def execute(self, func: Callable[..., Awaitable[Any]], *args, **kwargs) -> Any:
        """Execute with retry logic"""
        last_exception = None
        for attempt in range(self.max_attempts):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                last_exception = e
                if attempt < self.max_attempts - 1:
                    delay = self.calculate_delay(attempt)
                    print(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s")
                    await asyncio.sleep(delay)
        # All attempts failed
        raise RuntimeError(f"Failed after {self.max_attempts} attempts") from last_exception

    def calculate_delay(self, attempt: int) -> float:
        """Exponential backoff capped at max_delay, plus up to 10% random jitter"""
        delay = min(
            self.base_delay * (self.exponential_base ** attempt),
            self.max_delay,
        )
        # Jitter keeps many clients from retrying in lockstep (thundering herd)
        jitter = random.uniform(0, delay * 0.1)
        return delay + jitter
```
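With the defaults above (`base_delay=1.0`, `exponential_base=2.0`, `max_delay=60.0`), the delay before retry n is min(1 × 2^n, 60) seconds plus up to 10% jitter. This small, hypothetical helper prints the schedule without jitter, which shows where the cap kicks in:

```python
from typing import List


def backoff_schedule(attempts: int, base_delay: float = 1.0,
                     exponential_base: float = 2.0, max_delay: float = 60.0) -> List[float]:
    """Delays in seconds before each retry, without jitter (same formula as calculate_delay)."""
    return [min(base_delay * exponential_base ** n, max_delay) for n in range(attempts)]


print(backoff_schedule(8))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

Jitter adds at most 10% on top, so the longest single wait is about 66 seconds. With the default `max_attempts=3`, only the first two delays (1s and 2s) are ever used.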
Graceful Degradation
When an agent fails, the system should degrade functionality, not crash entirely.
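```python
from typing import Any, Dict, Optional


class ResilientSystem:
    def __init__(self):
        # Agents are any objects exposing async handle() / handle_simple()
        self.primary_agent: Optional[Any] = None
        self.fallback_agent: Optional[Any] = None
        self.degraded_mode = False

    async def process_request(self, request: Dict) -> Dict:
        """Process a request, falling back to simpler paths as things fail"""
        # Try the primary agent
        if self.primary_agent and not self.degraded_mode:
            try:
                return await self.primary_agent.handle(request)
            except Exception as e:
                print(f"Primary agent failed: {e}")
                # Sticky: stays degraded until recover() is called
                self.degraded_mode = True

        # Fall back to the secondary agent's simpler handler
        if self.fallback_agent:
            try:
                return await self.fallback_agent.handle_simple(request)
            except Exception as e:
                print(f"Fallback agent also failed: {e}")

        # Last resort: a default response (a cached one is better, if available)
        return {"status": "degraded", "message": "Limited functionality available"}

    def recover(self):
        """Leave degraded mode, e.g. once a health check sees the primary is back"""
        self.degraded_mode = False
```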
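A cached last-good response is often better than a generic "limited functionality" message. Here is one way to add it, sketched with a hypothetical `CachedFallback` wrapper around any async handler. The wrapper remembers the last successful answer per request key. When the live call fails, it returns that answer marked as stale. The key function and the `stale` flag are illustrative choices, not part of the code above.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple


class CachedFallback:
    """Serve the last successful response for a key when the live handler fails."""

    def __init__(self, handler: Callable[[Dict], Awaitable[Dict]],
                 key_fn: Callable[[Dict], str]):
        self.handler = handler
        self.key_fn = key_fn
        self.last_good: Dict[str, Dict] = {}

    async def process(self, request: Dict) -> Dict:
        key = self.key_fn(request)
        try:
            response = await self.handler(request)
        except Exception:
            if key in self.last_good:
                # Stale but real data usually beats an error
                return {**self.last_good[key], "stale": True}
            return {"status": "degraded", "message": "Limited functionality available"}
        self.last_good[key] = response
        return response


async def demo() -> Tuple[Dict, Dict, Dict]:
    healthy = True

    async def flaky_handler(request: Dict) -> Dict:
        if not healthy:
            raise ConnectionError("primary agent down")
        return {"status": "ok", "answer": request["q"].upper()}

    cached = CachedFallback(flaky_handler, key_fn=lambda r: r["q"])
    first = await cached.process({"q": "hi"})   # live answer, cached
    healthy = False
    second = await cached.process({"q": "hi"})  # served from cache, marked stale
    third = await cached.process({"q": "new"})  # nothing cached: default response
    return first, second, third


print(asyncio.run(demo()))
```

Flagging the response as stale lets callers decide whether old data is acceptable for their use case.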
Observability: Seeing What's Happening
You can't fix what you can't see. Observability is non-negotiable.
Distributed Tracing
Track requests across agent boundaries.
```python
import asyncio
import time
import uuid
from contextvars import ContextVar
from typing import Dict, List

# Context variable that carries the trace ID across awaits within a task
# (tasks created from that task inherit a copy of it)
trace_id_var: ContextVar[str] = ContextVar("trace_id", default="")


class Tracer:
    def __init__(self):
        self.spans: List[Dict] = []

    def start_span(self, operation: str, agent_id: str) -> str:
        """Start a new span, joining the current trace or starting a new one"""
        span_id = str(uuid.uuid4())[:8]
        trace_id = trace_id_var.get() or str(uuid.uuid4())[:8]
        trace_id_var.set(trace_id)
        span = {
            "span_id": span_id,
            "trace_id": trace_id,
            "operation": operation,
            "agent_id": agent_id,
            "start_time": time.time(),
            "end_time": None,
            "status": "started",
        }
        self.spans.append(span)
        return span_id

    def end_span(self, span_id: str, status: str = "completed"):
        """End a span and record its duration"""
        for span in self.spans:
            if span["span_id"] == span_id:
                span["end_time"] = time.time()
                span["status"] = status
                span["duration"] = span["end_time"] - span["start_time"]
                break

    def get_trace(self, trace_id: str) -> List[Dict]:
        """Get all spans for a trace"""
        return [s for s in self.spans if s["trace_id"] == trace_id]


# Global tracer
tracer = Tracer()


class TracedAgent:
    def __init__(self, agent_id: str):
        self.id = agent_id

    async def handle_request(self, request: Dict) -> Dict:
        """Handle a request with tracing"""
        span_id = tracer.start_span("handle_request", self.id)
        try:
            # Do work
            await asyncio.sleep(0.1)
            result = {"status": "ok"}
        except Exception:
            tracer.end_span(span_id, "failed")
            raise
        tracer.end_span(span_id, "completed")
        return result
```
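The tracer above keeps the trace ID in a `ContextVar`, and that only flows within one process. To follow a request across agent boundaries, the ID has to travel inside the message itself. This is a minimal sketch. It assumes messages are dicts and uses a made-up `trace_id` field as the carrier. HTTP-based systems typically use a standard header such as W3C `traceparent` instead. The `outgoing`/`receive` helpers and the in-memory `log` list are hypothetical.

```python
import asyncio
import uuid
from contextvars import ContextVar
from typing import Awaitable, Callable, Dict, List, Optional

trace_id_var: ContextVar[str] = ContextVar("trace_id", default="")
log: List[str] = []  # stand-in for a shared log store


def outgoing(payload: Dict) -> Dict:
    """Attach the current trace ID (or start a new trace) to an outgoing message."""
    trace_id = trace_id_var.get() or uuid.uuid4().hex[:8]
    return {"trace_id": trace_id, "payload": payload}


async def receive(agent_id: str, message: Dict,
                  next_hop: Optional[Callable[[Dict], Awaitable[None]]] = None) -> None:
    """Adopt the sender's trace ID, log under it, and pass it along."""
    trace_id_var.set(message["trace_id"])
    log.append(f"[{trace_id_var.get()}] {agent_id} got {message['payload']}")
    if next_hop is not None:
        await next_hop(outgoing({"forwarded_by": agent_id}))


async def main() -> str:
    async def analyst(msg: Dict) -> None:
        await receive("analyst", msg)

    first = outgoing({"task": "summarize"})
    await receive("planner", first, next_hop=analyst)
    return first["trace_id"]


trace = asyncio.run(main())
# Every log line for this request shares one trace ID, across both agents
print("\n".join(line for line in log if line.startswith(f"[{trace}]")))
```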
Health Checks
Every agent should expose health status.
```python
import time
from enum import Enum
from typing import Any, Dict


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


class HealthCheck:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.status = HealthStatus.HEALTHY
        self.last_check = time.time()
        self.failure_count = 0
        self.metrics: Dict[str, Any] = {}

    async def check(self) -> Dict[str, Any]:
        """Perform a health check"""
        self.last_check = time.time()
        checks = {
            "memory": await self.check_memory(),
            "connectivity": await self.check_connectivity(),
            "dependencies": await self.check_dependencies(),
        }

        # All checks pass: healthy; some pass: degraded; none pass: unhealthy
        if all(checks.values()):
            self.status = HealthStatus.HEALTHY
            self.failure_count = 0
        elif any(checks.values()):
            self.status = HealthStatus.DEGRADED
        else:
            self.status = HealthStatus.UNHEALTHY
            self.failure_count += 1

        return {
            "agent_id": self.agent_id,
            "status": self.status.value,
            "checks": checks,
            "metrics": self.metrics,
            "last_check": self.last_check,
        }

    async def check_memory(self) -> bool:
        """Check whether the agent has enough memory"""
        # Simplified: would check actual memory usage
        return True

    async def check_connectivity(self) -> bool:
        """Check whether the agent can reach its dependencies"""
        return True

    async def check_dependencies(self) -> bool:
        """Check whether required services are available"""
        return True
```
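One gap worth closing is a hanging dependency. If it hangs, `await self.check_dependencies()` hangs too, and so does the whole health check. The sketch below wraps each probe in `asyncio.wait_for`, so a slow or crashing probe counts as a failed check. The probe names mirror the ones above. The 0.1s budget and the `run_probes` helper are arbitrary, illustrative choices.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def run_probes(probes: Dict[str, Callable[[], Awaitable[bool]]],
                     timeout: float = 0.1) -> Dict[str, bool]:
    """Run all probes concurrently; a timeout or exception counts as a failed check."""
    async def guarded(probe: Callable[[], Awaitable[bool]]) -> bool:
        try:
            return bool(await asyncio.wait_for(probe(), timeout=timeout))
        except Exception:
            # asyncio.TimeoutError is an Exception subclass, so hung probes land here too
            return False

    names = list(probes)
    results = await asyncio.gather(*(guarded(probes[n]) for n in names))
    return dict(zip(names, results))


async def ok() -> bool:
    return True


async def hangs() -> bool:
    await asyncio.sleep(10)  # simulates a dependency that never answers
    return True


async def crashes() -> bool:
    raise ConnectionError("dependency refused connection")


print(asyncio.run(run_probes({"memory": ok, "connectivity": hangs, "dependencies": crashes})))
# {'memory': True, 'connectivity': False, 'dependencies': False}
```

The result has the same shape as the `checks` dict in `HealthCheck.check`, so the HEALTHY/DEGRADED/UNHEALTHY logic can stay as it is.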
Security: Trust No One
Agent Authentication
```python
import hashlib
import hmac
import time
from typing import Dict, Optional


class AgentAuth:
    def __init__(self, secret_key: str, token_ttl: float = 3600.0):
        # Shared secret: anyone holding it can mint tokens, so protect it
        self.secret_key = secret_key
        self.token_ttl = token_ttl  # seconds (1 hour)

    def _sign(self, message: str) -> str:
        return hmac.new(
            self.secret_key.encode(),
            message.encode(),
            hashlib.sha256,
        ).hexdigest()

    def generate_token(self, agent_id: str) -> str:
        """Generate a token of the form agent_id:timestamp:signature"""
        timestamp = int(time.time())
        message = f"{agent_id}:{timestamp}"
        return f"{message}:{self._sign(message)}"

    def verify_token(self, token: Optional[str]) -> Optional[str]:
        """Verify a token and return the agent_id if it is valid"""
        try:
            message, signature = token.rsplit(":", 1)
            agent_id, timestamp = message.rsplit(":", 1)
            issued_at = int(timestamp)
        except (AttributeError, ValueError):
            return None

        # Verify the signature (constant-time comparison)
        if not hmac.compare_digest(signature.encode(), self._sign(message).encode()):
            return None

        # Check expiry from the signed timestamp itself, so tokens minted by
        # another process holding the same secret expire too
        if time.time() - issued_at > self.token_ttl:
            return None
        return agent_id


class SecureAgent:
    def __init__(self, agent_id: str, auth: AgentAuth):
        self.id = agent_id
        self.auth = auth
        # Long-running agents should regenerate this before it expires
        self.token = auth.generate_token(agent_id)

    async def send_authenticated_message(self, target: "SecureAgent", message: Dict):
        """Send a message in an envelope carrying our token"""
        # The token authenticates the sender; it does not sign the payload,
        # so on its own it won't detect tampering with the payload in transit
        envelope = {
            "token": self.token,
            "from": self.id,
            "payload": message,
        }
        await target.receive_authenticated(envelope)

    async def receive_authenticated(self, envelope: Dict):
        """Receive a message and verify the sender's token"""
        sender_id = self.auth.verify_token(envelope.get("token"))
        if not sender_id:
            print("Rejecting message: invalid token")
            return
        if sender_id != envelope.get("from"):
            print("Rejecting message: sender mismatch")
            return
        # Process the verified message
        await self.handle_message(envelope["payload"])

    async def handle_message(self, message: Dict):
        print(f"{self.id} received verified message: {message}")
```
Testing Multi-Agent Systems
Deterministic Replay
Record all messages, replay them to reproduce bugs.
```python
import json
from typing import Dict, List


class MessageRecorder:
    def __init__(self, log_file: str = "messages.jsonl"):
        self.log_file = log_file
        self.recording = True

    def record(self, message: Dict):
        """Append a message to the log as one JSON line"""
        if not self.recording:
            return
        with open(self.log_file, "a") as f:
            f.write(json.dumps(message) + "\n")

    def replay(self) -> List[Dict]:
        """Load recorded messages in their original order, ready to re-feed to handlers"""
        messages = []
        with open(self.log_file, "r") as f:
            for line in f:
                messages.append(json.loads(line))
        return messages
```
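Reading the log back is only half of replay. To reproduce a bug, the recorded messages go back through the same handler in the same order, with anything else nondeterministic pinned down as well. This minimal sketch assumes the only other source of nondeterminism is an injected `random.Random` whose seed is stored alongside the log. Real agents would also need clocks and external responses recorded or stubbed. The `handle` and `run` functions are hypothetical.

```python
import json
import random
from typing import Dict, List


def handle(message: Dict, state: Dict, rng: random.Random) -> None:
    """Toy handler: both message order and the RNG affect the result."""
    if message["type"] == "add":
        state["total"] = state.get("total", 0) + message["value"]
    elif message["type"] == "sample":
        state.setdefault("samples", []).append(rng.randint(1, 100))


def run(messages: List[Dict], seed: int) -> Dict:
    state: Dict = {}
    rng = random.Random(seed)  # same seed + same messages => same run
    for message in messages:
        handle(message, state, rng)
    return state


# "Record": serialize the session as JSON lines (a list stands in for messages.jsonl)
session = [{"type": "add", "value": 5}, {"type": "sample"},
           {"type": "add", "value": -2}, {"type": "sample"}]
recorded_lines = [json.dumps(m) for m in session]

# "Replay": parse the log and run it again with the recorded seed
replayed = [json.loads(line) for line in recorded_lines]
print(run(session, seed=42) == run(replayed, seed=42))  # True
```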
Chaos Testing
Intentionally inject failures to test resilience.
```python
import asyncio
import random
from typing import Dict, Optional


class ChaosMonkey:
    def __init__(self, failure_rate: float = 0.1, seed: Optional[int] = None):
        self.failure_rate = failure_rate
        self.enabled = False  # opt-in: enable only in test environments
        # A seeded RNG makes a chaos run reproducible
        self.rng = random.Random(seed)

    async def maybe_fail(self, operation: str):
        """Randomly inject a failure"""
        if not self.enabled:
            return
        if self.rng.random() < self.failure_rate:
            raise RuntimeError(f"Chaos monkey killed: {operation}")

    async def maybe_delay(self, operation: str, max_delay: float = 5.0):
        """Randomly inject a delay"""
        if not self.enabled:
            return
        if self.rng.random() < self.failure_rate:
            delay = self.rng.uniform(0, max_delay)
            print(f"Chaos monkey delaying {operation} by {delay:.2f}s")
            await asyncio.sleep(delay)


class ChaosTestedAgent:
    def __init__(self, agent_id: str, chaos: ChaosMonkey):
        self.id = agent_id
        self.chaos = chaos

    async def process(self, task: Dict) -> Dict:
        """Process a task with chaos injection"""
        await self.chaos.maybe_fail("process")
        await self.chaos.maybe_delay("process")
        # Actual work
        return {"status": "completed"}
```
Performance Optimization
Batching
```python
import asyncio
from typing import Any, Awaitable, Callable, List


class BatchProcessor:
    def __init__(self, batch_size: int = 10, max_wait: float = 1.0):
        self.batch_size = batch_size
        self.max_wait = max_wait
        self.queue: asyncio.Queue = asyncio.Queue()

    async def submit(self, item: Any):
        await self.queue.put(item)

    async def process_batches(self, handler: Callable[[List[Any]], Awaitable[None]]):
        """Hand items to `handler` in batches of up to batch_size, waiting at most max_wait"""
        loop = asyncio.get_running_loop()
        while True:
            batch: List[Any] = []
            deadline = loop.time() + self.max_wait
            # Collect items until the batch is full or the deadline passes
            while len(batch) < self.batch_size:
                timeout = deadline - loop.time()
                if timeout <= 0:
                    break
                try:
                    item = await asyncio.wait_for(self.queue.get(), timeout=timeout)
                except asyncio.TimeoutError:
                    break
                batch.append(item)
            if batch:
                await handler(batch)
```
Connection Pooling
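```python
import asyncio
import itertools
from typing import Any, Set


class FakeConnection:
    """Stand-in for a real connection object (hashable, so it can live in a set)"""

    def __init__(self, conn_id: int):
        self.id = conn_id

    def __repr__(self) -> str:
        return f"FakeConnection({self.id})"


class ConnectionPool:
    def __init__(self, max_connections: int = 10):
        self.max_connections = max_connections
        self.available: asyncio.Queue = asyncio.Queue(maxsize=max_connections)
        self.in_use: Set[Any] = set()
        self._created = 0  # connections opened so far (in use + idle)
        self._ids = itertools.count()

    async def get_connection(self) -> Any:
        """Get a connection from the pool, creating one if under the limit"""
        try:
            # Reuse an idle connection if there is one
            conn = self.available.get_nowait()
        except asyncio.QueueEmpty:
            if self._created < self.max_connections:
                # Reserve the slot before awaiting, so concurrent callers
                # can't push the pool past max_connections
                self._created += 1
                try:
                    conn = await self.create_connection()
                except Exception:
                    self._created -= 1
                    raise
            else:
                # At the limit: wait for someone to return a connection
                conn = await self.available.get()
        self.in_use.add(conn)
        return conn

    async def return_connection(self, conn: Any):
        """Return a connection to the pool"""
        self.in_use.discard(conn)
        await self.available.put(conn)

    async def create_connection(self) -> Any:
        """Create a new connection (simulated)"""
        await asyncio.sleep(0.1)
        return FakeConnection(next(self._ids))
```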
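A get/return API has one weakness. If the code between `get_connection()` and `return_connection()` raises, the connection leaks and the pool slowly shrinks. Wrapping checkout in an async context manager guarantees the return. To keep this sketch self-contained, it uses a tiny stand-in pool (the hypothetical `TinyPool`) built on `asyncio.Queue` with pre-created connection names. The same pattern applies to the `ConnectionPool` above.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class TinyPool:
    """Stand-in pool: a fixed set of connection objects in a queue."""

    def __init__(self, size: int):
        self.available: asyncio.Queue = asyncio.Queue()
        for i in range(size):
            self.available.put_nowait(f"conn-{i}")

    @asynccontextmanager
    async def connection(self) -> AsyncIterator[str]:
        conn = await self.available.get()    # waits if every connection is checked out
        try:
            yield conn
        finally:
            self.available.put_nowait(conn)  # runs even if the body raised


async def demo() -> int:
    pool = TinyPool(size=2)
    for _ in range(5):
        try:
            async with pool.connection() as conn:
                raise RuntimeError(f"query on {conn} failed")
        except RuntimeError:
            pass
    # Despite five failures, both connections are back in the pool
    return pool.available.qsize()


print(asyncio.run(demo()))  # 2
```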
Anti-Patterns to Avoid
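The takeaways name three anti-patterns. God agents are single agents that do everything, which brings the single point of failure back. Circular delegation is when A hands a task to B, which hands it back to A. Chatty agents send many tiny messages where one batch would do. Circular delegation is the easiest to guard against mechanically: carry the delegation path with the task and refuse any hop that revisits an agent. The sketch below uses a hypothetical `delegation_path` field and an arbitrary depth limit.

```python
from typing import Dict, List


class DelegationError(Exception):
    pass


def delegate(task: Dict, from_agent: str, to_agent: str, max_depth: int = 5) -> Dict:
    """Return a copy of the task with the hop recorded, or raise if it would loop."""
    # The first hop starts the path at the delegating agent
    path: List[str] = task.get("delegation_path") or [from_agent]
    if to_agent in path:
        raise DelegationError(f"circular delegation: {' -> '.join(path + [to_agent])}")
    if len(path) >= max_depth:
        raise DelegationError(f"delegation chain longer than {max_depth}: {' -> '.join(path)}")
    return {**task, "delegation_path": path + [to_agent]}


task = {"id": "t1"}
task = delegate(task, "manager", "researcher")
task = delegate(task, "researcher", "analyst")
try:
    delegate(task, "analyst", "manager")
except DelegationError as e:
    print(e)  # circular delegation: manager -> researcher -> analyst -> manager
```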
Complete Mini System with Health Checks
Here's a complete example putting it all together:
```python
import asyncio
import random
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, List

# Reuses CircuitBreaker and CircuitState from the Circuit Breakers section:
# define them above this line (or import them) before running.


# --- Health Check System ---
class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


@dataclass
class HealthReport:
    agent_id: str
    status: HealthStatus
    last_heartbeat: float
    active_tasks: int
    error_count: int


# --- Message Bus ---
class MessageBus:
    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}

    def subscribe(self, topic: str, callback: Callable):
        self.subscribers.setdefault(topic, []).append(callback)

    async def publish(self, topic: str, message: Dict):
        callbacks = self.subscribers.get(topic, [])
        # return_exceptions=True: a failing subscriber can't crash the publisher's loop
        await asyncio.gather(
            *(callback(message) for callback in callbacks),
            return_exceptions=True,
        )


# --- Resilient Agent ---
class ResilientAgent:
    def __init__(self, agent_id: str, bus: MessageBus):
        self.id = agent_id
        self.bus = bus
        self.status = HealthStatus.HEALTHY
        self.last_heartbeat = time.time()
        self.active_tasks = 0
        self.error_count = 0
        self.circuit_breaker = CircuitBreaker()
        self._tasks: List[asyncio.Task] = []
        # Subscribe to health check requests
        bus.subscribe("health_check", self.on_health_check)

    async def start(self):
        """Start agent background tasks"""
        # Keep references: the event loop holds only weak references to tasks
        self._tasks.append(asyncio.create_task(self.heartbeat_loop()))
        self._tasks.append(asyncio.create_task(self.work_loop()))

    def refresh_status(self):
        """Report DEGRADED whenever the circuit breaker isn't closed"""
        if self.circuit_breaker.state == CircuitState.CLOSED:
            self.status = HealthStatus.HEALTHY
        else:
            self.status = HealthStatus.DEGRADED

    async def heartbeat_loop(self):
        """Send periodic heartbeats"""
        while True:
            self.refresh_status()
            self.last_heartbeat = time.time()
            await self.bus.publish("heartbeat", {
                "agent_id": self.id,
                "timestamp": self.last_heartbeat,
                "status": self.status.value,
            })
            await asyncio.sleep(5)

    async def work_loop(self):
        """Main work loop"""
        while True:
            try:
                await self.do_work()
            except Exception as e:
                self.error_count += 1
                print(f"{self.id} error: {e}")
            await asyncio.sleep(1)

    async def do_work(self):
        """Simulate work through the circuit breaker"""
        self.active_tasks += 1
        try:
            await self.circuit_breaker.execute(self.risky_operation)
        finally:
            self.active_tasks -= 1

    async def risky_operation(self):
        """Simulated operation that fails about 10% of the time"""
        await asyncio.sleep(0.1)
        if random.random() < 0.1:
            raise RuntimeError("Random failure")

    async def on_health_check(self, message: Dict):
        """Respond to a health check request"""
        self.refresh_status()
        report = HealthReport(
            agent_id=self.id,
            status=self.status,
            last_heartbeat=self.last_heartbeat,
            active_tasks=self.active_tasks,
            error_count=self.error_count,
        )
        await self.bus.publish("health_report", {
            "agent_id": self.id,
            "report": report.__dict__,
        })


# --- Health Monitor ---
class HealthMonitor:
    def __init__(self, bus: MessageBus):
        self.bus = bus
        self.agents: Dict[str, HealthReport] = {}
        self.bus.subscribe("heartbeat", self.on_heartbeat)
        self.bus.subscribe("health_report", self.on_health_report)

    async def monitor_loop(self):
        """Monitor all agents"""
        while True:
            # Request health checks
            await self.bus.publish("health_check", {})

            # Flag agents whose last heartbeat is older than 30 seconds
            current_time = time.time()
            for agent_id, report in list(self.agents.items()):
                if current_time - report.last_heartbeat > 30:
                    print(f"⚠️ Agent {agent_id} is not responding!")
                    report.status = HealthStatus.UNHEALTHY

            # Print status
            self.print_status()
            await asyncio.sleep(10)

    async def on_heartbeat(self, message: Dict):
        """Record a heartbeat"""
        agent_id = message["agent_id"]
        if agent_id not in self.agents:
            self.agents[agent_id] = HealthReport(
                agent_id=agent_id,
                status=HealthStatus.HEALTHY,
                last_heartbeat=message["timestamp"],
                active_tasks=0,
                error_count=0,
            )
        else:
            self.agents[agent_id].last_heartbeat = message["timestamp"]

    async def on_health_report(self, message: Dict):
        """Replace the stored report with the agent's latest one"""
        report_data = message["report"]
        self.agents[report_data["agent_id"]] = HealthReport(**report_data)

    def print_status(self):
        """Print system status"""
        print("\n=== System Health ===")
        for agent_id, report in self.agents.items():
            age = time.time() - report.last_heartbeat
            print(f"{agent_id}: {report.status.value} | "
                  f"tasks={report.active_tasks} | "
                  f"errors={report.error_count} | "
                  f"heartbeat={age:.1f}s ago")


# --- Main System ---
async def run_system():
    """Run the complete multi-agent system for 60 seconds"""
    bus = MessageBus()

    # Create and start agents
    agents = [ResilientAgent(f"agent-{i}", bus) for i in range(3)]
    for agent in agents:
        await agent.start()

    # Start the monitor (keep a reference to its task)
    monitor = HealthMonitor(bus)
    monitor_task = asyncio.create_task(monitor.monitor_loop())

    # Run for 60 seconds; asyncio.run() cancels the remaining tasks on exit
    await asyncio.sleep(60)
    monitor_task.cancel()


# Run it
if __name__ == "__main__":
    asyncio.run(run_system())
```
Key Takeaways
- Architecture matters - Choose hierarchical for structure, flat for autonomy, hybrid for pragmatism
- Pub/sub wins - Best topology for most production systems
- Event sourcing for state - Complete audit trail and time-travel debugging
- Circuit breakers are mandatory - Prevent cascade failures
- Observability is not optional - Distributed tracing, health checks, metrics
- Security from day one - Agent authentication, message signing, trust boundaries
- Test with chaos - Inject failures to find weaknesses before production does
- Performance through batching - Don't send one message when you can batch ten
- Avoid anti-patterns - No god agents, no circular delegation, no chatty agents
FAQ
Q: What's the most common cause of multi-agent system failures?
A: Cascade failures. One agent crashes, agents depending on it fail, their dependents fail, and soon your whole system is down. Circuit breakers and graceful degradation are essential.
Q: How do I debug a multi-agent system when behavior is non-deterministic?
A: Distributed tracing with correlation IDs. Record every message with a trace ID that flows through the system. When you find a bug, search logs for that trace ID and see the complete request flow.
Q: Should I use synchronous or asynchronous communication between agents?
A: Asynchronous by default. Synchronous (request-response) is fine for critical paths with tight latency requirements, but async (pub/sub) scales better and is more resilient.
Q: How many agents is too many for one system?
A: Depends on coordination complexity. I've run systems with 100+ agents using hierarchical organization. If your agents need to coordinate with every other agent, you'll hit scaling limits around 10-20. Use hierarchies or partition into independent subsystems.
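One way to see why all-to-all coordination hits a wall early: the number of pairwise channels among n agents is n(n-1)/2, which grows quadratically. Any tree-shaped hierarchy, where each agent except the root reports to exactly one manager, has only n-1 reporting links. A quick calculation:

```python
def full_mesh_links(n: int) -> int:
    """Pairwise channels when every agent coordinates with every other: n(n-1)/2."""
    return n * (n - 1) // 2


def tree_links(n: int) -> int:
    """Reporting links in any tree-shaped hierarchy of n agents: n - 1."""
    return n - 1


for n in (5, 10, 20, 100):
    print(f"{n:>3} agents: mesh={full_mesh_links(n):>5}  hierarchy={tree_links(n):>3}")
```

At 10 agents a full mesh already has 45 channels, and at 20 it has 190. At 100 it has 4,950, against 99 links for a hierarchy.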
Q: What's the difference between resilience and fault tolerance?
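A: Fault tolerance means the system keeps operating correctly despite failures: components fail, but users don't notice. Resilience means the system can recover and adapt when failures do get through, degrading gracefully and then returning to full service. Fault-tolerant systems keep failures from causing problems. Resilient systems survive the failures that do cause problems and come back stronger. You want both.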