AI & LLMs · Documented
multi-agent-orchestration
Multi-agent AI system design. Orchestrator-worker patterns, LangGraph state machines, parallel execution, task decomposition, agent communication, shared memory, error recovery, and A2A protocol patterns.
Share:
Installation
npx clawhub@latest install multi-agent-orchestration

View the full skill documentation and source below.
Documentation
Multi-Agent Orchestration
When to Use Multi-Agent Systems
Use multiple agents when:
- Tasks are too long for a single context window
- Parallel specialized work increases quality or speed
- Different tasks need different models/tools
- Independent verification improves reliability
Single agent is usually better for simple, linear tasks.
Core Patterns
1. Orchestrator-Worker Pattern
Human → [Orchestrator Agent]
↓ decomposes task
┌──────────┴──────────┐
[Worker 1] [Worker 2] [Worker 3]
(Research) (Analysis) (Writing)
└──────────┬──────────┘
↓ synthesize
[Orchestrator] → response to human
2. Pipeline (Sequential)
Input → [Agent 1] → [Agent 2] → [Agent 3] → Output
(Extract) (Transform) (Format)
3. Debate/Multi-perspective
Question → [Agent A] → Opinion A ─┐
→ [Agent B] → Opinion B ─┤→ [Judge] → Decision
→ [Agent C] → Opinion C ─┘
4. Supervisor with Subagents
[Supervisor] ←→ planning loop
↓ routes to specialist
[Code Agent] [Research Agent] [QA Agent]
↑ reports results back
[Supervisor] → final synthesis
LangGraph Implementation
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from typing import TypedDict, Annotated
import operator
# Define shared state
class ResearchState(TypedDict):
    """Shared state threaded through every node of the LangGraph workflow.

    Each agent node reads the fields it needs and returns a *partial*
    update dict; LangGraph merges that update back into this state.
    """
    task: str              # original user task, set once at invoke()
    research: str          # bullet-point findings written by research_agent
    outline: str           # structured outline written by outline_agent
    draft: str             # current draft; overwritten by writer/revise agents
    review_feedback: str   # reviewer output; contains "APPROVED" when accepted
    final_output: str      # accepted draft, set by the finalize node
    iteration: int         # review-cycle counter (capped at 3 in should_revise)
    messages: Annotated[list, operator.add]  # appended across nodes (operator.add), not replaced
# Define agents as nodes
# Single shared chat-model client; temperature=0 to keep runs reproducible.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def research_agent(state: ResearchState) -> ResearchState:
    """Gather information on the topic."""
    prompt = f"""Research this topic thoroughly: {state['task']}
Provide key facts, statistics, and relevant context.
Format as bullet points."""
    reply = llm.invoke([HumanMessage(content=prompt)])
    # Partial state update; `messages` is merged via operator.add.
    return {"research": reply.content, "messages": [reply]}
def outline_agent(state: ResearchState) -> ResearchState:
    """Create a structured outline."""
    prompt = f"""Task: {state['task']}
Research: {state['research']}
Create a detailed outline with sections and key points."""
    reply = llm.invoke([HumanMessage(content=prompt)])
    return {"outline": reply.content}
def writer_agent(state: ResearchState) -> ResearchState:
    """Write the draft."""
    prompt = f"""Task: {state['task']}
Outline: {state['outline']}
Research: {state['research']}
Write a comprehensive, well-structured response."""
    draft_reply = llm.invoke([HumanMessage(content=prompt)])
    return {"draft": draft_reply.content}
def reviewer_agent(state: ResearchState) -> ResearchState:
    """Review and provide feedback."""
    prompt = f"""Review this draft for the task: {state['task']}
Draft: {state['draft']}
Provide specific feedback on:
1. Accuracy and completeness
2. Clarity and structure
3. Any missing key points
If the draft is satisfactory, respond with: APPROVED
Otherwise, list specific improvements needed."""
    verdict = llm.invoke([HumanMessage(content=prompt)])
    # Bump the counter so should_revise can cap the revision loop.
    return {
        "review_feedback": verdict.content,
        "iteration": state.get("iteration", 0) + 1,
    }
def revise_agent(state: ResearchState) -> ResearchState:
    """Revise based on feedback."""
    prompt = f"""Revise this draft based on feedback:
Original draft: {state['draft']}
Feedback: {state['review_feedback']}
Provide the improved version."""
    improved = llm.invoke([HumanMessage(content=prompt)])
    return {"draft": improved.content}
