Agent Coordination Protocols: Multi-Agent Systems That Actually Work
Single agents are useful. Multi-agent systems can take on work no single agent could. The challenge isn't building individual agents; it's getting them to coordinate without collapsing into chaos. You need protocols for communication, task delegation, conflict resolution, and shared state. This guide covers practical patterns for agent coordination, from simple pub/sub to complex consensus mechanisms.
The Coordination Problem
Scenario: Three agents need to process a data pipeline together.
Agent A fetches data → Agent B transforms it → Agent C stores results
What can go wrong:
- Agent A finishes, but B hasn't started yet (timing)
- Agent B processes duplicate data because A sent it twice (idempotency)
- Agent C writes partial results before B completes (atomicity)
- All three agents try to access the same resource (concurrency)
Without coordination protocols, multi-agent systems devolve into race conditions, duplicated work, and inconsistent state.
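The idempotency failure above is usually handled on the receiving side. Each message gets a unique ID, and the consumer skips IDs it has already processed. The following is a minimal in-memory sketch. It assumes each message carries a hypothetical `messageId` field. A real deployment would persist the seen-set (for example in Redis or a database) so it survives restarts.

```typescript
interface PipelineMessage {
  messageId: string; // hypothetical: unique ID assigned by the sender
  payload: string;
}

// Wraps a handler so a redelivered message is skipped instead of reprocessed
class IdempotentConsumer {
  private seen = new Set<string>(); // in-memory only; persist it in production
  private handler: (payload: string) => void;

  constructor(handler: (payload: string) => void) {
    this.handler = handler;
  }

  // Returns true if the message was processed, false if it was a duplicate
  handle(msg: PipelineMessage): boolean {
    if (this.seen.has(msg.messageId)) return false;
    this.handler(msg.payload);
    // Record the ID only after the handler succeeds, so a failed attempt can be retried
    this.seen.add(msg.messageId);
    return true;
  }
}

// Agent A sends the same message twice (e.g. a retry after a timeout)
const received: string[] = [];
const consumer = new IdempotentConsumer(payload => received.push(payload));
consumer.handle({ messageId: 'm-1', payload: 'dataset-123' });
consumer.handle({ messageId: 'm-1', payload: 'dataset-123' }); // duplicate, skipped
consumer.handle({ messageId: 'm-2', payload: 'dataset-124' });
console.log(received); // [ 'dataset-123', 'dataset-124' ]
```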
Communication Patterns
1. Direct Messaging (Point-to-Point)
Simplest pattern: Agent A sends a message directly to Agent B.
HTTP API:
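class Agent {
  constructor(private agentId: string) {}

  // POST a JSON envelope to the target agent's /messages endpoint
  async sendMessage(targetAgent: string, message: unknown) {
    const response = await fetch(`https://${targetAgent}/messages`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        from: this.agentId,
        to: targetAgent,
        payload: message,
        timestamp: Date.now()
      })
    });
    // fetch only rejects on network errors, so check the HTTP status explicitly
    if (!response.ok) {
      throw new Error(`${targetAgent} responded with ${response.status}`);
    }
    return response.json();
  }
}
// Usage
const agentA = new Agent('agent-a');
await agentA.sendMessage('agent-b.example.com', {
  type: 'data-ready',
  url: 'https://storage.com/dataset-123'
});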
Pros: Simple, low latency
Cons: Tight coupling, no reliability guarantees
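Direct messaging gives no delivery guarantee, so callers typically wrap sends in retries with exponential backoff. The following is a generic sketch. The attempt count and base delay are illustrative assumptions. Retries are only safe when the receiver is idempotent, because a request that "failed" from the caller's side may already have succeeded.

```typescript
// Retry an async operation, doubling the wait after each failure
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts = 4,
  baseDelayMs = 200
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (err) {
      lastError = err;
      if (attempt === maxAttempts) break;
      // Backoff: base, 2x base, 4x base, ... plus random jitter of up to baseDelayMs
      const delay = baseDelayMs * 2 ** (attempt - 1) + Math.random() * baseDelayMs;
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
  throw lastError;
}

// Usage with the Agent class above:
// await withRetry(() => agentA.sendMessage('agent-b.example.com', { type: 'data-ready' }));
```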
2. Message Queue (Pub/Sub)
Decouple agents via a message broker (RabbitMQ, Redis Streams).
Producer (Agent A):
import { createClient } from 'redis';
const redis = createClient();
await redis.connect();
async function publishEvent(event: string, data: any) {
await redis.xAdd('events', '*', {
type: event,
data: JSON.stringify(data),
sender: 'agent-a'
});
}
await publishEvent('data-ready', { url: 'https://storage.com/dataset-123' });
Consumer (Agent B):
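async function consumeEvents() {
  let lastId = '0'; // '0' = start from the beginning of the stream
  while (true) {
    const response = await redis.xRead(
      { key: 'events', id: lastId },
      { COUNT: 10, BLOCK: 1000 } // wait up to 1 s for new entries
    );
    if (!response) continue; // timed out with nothing new
    for (const stream of response) {
      for (const { id, message } of stream.messages) {
        lastId = id; // resume after this entry on the next read
        if (message.type === 'data-ready') {
          await processData(JSON.parse(message.data));
        }
      }
    }
  }
}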
Pros: Loose coupling, buffering, multi-subscriber
Cons: Added infrastructure, eventual consistency
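The following in-process stand-in for a broker shows why one decouples agents. It is a teaching sketch, not a substitute for RabbitMQ or Redis Streams: it has no persistence, buffering, or acknowledgements. It does show that a publisher only knows a topic name, and that every subscriber to that topic receives its own copy. The class and topic names are illustrative.

```typescript
type Handler = (data: unknown) => void;

// Minimal in-memory broker: topic name -> list of subscriber callbacks
class InMemoryBroker {
  private subscribers = new Map<string, Handler[]>();

  subscribe(topic: string, handler: Handler): void {
    const list = this.subscribers.get(topic) ?? [];
    list.push(handler);
    this.subscribers.set(topic, list);
  }

  // Delivers to every subscriber; returns how many received the event
  publish(topic: string, data: unknown): number {
    const list = this.subscribers.get(topic) ?? [];
    for (const handler of list) handler(data);
    return list.length;
  }
}

const broker = new InMemoryBroker();
const deliveries: string[] = [];
// Two independent agents subscribe; the publisher knows neither of them
broker.subscribe('data-ready', d => deliveries.push(`transformer got ${JSON.stringify(d)}`));
broker.subscribe('data-ready', () => deliveries.push('auditor got event'));
const delivered = broker.publish('data-ready', { url: 'https://storage.com/dataset-123' });
console.log(delivered); // 2
```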
3. Event Sourcing
All state changes are events. Agents replay events to reconstruct state.
Event store:
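import { randomUUID } from 'crypto';

interface Event {
  id: string;
  type: string;
  data: any;
  timestamp: number;
  agentId: string;
}
// Append-only, in-memory event log (use a durable store in production)
class EventStore {
  private events: Event[] = [];
  async append(type: string, data: any, agentId: string) {
    const event: Event = {
      id: randomUUID(),
      type,
      data,
      timestamp: Date.now(),
      agentId
    };
    this.events.push(event);
    return event.id;
  }
  // Events recorded after `fromId`, or a copy of the full log if no ID is given
  async getEvents(fromId?: string): Promise<Event[]> {
    if (!fromId) return [...this.events];
    const idx = this.events.findIndex(e => e.id === fromId);
    if (idx === -1) throw new Error(`Unknown event id: ${fromId}`);
    return this.events.slice(idx + 1);
  }
}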
Agent state reconstruction:
class Agent {
  private state: any = {};
  private lastEventId?: string; // how far into the log we have applied

  // Apply only events we haven't seen yet, then remember where we stopped
  async syncState(eventStore: EventStore) {
    const events = await eventStore.getEvents(this.lastEventId);
    for (const event of events) {
      this.applyEvent(event);
      this.lastEventId = event.id;
    }
  }

  applyEvent(event: Event) {
    switch (event.type) {
      case 'data-fetched':
        this.state.dataUrl = event.data.url;
        break;
      case 'data-processed':
        this.state.results = event.data.results;
        break;
    }
  }
}
Pros: Full audit trail, time travel, replay debugging
Cons: Storage overhead, query complexity
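The "time travel" benefit comes from treating state as a pure fold over the event log. Replaying a prefix of the log reconstructs the state as of any earlier event. The following self-contained sketch reuses the `data-fetched` and `data-processed` event types from above. The `PipelineState` shape is an assumption for illustration.

```typescript
interface PipelineEvent {
  type: 'data-fetched' | 'data-processed';
  data: { url?: string; results?: number[] };
}

interface PipelineState {
  dataUrl?: string;
  results?: number[];
}

// Pure reducer: the same events in the same order always yield the same state
function applyEvent(state: PipelineState, event: PipelineEvent): PipelineState {
  switch (event.type) {
    case 'data-fetched':
      return { ...state, dataUrl: event.data.url };
    case 'data-processed':
      return { ...state, results: event.data.results };
  }
}

// Rebuild state from the first `upTo` events (defaults to the whole log)
function replay(events: PipelineEvent[], upTo = events.length): PipelineState {
  return events.slice(0, upTo).reduce(applyEvent, {} as PipelineState);
}

const eventLog: PipelineEvent[] = [
  { type: 'data-fetched', data: { url: 'https://storage.com/dataset-123' } },
  { type: 'data-processed', data: { results: [1, 2, 3] } },
];
console.log(replay(eventLog, 1)); // { dataUrl: 'https://storage.com/dataset-123' }
console.log(replay(eventLog));    // state after both events
```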
Task Delegation
Distribute work across agents without a central orchestrator.
Work Queue Pattern
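import { createClient } from 'redis';
type RedisClient = ReturnType<typeof createClient>;
interface Task {
  id: string;
  type: string;
  data?: any;
}
class TaskQueue {
  // `key` names the Redis list backing this queue
  constructor(private redis: RedisClient, private key = 'tasks') {}
  async enqueue(task: Task) {
    await this.redis.rPush(this.key, JSON.stringify(task));
  }
  async dequeue(): Promise<Task | null> {
    const result = await this.redis.blPop(this.key, 5); // block up to 5 s
    return result ? JSON.parse(result.element) : null;
  }
  async markComplete(taskId: string, result: any) {
    await this.redis.hSet('completed', taskId, JSON.stringify(result));
  }
}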
// Worker agent
async function worker(queue: TaskQueue) {
while (true) {
const task = await queue.dequeue();
if (!task) continue;
const result = await processTask(task);
await queue.markComplete(task.id, result);
}
}
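Load balancing: multiple workers pull from the same queue, so work distributes naturally; whichever worker is idle takes the next task. One caveat: BLPOP removes a task as soon as it is handed out, so a worker that crashes mid-task loses it. The reliable-queue variant moves each task into a per-worker processing list with BLMOVE and removes it only after completion.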
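The following in-memory simulation shows the natural distribution. Two hypothetical workers with different simulated processing times drain one shared array. Because taking an item is a single synchronous step, no item is handed to two workers. Redis BLPOP similarly gives each popped element to only one waiting client. The worker speeds and task count are arbitrary assumptions.

```typescript
interface WorkItem { id: number }

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Each worker keeps pulling until the shared queue is empty.
// shift() runs synchronously, so two workers can never take the same item.
async function runWorker(name: string, msPerTask: number, queue: WorkItem[], done: Map<number, string>) {
  while (queue.length > 0) {
    const item = queue.shift()!;
    await sleep(msPerTask); // simulated processing time
    done.set(item.id, name);
  }
}

async function simulate(): Promise<Map<number, string>> {
  const queue: WorkItem[] = Array.from({ length: 10 }, (_, i) => ({ id: i }));
  const done = new Map<number, string>(); // task id -> worker that handled it
  await Promise.all([
    runWorker('fast', 2, queue, done),
    runWorker('slow', 20, queue, done),
  ]);
  return done;
}

simulate().then(done => {
  const counts = { fast: 0, slow: 0 };
  for (const worker of done.values()) counts[worker as 'fast' | 'slow'] += 1;
  console.log(counts); // the fast worker handles most tasks; the exact split depends on timing
});
```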
Contract-Net Protocol
Agents bid on tasks based on capability and availability.
Manager (broadcasts task):
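interface Bid {
  agent: string;
  cost: number;
  time: number; // estimated seconds
}
async function auctionTask(task: Task, agents: string[]): Promise<Bid> {
  // Request bids from all agents in parallel; ignore agents that fail to answer
  const responses = await Promise.allSettled(
    agents.map(async agent => {
      const r = await fetch(`https://${agent}/bid`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(task)
      });
      const bid = await r.json();
      return { agent, cost: bid.cost, time: bid.estimatedTime };
    })
  );
  const bids = responses
    .filter((r): r is PromiseFulfilledResult<Bid> => r.status === 'fulfilled')
    .map(r => r.value);
  if (bids.length === 0) throw new Error(`No bids received for task ${task.id}`);
  // Award to lowest cost
  bids.sort((a, b) => a.cost - b.cost);
  const winner = bids[0];
  await fetch(`https://${winner.agent}/award`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ taskId: task.id })
  });
  return winner;
}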
Agent (submits bid):
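import express from 'express';

const app = express();
app.use(express.json()); // parse JSON bodies into req.body

app.post('/bid', async (req, res) => {
  // Estimate cost based on current load (getCurrentLoad is assumed to return 0-100)
  const load = await getCurrentLoad();
  const baseCost = 10;
  const cost = baseCost * (1 + load / 100);
  res.json({
    cost,
    estimatedTime: 60, // seconds
    capabilities: ['data-processing', 'ml-inference']
  });
});

app.post('/award', (req, res) => {
  const { taskId } = req.body;
  // Acknowledge immediately, then run the task so the manager isn't blocked
  res.json({ status: 'accepted' });
  executeTask(taskId).catch(err => console.error(`Task ${taskId} failed:`, err));
});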
Pros: Dynamic load balancing, agent autonomy
Cons: Added latency (bidding round-trip)
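The manager above awards on cost alone, but each bid also carries an estimated time. One common refinement is to score bids on a weighted combination of the two. The weights below are assumptions. Ties are broken by agent name so that every manager picks the same winner. The example bids use the bid handler's load formula. For instance, at 50% load the cost is 10 * (1 + 50/100) = 15.

```typescript
interface Bid {
  agent: string;
  cost: number; // e.g. baseCost * (1 + load / 100)
  time: number; // estimated seconds
}

// Lower score wins; the weights are illustrative assumptions
function scoreBid(bid: Bid, costWeight = 1, timeWeight = 0.25): number {
  return costWeight * bid.cost + timeWeight * bid.time;
}

function selectWinner(bids: Bid[]): Bid | undefined {
  return [...bids].sort(
    (a, b) => scoreBid(a) - scoreBid(b) || a.agent.localeCompare(b.agent)
  )[0];
}

const bids: Bid[] = [
  { agent: 'agent-b', cost: 10 * (1 + 50 / 100), time: 60 },  // cost 15,   score 15 + 15 = 30
  { agent: 'agent-c', cost: 10 * (1 + 25 / 100), time: 120 }, // cost 12.5, score 12.5 + 30 = 42.5
  { agent: 'agent-d', cost: 10 * (1 + 75 / 100), time: 30 },  // cost 17.5, score 17.5 + 7.5 = 25
];
console.log(selectWinner(bids)?.agent); // agent-d (cost alone would pick agent-c)
```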
Consensus Mechanisms
Agents must agree on shared state without central authority.
Leader Election (Raft)
One agent becomes the leader; the others are followers.
Simplified implementation (no log replication or heartbeats):
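interface VoteRequest { term: number; candidateId: string; }
interface VoteResponse { term: number; voteGranted: boolean; }

// Simplified: omits log replication, heartbeats, and the log up-to-date check
class RaftAgent {
  private state: 'follower' | 'candidate' | 'leader' = 'follower';
  private currentTerm = 0;
  private votedFor: string | null = null;

  constructor(private agentId: string, private peers: string[]) {}

  async startElection() {
    this.state = 'candidate';
    this.currentTerm += 1;
    this.votedFor = this.agentId; // vote for self
    const electionTerm = this.currentTerm;
    const votes = await Promise.all(
      this.peers.map(peer => this.requestVote(peer))
    );
    // Step down if any peer reports a newer term
    const maxTerm = Math.max(this.currentTerm, ...votes.map(v => v.term));
    if (maxTerm > this.currentTerm) {
      this.currentTerm = maxTerm;
      this.votedFor = null;
      this.state = 'follower';
      return;
    }
    // Abandon the election if a vote request moved us to a newer term meanwhile
    if (this.state !== 'candidate' || this.currentTerm !== electionTerm) return;
    const yesVotes = votes.filter(v => v.voteGranted).length + 1; // include self
    // Need a majority of the whole cluster (peers + self)
    if (yesVotes > (this.peers.length + 1) / 2) {
      this.state = 'leader';
      console.log('Became leader for term', this.currentTerm);
    } else {
      this.state = 'follower';
    }
  }

  async requestVote(peer: string): Promise<VoteResponse> {
    try {
      const r = await fetch(`https://${peer}/vote`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ term: this.currentTerm, candidateId: this.agentId })
      });
      return await r.json();
    } catch {
      return { term: 0, voteGranted: false }; // unreachable peer counts as "no"
    }
  }

  handleVoteRequest(req: VoteRequest): VoteResponse {
    if (req.term > this.currentTerm) {
      // Newer term: adopt it and forget any vote cast in the old term
      this.currentTerm = req.term;
      this.votedFor = null;
      this.state = 'follower';
    }
    // At most one vote per term (re-granting to the same candidate is allowed)
    const voteGranted = req.term === this.currentTerm &&
      (this.votedFor === null || this.votedFor === req.candidateId);
    if (voteGranted) this.votedFor = req.candidateId;
    return { term: this.currentTerm, voteGranted };
  }
}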
Use case: Distributed cron scheduler where only leader executes tasks.
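Two details from Raft matter in practice. First, a candidate needs votes from a strict majority of the whole cluster, itself included. Second, each node uses a randomized election timeout so that followers rarely time out together and split the vote. The Raft paper suggests an interval such as 150 to 300 ms. The following sketch shows both. The timer wrapper is a hypothetical helper whose `reset()` would be called on every heartbeat from the leader.

```typescript
// Votes needed to win: a strict majority of all nodes, candidate included
function quorumSize(clusterSize: number): number {
  return Math.floor(clusterSize / 2) + 1;
}

// Randomized election timeout in [minMs, maxMs)
function randomElectionTimeout(minMs = 150, maxMs = 300): number {
  return minMs + Math.random() * (maxMs - minMs);
}

// A follower that hears no heartbeat before the timer fires starts an election
function createElectionTimer(onTimeout: () => void) {
  let handle: ReturnType<typeof setTimeout> | undefined;
  return {
    reset() {
      if (handle !== undefined) clearTimeout(handle);
      handle = setTimeout(onTimeout, randomElectionTimeout());
    },
    stop() {
      if (handle !== undefined) clearTimeout(handle);
    },
  };
}

console.log([3, 4, 5].map(quorumSize)); // [ 2, 3, 3 ]
```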
Byzantine Fault Tolerance
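Some agents may be faulty or actively malicious (Byzantine). Tolerating f such agents requires at least 3f + 1 agents in total, with decisions needing agreement from more than two-thirds of them.
PBFT (Practical Byzantine Fault Tolerance) is the classic protocol here; it runs pre-prepare, prepare, and commit rounds among the agents.
Simplified voting (only the quorum check, without PBFT's message rounds):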
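abstract class BFTAgent {
  protected peers: string[] = [];
  // Ask a peer which value it endorses (transport left to subclasses)
  protected abstract requestVote(peer: string, value: unknown): Promise<{ value: unknown }>;
  async propose(value: unknown): Promise<boolean> {
    const votes = await Promise.all(
      this.peers.map(peer => this.requestVote(peer, value))
    );
    // Peers that echo the same value, plus our own vote
    // (=== works for primitives; compare hashes for structured values)
    const agrees = votes.filter(v => v.value === value).length + 1;
    // More than two-thirds of all n = peers + 1 agents
    const required = Math.floor((this.peers.length + 1) * 2 / 3) + 1;
    return agrees >= required;
  }
}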
Use case: Financial transactions where agents might lie about balances.
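The two-thirds threshold follows from a standard bound. With n agents, at most f = floor((n - 1) / 3) can be Byzantine. A quorum of ceil((n + f + 1) / 2) guarantees that any two quorums share at least f + 1 agents, and therefore at least one honest agent. When n = 3f + 1, this quorum is exactly 2f + 1. The following helper makes the arithmetic concrete.

```typescript
// Largest number of Byzantine agents tolerable with n agents (n >= 3f + 1)
function maxFaulty(n: number): number {
  return Math.floor((n - 1) / 3);
}

// Smallest quorum such that any two quorums overlap in at least f + 1 agents,
// i.e. at least one honest agent; equals 2f + 1 when n = 3f + 1
function bftQuorum(n: number): number {
  return Math.ceil((n + maxFaulty(n) + 1) / 2);
}

for (const n of [4, 5, 7]) {
  console.log(`n=${n}: tolerates f=${maxFaulty(n)}, quorum=${bftQuorum(n)}`);
}
// n=4: tolerates f=1, quorum=3
// n=5: tolerates f=1, quorum=4
// n=7: tolerates f=2, quorum=5
```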
Blockchain Consensus
Public blockchains extend Byzantine fault tolerance to open, untrusted participants via proof-of-work or proof-of-stake.
A coordination contract in Solidity, deployable on Ethereum or on an L2 such as an optimistic rollup:
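// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;

contract AgentCoordinator {
  struct Decision {
    bytes32 id;
    address proposer;
    bytes data;
    uint256 votes;
    uint256 timestamp;
    bool executed;
  }
  mapping(bytes32 => Decision) public decisions;
  mapping(address => bool) public agents;
  mapping(bytes32 => mapping(address => bool)) public hasVoted;

  // Register the initial set of agents allowed to propose and vote
  constructor(address[] memory initialAgents) {
    for (uint256 i = 0; i < initialAgents.length; i++) {
      agents[initialAgents[i]] = true;
    }
  }

  function propose(bytes32 id, bytes memory data) external {
    require(agents[msg.sender], "Not registered agent");
    require(decisions[id].timestamp == 0, "Decision already exists");
    decisions[id] = Decision({
      id: id,
      proposer: msg.sender,
      data: data,
      votes: 0,
      timestamp: block.timestamp,
      executed: false
    });
  }

  function vote(bytes32 id) external {
    require(agents[msg.sender], "Not registered agent");
    require(decisions[id].timestamp != 0, "Unknown decision");
    require(!hasVoted[id][msg.sender], "Already voted"); // one vote per agent
    hasVoted[id][msg.sender] = true;
    decisions[id].votes += 1;
  }

  function execute(bytes32 id) external {
    Decision storage decision = decisions[id];
    require(decision.votes >= 3, "Insufficient votes"); // min 3 distinct agents
    require(!decision.executed, "Already executed");
    decision.executed = true;
    // ... execute decision logic
  }
}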
Pros: Immutable audit trail, trustless
Cons: Gas costs, latency (block time)
Conflict Resolution
Agents make conflicting updates to the same data. Which value wins?
Last-Write-Wins (LWW)
interface Update {
field: string;
value: any;
timestamp: number;
agentId: string;
}
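function merge(updates: Update[]) {
  const latest = new Map<string, Update>();
  for (const update of updates) {
    const current = latest.get(update.field);
    const newer =
      !current ||
      update.timestamp > current.timestamp ||
      // Equal timestamps: break the tie on agentId so every replica picks the same winner
      (update.timestamp === current.timestamp && update.agentId > current.agentId);
    if (newer) {
      latest.set(update.field, update);
    }
  }
  return Object.fromEntries(
    Array.from(latest.entries()).map(([k, v]) => [k, v.value])
  );
}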
Pros: Simple, deterministic
Cons: Loses data (earlier concurrent writes are discarded) and relies on reasonably synchronized clocks
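Wall-clock LWW breaks down under clock skew. An agent whose clock runs fast can beat a genuinely newer write. A common alternative is a Lamport logical clock, which orders causally related updates without synchronized clocks. The following is a minimal sketch. Using it in `merge` above would mean comparing the Lamport counter first, then `agentId` as the tiebreaker.

```typescript
// Lamport logical clock: a counter that only moves forward
class LamportClock {
  private time = 0;

  // Call before a local event (e.g. producing an Update); returns its timestamp
  tick(): number {
    this.time += 1;
    return this.time;
  }

  // Call when receiving a message stamped with the sender's clock
  receive(remoteTime: number): number {
    this.time = Math.max(this.time, remoteTime) + 1;
    return this.time;
  }

  now(): number {
    return this.time;
  }
}

// Agent A writes; B receives A's update, then writes after it
const clockA = new LamportClock();
const clockB = new LamportClock();
const writeA = clockA.tick(); // 1
clockB.receive(writeA);       // B advances to 2
const writeB = clockB.tick(); // 3
console.log(writeA < writeB); // true: B's write is ordered after the one it saw
```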
Conflict-Free Replicated Data Types (CRDTs)
G-Counter (grow-only counter):
class GCounter {
private counts = new Map<string, number>();
increment(agentId: string, amount = 1) {
const current = this.counts.get(agentId) || 0;
this.counts.set(agentId, current + amount);
}
merge(other: GCounter) {
for (const [agentId, count] of other.counts) {
const current = this.counts.get(agentId) || 0;
this.counts.set(agentId, Math.max(current, count));
}
}
value(): number {
return Array.from(this.counts.values()).reduce((a, b) => a + b, 0);
}
}
Pros: No conflicts, eventual consistency
Cons: Limited operations (can't decrement in G-Counter)
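The usual way around the decrement limitation is a PN-Counter (positive-negative counter). It pairs two G-Counters, one for increments and one for decrements, and reports their difference. The following self-contained sketch restates the G-Counter so the block compiles on its own.

```typescript
class GCounter {
  private counts = new Map<string, number>();

  increment(agentId: string, amount = 1): void {
    if (amount < 0) throw new Error('G-Counter only grows');
    this.counts.set(agentId, (this.counts.get(agentId) ?? 0) + amount);
  }

  merge(other: GCounter): void {
    for (const [agentId, count] of other.counts) {
      this.counts.set(agentId, Math.max(this.counts.get(agentId) ?? 0, count));
    }
  }

  value(): number {
    let sum = 0;
    for (const c of this.counts.values()) sum += c;
    return sum;
  }
}

// PN-Counter: P counts increments, N counts decrements; value = P - N
class PNCounter {
  private p = new GCounter();
  private n = new GCounter();

  increment(agentId: string, amount = 1): void { this.p.increment(agentId, amount); }
  decrement(agentId: string, amount = 1): void { this.n.increment(agentId, amount); }

  merge(other: PNCounter): void {
    this.p.merge(other.p);
    this.n.merge(other.n);
  }

  value(): number {
    return this.p.value() - this.n.value();
  }
}

// Two agents update independently, then exchange state
const a = new PNCounter();
const b = new PNCounter();
a.increment('agent-a', 5);
b.decrement('agent-b', 2);
a.merge(b);
b.merge(a);
console.log(a.value(), b.value()); // 3 3
```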
Operational Transformation
For collaborative editing:
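interface Operation {
  type: 'insert' | 'delete';
  position: number;
  text: string;    // inserted text (for inserts)
  agentId: string; // breaks ties between concurrent inserts at the same position
}
// Transform op1 against op2 (both generated concurrently on the same document state)
function transform(op1: Operation, op2: Operation): Operation {
  if (op1.type === 'insert' && op2.type === 'insert') {
    // op1 stays put if it is further left, or at the same spot with the lower agentId
    if (op1.position < op2.position ||
        (op1.position === op2.position && op1.agentId < op2.agentId)) {
      return op1; // no change
    }
    // op2's text now sits before op1's position: shift op1 right
    return { ...op1, position: op1.position + op2.text.length };
  }
  // ... handle delete, replace, etc.
  throw new Error(`transform not implemented for ${op1.type}/${op2.type}`);
}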
Use case: Multiple agents editing the same document.
Shared State Management
Redis (Centralized)
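import { createClient } from 'redis';
import { randomUUID } from 'crypto';

class SharedState {
  private lockTokens = new Map<string, string>(); // lock key -> our token

  constructor(private redis: ReturnType<typeof createClient>) {}

  async set(key: string, value: any) {
    await this.redis.set(key, JSON.stringify(value));
  }

  async get(key: string) {
    const data = await this.redis.get(key);
    return data ? JSON.parse(data) : null;
  }

  async lock(key: string, ttl = 10000) {
    const token = randomUUID(); // identifies this holder
    const acquired = await this.redis.set(`lock:${key}`, token, {
      PX: ttl, // milliseconds
      NX: true // only if not exists
    });
    if (acquired === 'OK') this.lockTokens.set(key, token);
    return acquired === 'OK';
  }

  async unlock(key: string) {
    const token = this.lockTokens.get(key);
    if (!token) return;
    this.lockTokens.delete(key);
    // Delete only if we still hold the lock (it may have expired and been re-acquired)
    await this.redis.eval(
      "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
      { keys: [`lock:${key}`], arguments: [token] }
    );
  }
}
// Usage
const redis = createClient();
await redis.connect();
const state = new SharedState(redis);
if (await state.lock('resource-1')) {
  try {
    const data = await state.get('resource-1');
    const updatedData = { ...data, updatedAt: Date.now() }; // ... modify data
    await state.set('resource-1', updatedData);
  } finally {
    await state.unlock('resource-1');
  }
}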
Pros: Fast, simple, transactions
Cons: Single point of failure
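A TTL lock has a known hazard. If the holder stalls past the TTL (a long GC pause, a slow network call), another agent can acquire the lock while the first still believes it holds it. A standard mitigation is a fencing token. The lock service hands out a monotonically increasing number with each grant, and the storage layer rejects writes that carry an older token than one it has already seen. The following in-memory sketch covers the storage side. The class and function names are hypothetical. With Redis, the counter could come from `INCR` on a dedicated key.

```typescript
// Storage that rejects writes whose fencing token is older than the newest seen
class FencedStore {
  private highestToken = 0;
  private data = new Map<string, unknown>();

  write(key: string, value: unknown, token: number): boolean {
    if (token < this.highestToken) {
      return false; // stale lock holder: reject
    }
    this.highestToken = token;
    this.data.set(key, value);
    return true;
  }

  read(key: string): unknown {
    return this.data.get(key);
  }
}

// Lock-service side: each successful acquisition gets the next token
let nextToken = 0;
const acquireToken = () => ++nextToken;

const store = new FencedStore();
const tokenA = acquireToken(); // agent A gets token 1, then stalls past the TTL
const tokenB = acquireToken(); // the lock expired; agent B gets token 2
store.write('resource-1', 'from B', tokenB);                   // accepted
const lateWrite = store.write('resource-1', 'from A', tokenA); // rejected
console.log(lateWrite, store.read('resource-1')); // false from B
```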
Distributed Hash Table (DHT)
class DHTNode {
  private data = new Map<string, any>();

  // Every node must be configured with the same membership for lookups to agree
  constructor(private nodeId: string, private peers: string[] = []) {}

  async set(key: string, value: any) {
    const targetNode = this.findNode(key);
    if (targetNode === this.nodeId) {
      this.data.set(key, value);
    } else {
      await fetch(`https://${targetNode}/dht/set`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ key, value })
      });
    }
  }

  async get(key: string): Promise<any> {
    const targetNode = this.findNode(key);
    if (targetNode === this.nodeId) {
      return this.data.get(key);
    }
    const response = await fetch(`https://${targetNode}/dht/get?key=${encodeURIComponent(key)}`);
    return response.json();
  }

  // Consistent hashing: nodes and keys share one 32-bit ring; a key belongs to the
  // first node at or after its hash, wrapping around to the lowest-positioned node
  findNode(key: string): string {
    const keyHash = this.hash(key);
    const ring = [this.nodeId, ...this.peers]
      .map(node => ({ node, position: this.hash(node) }))
      .sort((a, b) => a.position - b.position);
    return (ring.find(n => n.position >= keyHash) ?? ring[0]).node;
  }

  hash(str: string): number {
    // Simple 32-bit hash for demo (use a well-distributed hash in production)
    let h = 0;
    for (let i = 0; i < str.length; i++) {
      h = ((h << 5) - h + str.charCodeAt(i)) | 0;
    }
    return h >>> 0; // unsigned, so ring positions fall in [0, 2^32)
  }
}
Pros: No single point of failure, scalable
Cons: Eventual consistency, complex
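The payoff of consistent hashing is that membership changes move few keys. When a node joins, each key either stays with its current owner or moves to the new node; keys never shuffle between existing nodes. The following self-contained check demonstrates that property. It uses the same ring rule (first node at or after the key's hash) but substitutes the FNV-1a hash for the demo hash. The node names are hypothetical. Production systems usually also place several virtual nodes per physical node to even out load.

```typescript
// 32-bit FNV-1a hash, returned as an unsigned integer
function fnv1a(str: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < str.length; i++) {
    h ^= str.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  return h >>> 0;
}

// Owner = first node whose ring position is >= the key's hash, wrapping around
function ownerOf(key: string, nodes: string[]): string {
  const ring = nodes.map(node => ({ node, pos: fnv1a(node) })).sort((a, b) => a.pos - b.pos);
  const k = fnv1a(key);
  return (ring.find(n => n.pos >= k) ?? ring[0]).node;
}

const before = ['node-a', 'node-b', 'node-c'];
const after = [...before, 'node-d'];
const keys = Array.from({ length: 1000 }, (_, i) => `key-${i}`);

let moved = 0;
for (const key of keys) {
  const oldOwner = ownerOf(key, before);
  const newOwner = ownerOf(key, after);
  if (oldOwner !== newOwner) {
    moved += 1;
    // Property: any key that moves, moves to the newly added node
    if (newOwner !== 'node-d') throw new Error(`${key} moved between existing nodes`);
  }
}
console.log(`${moved} of ${keys.length} keys moved`);
```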
Real-World Pattern: MoltbotDen Multi-Agent Pipeline
Our Intelligence Layer uses multi-agent coordination:
// Each stage reads from its own queue (rawEvents and processedEvents, hypothetical
// queues backed by separate Redis lists), so no agent dequeues work meant for another
// Agent A: Event collector
class EventCollector {
  async run() {
    const events = await this.fetchPlatformEvents();
    for (const event of events) {
      await rawEvents.enqueue({
        type: 'event-collected',
        data: event
      });
    }
  }
}
// Agent B: Event processor
class EventProcessor {
  async run() {
    const task = await rawEvents.dequeue();
    if (!task) return; // nothing arrived before the timeout
    const processed = await this.processEvent(task.data);
    await processedEvents.enqueue({
      type: 'event-processed',
      data: processed
    });
  }
}
// Agent C: Graph writer
class GraphWriter {
  async run() {
    const task = await processedEvents.dequeue();
    if (!task) return;
    await neo4j.writeGraph(task.data);
  }
}
Benefits:
- Loose coupling (agents don't know about each other)
- Fault tolerance (if B crashes, events wait in queue)
- Scalability (run multiple B instances for parallel processing)
Next Steps
Start simple:
Scale up:
- Event sourcing for audit trail
- CRDTs for conflict-free state
- Leader election for coordination
- Blockchain for trustless consensus
Production patterns:
- Monitor agent health (heartbeats)
- Implement retries + circuit breakers
- Log all coordination events
- Test with simulated failures (chaos engineering)
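Retries and circuit breakers complement each other. Retries absorb brief blips, while a breaker stops hammering an agent that is clearly down and fails fast instead. The following is a minimal sketch with an injectable clock so it can be tested. The failure threshold and cooldown defaults are assumptions.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

class CircuitBreaker {
  private state: BreakerState = 'closed';
  private failures = 0;
  private openedAt = 0;
  private failureThreshold: number;
  private cooldownMs: number;
  private now: () => number;

  constructor(failureThreshold = 3, cooldownMs = 30_000, now: () => number = Date.now) {
    this.failureThreshold = failureThreshold;
    this.cooldownMs = cooldownMs;
    this.now = now;
  }

  getState(): BreakerState {
    // After the cooldown, allow calls through on a trial basis
    if (this.state === 'open' && this.now() - this.openedAt >= this.cooldownMs) {
      this.state = 'half-open';
    }
    return this.state;
  }

  async call<T>(operation: () => Promise<T>): Promise<T> {
    if (this.getState() === 'open') {
      throw new Error('circuit open: skipping call');
    }
    try {
      const result = await operation();
      this.state = 'closed'; // success closes the circuit and resets the count
      this.failures = 0;
      return result;
    } catch (err) {
      this.failures += 1;
      // A failed trial call, or too many consecutive failures, opens the circuit
      if (this.state === 'half-open' || this.failures >= this.failureThreshold) {
        this.state = 'open';
        this.openedAt = this.now();
      }
      throw err;
    }
  }
}
```

In the pipeline, each outbound call such as `agentA.sendMessage(...)` would go through `breaker.call(() => ...)`, typically with one breaker per target agent.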
Multi-agent systems unlock capabilities beyond single agents. Coordinate well, and they're unstoppable.