Inter-Agent Communication
How agents share information determines whether a multi-agent system works or descends into chaos. This lesson covers the three communication patterns (direct messaging, shared memory, event-driven), how to define contracts between agents, and how to resolve conflicts when agents disagree.
Communication Pattern 1: Direct Message Passing
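The simplest pattern: agents address messages to each other by name. The orchestrator passes one agent's output as another agent's input, which is exactly what happens in a sequential pipeline. The `MessageBus` below logs every message and delivers it to the callbacks subscribed under the recipient's name.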
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
class MessageType(Enum):
    """Intent carried by an ``AgentMessage``; each value is a lowercase string."""

    TASK = "task"          # request: perform this piece of work
    RESULT = "result"      # response: here is the finished output
    FEEDBACK = "feedback"  # response: the output needs changes
    QUESTION = "question"  # request: clarification is needed
    STATUS = "status"      # progress report, e.g. "I am 50% done"
@dataclass
class AgentMessage:
    """One message exchanged between agents through a ``MessageBus``.

    ``id`` and ``timestamp`` are generated fresh for every instance.
    ``reply_to`` holds the ``id`` of the message this one answers, or
    ``None`` when it starts a new exchange.
    """

    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    sender: str = ""
    recipient: str = ""
    msg_type: MessageType = MessageType.TASK
    content: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)
    # ISO-8601 UTC time with an explicit "+00:00" offset. datetime.utcnow()
    # is deprecated since Python 3.12 and produced an offset-less (naive) value.
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    reply_to: Optional[str] = None  # ID of the message this replies to
class MessageBus:
    """Central message bus for agent-to-agent communication.

    Every message is appended to an in-memory log (``messages``) and then
    delivered synchronously to the callbacks subscribed under the
    recipient's name.
    """

    def __init__(self):
        self.messages: list[AgentMessage] = []
        # agent_name -> callbacks, invoked in subscription order on delivery
        self.subscribers: dict[str, list] = {}

    def send(self, message: "AgentMessage"):
        """Log ``message`` and deliver it to the recipient's subscribers.

        Callbacks run synchronously, in subscription order, on the caller's
        thread. A recipient with no subscribers still has the message logged,
        so it can be fetched later with ``get_inbox``.
        """
        self.messages.append(message)
        # Iterate over a snapshot: a callback that subscribes another handler
        # for the same agent must not also be invoked for this message.
        for callback in list(self.subscribers.get(message.recipient, ())):
            callback(message)

    def subscribe(self, agent_name: str, callback):
        """Register ``callback(message)`` to receive messages sent to ``agent_name``."""
        self.subscribers.setdefault(agent_name, []).append(callback)

    def get_conversation(self, agent_a: str, agent_b: str) -> "list[AgentMessage]":
        """Return the messages exchanged between two agents, in send order.

        Only messages sent from one of the two agents to the other are
        included. An agent's messages to itself count only when
        ``agent_a == agent_b``.
        """
        pair = {agent_a, agent_b}
        return [m for m in self.messages if {m.sender, m.recipient} == pair]

    def get_inbox(self, agent_name: str,
                  msg_type: "Optional[MessageType]" = None) -> "list[AgentMessage]":
        """Return every logged message addressed to ``agent_name``, in send order.

        The bus does not track which messages were already handled, so this
        returns all of them, not only unprocessed ones. Pass ``msg_type`` to
        keep only messages of that type.
        """
        return [
            m for m in self.messages
            if m.recipient == agent_name
            and (msg_type is None or m.msg_type == msg_type)
        ]
Communication Pattern 2: Shared Memory (Blackboard)
In the blackboard pattern, all agents read from and write to a shared workspace. This is ideal when agents need to build on each other's work or when the order of operations is flexible.
import threading
from typing import Any, Optional
import json
import time
class Blackboard:
    """Shared workspace that all agents can read from and write to.

    Think of it as a shared whiteboard in a meeting room where any team member
    can write findings, and others can read and build on them.

    One re-entrant lock guards the store, the write history and the watcher
    registry. Watcher callbacks are invoked after that lock is released, so
    a callback may read, write, or register further watchers.
    """

    def __init__(self):
        self._store: dict[str, Any] = {}
        self._history: list[dict] = []
        self._lock = threading.RLock()  # Thread-safe access
        self._watchers: dict[str, list] = {}  # key_pattern -> callbacks

    def write(self, key: str, value: Any, author: str):
        """Store ``value`` under ``key``, record the write, and notify watchers.

        A watcher fires when its pattern is a substring of ``key`` or is the
        literal ``"*"``. Callbacks are called as ``cb(key, value, author)``.
        """
        with self._lock:
            self._store[key] = value
            self._history.append({
                "action": "write",
                "key": key,
                "author": author,
                "timestamp": time.time(),
                "value_preview": str(value)[:200],
            })
            # Snapshot the matching callbacks while locked so a concurrent or
            # re-entrant watch() cannot resize the registry mid-iteration.
            to_notify = [
                cb
                for pattern, callbacks in self._watchers.items()
                if pattern in key or pattern == "*"
                for cb in callbacks
            ]
        # Run foreign code outside the lock so a slow or blocking callback
        # cannot stall every other reader and writer.
        for cb in to_notify:
            cb(key, value, author)

    def read(self, key: str, default: Any = None) -> Any:
        """Return the value stored under ``key``, or ``default`` if absent."""
        with self._lock:
            return self._store.get(key, default)

    def read_all(self, prefix: str = "") -> dict:
        """Return a shallow copy of all entries whose key starts with ``prefix``.

        An empty prefix returns every entry. Values are the stored objects
        themselves, not copies.
        """
        with self._lock:
            if not prefix:
                return dict(self._store)
            return {k: v for k, v in self._store.items() if k.startswith(prefix)}

    def watch(self, key_pattern: str, callback):
        """Call ``callback(key, value, author)`` on writes whose key contains ``key_pattern``.

        The pattern ``"*"`` matches every key.
        """
        with self._lock:
            self._watchers.setdefault(key_pattern, []).append(callback)

    def get_audit_log(self, author: Optional[str] = None) -> list[dict]:
        """Return the write history, optionally only entries by ``author``.

        A falsy ``author`` (``None`` or ``""``) returns every entry.
        """
        with self._lock:
            if author:
                return [h for h in self._history if h["author"] == author]
            return list(self._history)
# Usage: a research workflow in which three agents share one blackboard.
blackboard = Blackboard()

# Two independent research agents post findings under the "research/" prefix.
blackboard.write(
    "research/source_1",
    dict(
        title="Agent architectures survey",
        key_findings=["ReAct is dominant", "Tool use improves by 40%"],
        confidence=0.85,
    ),
    author="researcher_1",
)
blackboard.write(
    "research/source_2",
    dict(
        title="Production multi-agent deployments",
        key_findings=["Supervisor pattern used by 70% of teams"],
        confidence=0.90,
    ),
    author="researcher_2",
)

# The synthesizer gathers everything under "research/" and records which
# keys its summary was built from.
all_research = blackboard.read_all(prefix="research/")
blackboard.write(
    "synthesis/final",
    dict(
        summary="Combined findings from all sources...",
        sources=list(all_research),
    ),
    author="synthesizer",
)
Communication Pattern 3: Event-Driven
In event-driven communication, agents emit events when they complete work, and other agents react to those events. This decouples agents from each other — they do not need to know who their consumers are.
import asyncio
from dataclasses import dataclass
from typing import Callable, Awaitable
@dataclass
class AgentEvent:
    """A named event published by an agent through an ``EventBroker``."""

    # Dotted event name such as "code.written", "test.passed" or "review.failed".
    event_type: str
    # Name of the agent that emitted the event.
    source_agent: str
    # Event-specific data; its keys depend on event_type.
    payload: dict
    # Epoch seconds; stays 0.0 until EventBroker.emit stamps it.
    timestamp: float = 0.0
class EventBroker:
    """Pub/sub event broker for decoupled agent communication.

    Handlers are async callables registered per event type. Handlers
    registered under ``"*"`` receive every event.
    """

    def __init__(self):
        self._handlers: dict[str, list[Callable]] = {}
        self._event_log: list[AgentEvent] = []

    def on(self, event_type: str, handler: Callable[["AgentEvent"], Awaitable]):
        """Subscribe ``handler`` to ``event_type`` (use ``"*"`` for all events)."""
        self._handlers.setdefault(event_type, []).append(handler)

    async def emit(self, event: "AgentEvent"):
        """Stamp and log ``event``, then run its subscribed handlers concurrently.

        Handlers for ``event.event_type`` and the wildcard handlers are awaited
        together with ``asyncio.gather``; the first exception raised by a
        handler propagates to the caller.
        """
        import time

        event.timestamp = time.time()
        self._event_log.append(event)
        # Build a fresh list. Extending the list returned by dict.get() would
        # permanently append the wildcard handlers to this event type's
        # registry on every emit, so they would fire more and more often.
        handlers = list(self._handlers.get(event.event_type, []))
        if event.event_type != "*":
            # A "*" event already selected the wildcard handlers above; do not
            # deliver it to them twice.
            handlers.extend(self._handlers.get("*", []))
        if handlers:
            await asyncio.gather(*(h(event) for h in handlers))
# Usage: event-driven code workflow. When an agent announces "code.written",
# a tester and a reviewer react independently; neither knows about the other.
broker = EventBroker()


async def on_code_written(event: AgentEvent):
    """Have the tester agent test the file named in a "code.written" event."""
    payload = event.payload
    code = payload["code"]  # required key; not otherwise used in this example
    file_path = payload["file_path"]
    test_result = await tester_agent.run(f"Write and run tests for {file_path}")
    # NOTE(review): "passed" is whether the tester agent's run ended with
    # status "completed", not necessarily whether the tests passed -- confirm.
    outcome = AgentEvent(
        event_type="test.completed",
        source_agent="tester",
        payload={"passed": test_result.status == "completed", "file": file_path},
    )
    await broker.emit(outcome)


async def on_code_written_review(event: AgentEvent):
    """Have the reviewer agent review the file; EventBroker runs this alongside the tester."""
    review = await reviewer_agent.run(f"Review code in {event.payload['file_path']}")
    # Approval is a plain substring check for "LGTM" in the reviewer's last message.
    approved = "LGTM" in str(review.messages[-1])
    await broker.emit(
        AgentEvent(
            event_type="review.completed",
            source_agent="reviewer",
            payload={"approved": approved, "file": event.payload["file_path"]},
        )
    )


# Both handlers subscribe to the same event, so one emit triggers both.
broker.on("code.written", on_code_written)
broker.on("code.written", on_code_written_review)
Choosing the Right Communication Pattern
| Pattern | Coupling | Best For | Complexity |
|---|---|---|---|
| Direct messaging | Tight | Sequential pipelines, request-reply between two agents | Low |
| Shared memory | Medium | Collaborative tasks, research, building shared artifacts | Medium |
| Event-driven | Loose | Reactive workflows, CI/CD-style pipelines, extensible systems | High |
Structured Output Contracts Between Agents
The most common failure in multi-agent systems is agents producing output that downstream agents cannot parse. Fix this with explicit contracts.
from pydantic import BaseModel, Field
from typing import Literal
# Contract: what the planner agent MUST output.
# (Comments rather than docstrings: pydantic copies class docstrings into the
# generated JSON schema.)


class PlanStep(BaseModel):
    # One unit of work in the plan, assigned to a single agent.
    step_number: int
    description: str
    assigned_agent: Literal["coder", "tester", "reviewer", "researcher"]
    depends_on: list[int] = Field(
        default_factory=list,
        description="Step numbers that must complete before this step",
    )
    estimated_minutes: int


class PlanOutput(BaseModel):
    # Top-level planner response: the steps plus overall estimates.
    steps: list[PlanStep]
    estimated_time_minutes: int
    risk_level: Literal["low", "medium", "high"]
# Contract: what the coder agent MUST output.


class FileChange(BaseModel):
    # One file touched by the coder. `content` is required for every action,
    # including "delete".
    path: str
    action: Literal["create", "modify", "delete"]
    content: str
    language: str


class CodeOutput(BaseModel):
    # Top-level coder response: the changed files, why, and whether to test.
    files_modified: list[FileChange]
    explanation: str
    tests_needed: bool
# Contract: what the reviewer agent MUST output.


class ReviewIssue(BaseModel):
    # One problem found in review. `line` and `suggested_fix` are Optional so
    # that an explicit JSON null validates. With a bare `int = None` /
    # `str = None`, pydantic accepted only an omitted key and rejected
    # `"line": null` as "Input should be a valid integer".
    severity: Literal["critical", "major", "minor", "suggestion"]
    file: str
    line: Optional[int] = None
    description: str
    suggested_fix: Optional[str] = None


class ReviewOutput(BaseModel):
    # Top-level reviewer verdict plus the individual issues found.
    approved: bool
    issues: list[ReviewIssue]
    overall_quality: Literal["excellent", "good", "needs_work", "reject"]
# Enforce contracts using structured output
def validate_agent_output(raw_output: str, contract: "type[BaseModel]") -> "BaseModel":
    """Parse an agent's JSON output and validate it against ``contract``.

    Output wrapped in a Markdown code fence (a fence line such as "```" or
    "```json" before the JSON and "```" after it) is unwrapped first, so
    fenced responses are accepted as well as bare JSON.

    Args:
        raw_output: The agent's raw text response.
        contract: The pydantic model class the output must satisfy.

    Returns:
        The validated ``contract`` instance.

    Raises:
        json.JSONDecodeError: if the (unfenced) text is not valid JSON.
        pydantic.ValidationError: if the JSON does not match ``contract``.
    """
    import json

    text = raw_output.strip()
    # Valid JSON never starts with a backtick, so unwrapping cannot change the
    # result for input that already parsed.
    if len(text) >= 6 and text.startswith("```") and text.endswith("```"):
        inner = text[3:-3]
        # The opening fence line may carry a language tag such as "json".
        tag, newline, rest = inner.partition("\n")
        if newline and (not tag.strip() or tag.strip().isidentifier()):
            inner = rest
        text = inner
    data = json.loads(text)
    return contract.model_validate(data)
Conflict Resolution
When multiple agents produce conflicting results, you need a resolution strategy. Here are the three approaches used in production:
class ConflictResolver:
    """Resolve disagreements between agents.

    Each position is a dict describing one agent's answer. Keys read here:
    ``"answer"`` (all strategies), ``"agent_name"`` (confidence and judge),
    ``"confidence"`` (confidence strategy; a missing key counts as 0) and
    ``"reasoning"`` (judge strategy; optional).
    """

    async def resolve_by_voting(self, positions: list[dict]) -> dict:
        """Majority vote - best for factual questions.

        Answers must be hashable. On a tie, the answer encountered first wins.
        ``agreement`` is the winning answer's share of all votes.

        Raises:
            ValueError: if ``positions`` is empty.
        """
        from collections import Counter

        if not positions:
            raise ValueError("resolve_by_voting needs at least one position")
        tally = Counter(p["answer"] for p in positions)
        winner, votes = tally.most_common(1)[0]
        return {
            "resolved_answer": winner,
            "agreement": votes / len(positions),
            "method": "majority_vote",
        }

    async def resolve_by_confidence(self, positions: list[dict]) -> dict:
        """Pick the single position with the highest ``confidence`` score.

        Positions without a ``"confidence"`` key count as 0. On a tie, the
        first of the tied positions wins.

        Raises:
            ValueError: if ``positions`` is empty (raised by ``max``).
        """
        best = max(positions, key=lambda p: p.get("confidence", 0))
        return {
            "resolved_answer": best["answer"],
            # Same default as the ranking key above, so a position without a
            # score cannot win the ranking and then fail here.
            "confidence": best.get("confidence", 0),
            "agent": best["agent_name"],
            "method": "highest_confidence",
        }

    async def resolve_by_judge(self, positions: list[dict],
                               judge_agent: "ReActAgent") -> dict:
        """Ask an independent judge agent to evaluate all positions.

        The content of the judge's final message is returned both as the
        resolved answer and as the judge's reasoning.

        Raises:
            ValueError: if ``judge_agent`` is None.
        """
        if judge_agent is None:
            raise ValueError("the 'judge' strategy requires a judge_agent")
        parts = ["Multiple agents produced different answers. Evaluate each and pick the best.\n\n"]
        for pos in positions:
            parts.append(f"Agent {pos['agent_name']}:\n")
            parts.append(f"Answer: {pos['answer']}\n")
            parts.append(f"Reasoning: {pos.get('reasoning', 'Not provided')}\n\n")
        parts.append("Which answer is correct and why? Respond with the best answer and your reasoning.")
        result = await judge_agent.run("".join(parts))
        verdict = result.messages[-1]["content"]
        return {
            "resolved_answer": verdict,
            "method": "judge",
            "judge_reasoning": verdict,
        }

    async def resolve(self, positions: list[dict],
                      strategy: str = "judge", judge_agent=None) -> dict:
        """Resolve a conflict using ``strategy``: "vote", "confidence" or "judge".

        ``judge_agent`` is required for, and only used by, the "judge" strategy.

        Raises:
            ValueError: for an unknown strategy, or as raised by the chosen one.
        """
        if strategy == "vote":
            return await self.resolve_by_voting(positions)
        if strategy == "confidence":
            return await self.resolve_by_confidence(positions)
        if strategy == "judge":
            return await self.resolve_by_judge(positions, judge_agent)
        raise ValueError(f"Unknown strategy: {strategy}")
Key Takeaways
- Three communication patterns: direct messaging (simple, tight coupling), shared memory/blackboard (collaborative), and event-driven (decoupled, reactive).
- Use typed messages (task, result, feedback, question, status) so agents can filter relevant communication.
- Define Pydantic contracts between every pair of agents. This prevents the number one failure mode: output format mismatches.
- Conflict resolution strategies: majority vote for factual questions, highest-confidence selection when agents report comparable confidence scores, and a judge agent for complex disagreements.
- Start with direct messaging. Move to shared memory when agents build on each other's work. Use event-driven when you need extensibility.
What Is Next
Agents need tools to interact with the outside world. The next lesson covers tool and action infrastructure — building tool registries, sandboxed execution environments, permission models, rate limiting, and audit logging for production agent systems.
Lilly Tech Systems