def should_revise(state: ResearchState) -> str:
    """Conditional edge: revise or accept."""
    accepted = (
        state.get("iteration", 0) >= 3  # Max 3 iterations
        or "APPROVED" in state.get("review_feedback", "")
    )
    return "finalize" if accepted else "revise"
def finalize(state: ResearchState) -> ResearchState:
    """Terminal node: promote the accepted draft to the final output field."""
    return {"final_output": state["draft"]}
# Build the graph
workflow = StateGraph(ResearchState)

# Add nodes — one per agent function defined above.
workflow.add_node("research", research_agent)
workflow.add_node("outline", outline_agent)
workflow.add_node("write", writer_agent)
workflow.add_node("review", reviewer_agent)
workflow.add_node("revise", revise_agent)
workflow.add_node("finalize", finalize)

# Add edges: linear pipeline research -> outline -> write -> review.
workflow.set_entry_point("research")
workflow.add_edge("research", "outline")
workflow.add_edge("outline", "write")
workflow.add_edge("write", "review")

# Conditional edge: should_revise routes either back through "revise" or
# on to "finalize" (when the reviewer says APPROVED or after 3 iterations).
workflow.add_conditional_edges(
    "review",
    should_revise,
    {
        "revise": "revise",
        "finalize": "finalize",
    }
)
# Revisions always return to review, forming the refinement loop.
workflow.add_edge("revise", "review")
workflow.add_edge("finalize", END)

# Compile
app = workflow.compile()

# Run
result = app.invoke({
    "task": "Explain the CAP theorem and its practical implications",
    "iteration": 0
})
print(result["final_output"])
Parallel Execution Pattern
import asyncio
from openai import AsyncOpenAI

# Module-level async client shared by every agent coroutine below.
client = AsyncOpenAI()
async def run_agent(role: str, task: str, context: str) -> str:
    """Run a single agent asynchronously."""
    chat_messages = [
        {"role": "system", "content": f"You are a {role} expert."},
        {"role": "user", "content": f"Context: {context}\n\nTask: {task}"},
    ]
    completion = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=chat_messages,
        temperature=0,
    )
    return completion.choices[0].message.content
async def parallel_research(topic: str) -> dict:
    """Run multiple specialist agents in parallel and synthesize their views.

    Returns a dict with the individual ``perspectives`` (keyed by specialty)
    and a final ``synthesis`` produced by a "chief architect" agent.
    """
    agents = {
        "technical": "technical architect",
        "business": "business analyst",
        "security": "security engineer",
        "user_experience": "UX designer",
    }
    task_template = f"Analyze this topic from your perspective: {topic}"
    # Fan out — create_task starts every coroutine immediately, so all
    # agents run concurrently regardless of the await order below.
    tasks = {
        name: asyncio.create_task(run_agent(role, task_template, ""))
        for name, role in agents.items()
    }
    # Fan in: collect each result with a per-agent timeout. A timed-out
    # agent degrades to a placeholder string instead of failing the batch.
    results = {}
    for name, task in tasks.items():
        try:
            results[name] = await asyncio.wait_for(task, timeout=30)
        except asyncio.TimeoutError:
            # Fix: include the agent name (the original was an f-string with
            # no placeholder, so the message never said *which* agent died).
            results[name] = f"Agent {name} timed out"
    # Synthesize
    synthesis_prompt = f"""Synthesize these expert perspectives on: {topic}
Technical: {results['technical']}
Business: {results['business']}
Security: {results['security']}
UX: {results['user_experience']}
Provide a balanced, comprehensive recommendation."""
    synthesis = await run_agent("chief architect", synthesis_prompt, "")
    return {"perspectives": results, "synthesis": synthesis}
Agent Communication Protocols
Message Format Standard
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
import uuid
@dataclass
class AgentMessage:
    """Standard message format for agent communication.

    All fields default to None/placeholder values so callers and subclasses
    can construct messages sparsely; __post_init__ fills in a generated id,
    a UTC timestamp, and an empty metadata dict when they are omitted.
    """
    id: str = None              # unique message id (uuid4 string)
    from_agent: str = None      # sender agent name
    to_agent: str = None        # recipient agent name
    task_id: str = None         # id of the task this message belongs to
    type: str = "request"       # request, response, error, status
    content: Any = None         # payload; shape depends on message type
    metadata: dict = None       # free-form extras; defaults to {} post-init
    timestamp: str = None       # ISO-8601 UTC creation time

    def __post_init__(self):
        # Fill generated defaults without clobbering caller-supplied values.
        self.id = self.id or str(uuid.uuid4())
        # Timezone-aware UTC: datetime.utcnow() is naive and deprecated.
        self.timestamp = self.timestamp or datetime.now(timezone.utc).isoformat()
        self.metadata = self.metadata or {}


# Message types
# FIX: each subclass must itself carry @dataclass. Without it the annotated
# attributes are inert class attributes: the inherited __init__ still sets
# the *instance* attribute from the parent's defaults, so e.g.
# AgentResult().type was "request", and success/error/progress were not
# accepted by the constructor at all.
@dataclass
class AgentRequest(AgentMessage):
    """Request from orchestrator to worker."""
    type: str = "request"


@dataclass
class AgentResult(AgentMessage):
    """Result from worker to orchestrator."""
    type: str = "response"
    success: bool = True    # False when the worker failed
    error: str = None       # error description when success is False


@dataclass
class AgentStatus(AgentMessage):
    """Progress update during long-running task."""
    type: str = "status"
    progress: float = 0.0  # 0.0 - 1.0
    current_step: str = ""  # human-readable description of current work
Shared Memory and Context
from typing import Optional
import json
import redis
class AgentMemory:
    """Shared memory for multi-agent collaboration.

    All entries live under a per-task Redis namespace so every agent
    working on the same task_id sees the same data.
    """

    def __init__(self, redis_client: redis.Redis, task_id: str):
        self.redis = redis_client
        self.task_id = task_id
        self.prefix = f"task:{task_id}:memory"

    def store(self, key: str, value: Any, ttl: int = 3600):
        """Store data accessible to all agents in this task."""
        namespaced_key = f"{self.prefix}:{key}"
        self.redis.setex(namespaced_key, ttl, json.dumps(value))

    def retrieve(self, key: str) -> Optional[Any]:
        """Retrieve shared data."""
        raw = self.redis.get(f"{self.prefix}:{key}")
        if not raw:
            return None
        return json.loads(raw)

    def append_to_log(self, agent_name: str, entry: str):
        """Append to shared task log."""
        record = {
            "agent": agent_name,
            "timestamp": datetime.utcnow().isoformat(),
            "entry": entry,
        }
        self.redis.rpush(f"{self.prefix}:log", json.dumps(record))

    def get_log(self) -> list:
        """Get complete task execution log."""
        raw_entries = self.redis.lrange(f"{self.prefix}:log", 0, -1)
        return [json.loads(item) for item in raw_entries]
Task Decomposition Patterns
# Prompt template for the planner agent; rendered via .format(task=...).
# The literal JSON braces are doubled ({{ }}) so str.format leaves them
# intact while substituting {task}.
DECOMPOSER_PROMPT = """Break this complex task into 3-7 independent subtasks.
Each subtask should:
- Be executable by a single specialized agent
- Be as independent as possible from other subtasks
- Have a clear input and output
- Be assigned to the most appropriate agent type
Available agent types: researcher, analyst, coder, writer, reviewer, qa_tester
Task: {task}
Return as JSON array:
[
{{
"id": "subtask_1",
"title": "Brief title",
"description": "What needs to be done",
"agent_type": "researcher",
"depends_on": [], // IDs of subtasks that must complete first
"estimated_complexity": "low|medium|high"
}}
]"""
async def decompose_task(task: str) -> list[dict]:
    """Ask the planner model to split *task* into independent subtasks.

    FIX: ``response_format={"type": "json_object"}`` forces the model to emit
    a single top-level JSON *object*, while DECOMPOSER_PROMPT asks for an
    array — so the model typically wraps the array in an object such as
    ``{"subtasks": [...]}`` and the old ``json.loads(...)`` returned a dict,
    contradicting the declared ``list[dict]`` return. Normalize both shapes.
    """
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": DECOMPOSER_PROMPT.format(task=task)}],
        response_format={"type": "json_object"},
        temperature=0,
    )
    parsed = json.loads(response.choices[0].message.content)
    if isinstance(parsed, list):
        return parsed
    # Wrapped form: return the first list-valued entry of the object.
    for value in parsed.values():
        if isinstance(value, list):
            return value
    raise ValueError(f"Decomposer returned no subtask list: {parsed!r}")
async def execute_decomposed(task: str) -> dict:
    """Execute decomposed task with dependency resolution."""
    subtasks = await decompose_task(task)
    # Build dependency graph
    completed = {}
    pending = {sub["id"]: sub for sub in subtasks}
    while pending:
        # Find subtasks with satisfied dependencies
        ready = []
        for sub in pending.values():
            if all(dep in completed for dep in sub["depends_on"]):
                ready.append(sub)
        if not ready:
            raise RuntimeError("Circular dependency detected")
        # Execute ready subtasks in parallel
        coros = [
            execute_subtask(sub, {dep: completed[dep] for dep in sub["depends_on"]})
            for sub in ready
        ]
        outcomes = await asyncio.gather(*coros)
        for sub, outcome in zip(ready, outcomes):
            completed[sub["id"]] = outcome
            pending.pop(sub["id"])
    return completed
Error Handling and Resilience
import asyncio
from functools import wraps
def retry_agent(max_attempts: int = 3, delay: float = 1.0):
    """Decorator for automatic agent retry with exponential backoff.

    Retries the wrapped coroutine up to max_attempts times, sleeping
    delay * 2**attempt between failures; re-raises as RuntimeError once
    every attempt has failed.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_error = None
            attempt = 0
            while attempt < max_attempts:
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    wait_time = delay * (2 ** attempt)
                    print(f"Agent {func.__name__} failed (attempt {attempt+1}): {e}")
                    # No sleep after the final attempt — fail fast instead.
                    if attempt < max_attempts - 1:
                        await asyncio.sleep(wait_time)
                attempt += 1
            raise RuntimeError(f"Agent failed after {max_attempts} attempts: {last_error}")
        return wrapper
    return decorator
class AgentOrchestrator:
    """Fault-tolerant orchestrator with fallbacks.

    Holds a registry of named async agent callables and provides two
    resilience strategies: fallback on exception, and validate-and-retry.
    """

    def __init__(self, agents: dict):
        self.agents = agents
        self.fallback_model = "gpt-4o-mini"  # Cheaper fallback

    async def run_with_fallback(
        self,
        primary_agent: str,
        fallback_agent: str,
        task: str,
        context: dict = None
    ) -> str:
        """Try the primary agent; on any exception, delegate to the fallback."""
        primary = self.agents[primary_agent]
        try:
            return await primary(task, context)
        except Exception as e:
            print(f"Primary agent {primary_agent} failed: {e}. Using fallback.")
            return await self.agents[fallback_agent](task, context)

    async def run_with_validation(
        self,
        agent: str,
        task: str,
        validator: callable,
        max_retries: int = 3
    ) -> str:
        """Run agent and validate output, retry if invalid.

        On each failure the rejected output is appended to the prompt so
        the agent can self-correct on the next attempt.
        """
        current_task = task
        runner = self.agents[agent]
        for i in range(max_retries):
            result = await runner(current_task)
            if validator(result):
                return result
            print(f"Validation failed (attempt {i+1}), retrying with feedback...")
            current_task = f"{current_task}\n\nPrevious attempt failed validation: {result}\n\nPlease fix and try again."
        raise ValueError(f"Agent failed validation after {max_retries} attempts")
A2A Protocol (Agent-to-Agent)
# Standard A2A message format (emerging industry standard)
# Compatible with Google's A2A spec
class A2ARequest:
    """Standard agent-to-agent request builder (JSON-RPC 2.0 envelope)."""

    @staticmethod
    def create(
        task_id: str,
        from_agent: str,
        to_agent: str,
        task_type: str,
        inputs: dict,
        context: dict = None
    ) -> dict:
        """Build a JSON-RPC 2.0 ``agent.execute`` request dict.

        Args:
            task_id: stable id of the task being delegated.
            from_agent / to_agent: routing endpoints.
            task_type: discriminator for the receiving agent's dispatcher.
            inputs: task payload.
            context: optional shared context; defaults to {}.
        """
        return {
            "jsonrpc": "2.0",
            "method": "agent.execute",
            "id": str(uuid.uuid4()),  # unique per request, not per task
            "params": {
                "task": {
                    "id": task_id,
                    "type": task_type,
                    "inputs": inputs,
                    "context": context or {},
                },
                "routing": {
                    "from": from_agent,
                    "to": to_agent,
                    # FIX: timezone-aware UTC. datetime.utcnow() is naive
                    # (no offset in the ISO string) and deprecated since
                    # Python 3.12 — wrong for an interchange protocol.
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                }
            }
        }
# MoltbotDen A2A via MCP
# Agents on MoltbotDen can send tasks to each other via the messaging system
# POST /conversations/{id}/messages with structured task payload