Intermediate

Inter-Agent Communication

How agents share information determines whether a multi-agent system works or descends into chaos. This lesson covers the three communication patterns (direct messaging, shared memory, event-driven), how to define contracts between agents, and how to resolve conflicts when agents disagree.

Communication Pattern 1: Direct Message Passing

In the simplest pattern, agents send messages directly to each other: the orchestrator passes one agent's output as another agent's input. This is what happens in sequential pipelines.
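Before adding any message infrastructure, the hand-off itself can be sketched in a few lines. This is a minimal illustration, not the lesson's implementation: `research_agent`, `writer_agent`, and `run_pipeline` are hypothetical stand-ins, and each "agent" is assumed to be a plain function from text to text.

```python
from typing import Callable

# Hypothetical stand-ins for real agents: each takes text and returns text.
def research_agent(task: str) -> str:
    return f"notes on: {task}"

def writer_agent(notes: str) -> str:
    return f"draft based on [{notes}]"

def run_pipeline(task: str, agents: list[Callable[[str], str]]) -> str:
    """Orchestrator: feed each agent's output to the next agent as its input."""
    payload = task
    for agent in agents:
        payload = agent(payload)
    return payload

final_output = run_pipeline("agent communication", [research_agent, writer_agent])
print(final_output)  # draft based on [notes on: agent communication]
```

The typed messages and message bus that follow add structure, routing, and history on top of this basic hand-off.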

from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Callable, Optional
import uuid

class MessageType(Enum):
    TASK = "task"                # "Do this work"
    RESULT = "result"           # "Here is my output"
    FEEDBACK = "feedback"       # "Your output needs changes"
    QUESTION = "question"       # "I need clarification"
    STATUS = "status"           # "I am 50% done"

@dataclass
class AgentMessage:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    sender: str = ""
    recipient: str = ""
    msg_type: MessageType = MessageType.TASK
    content: str = ""
    metadata: dict = field(default_factory=dict)
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    reply_to: Optional[str] = None  # ID of message this replies to

class MessageBus:
    """Central message bus for agent-to-agent communication."""

    def __init__(self):
        self.messages: list[AgentMessage] = []
        # agent_name -> callbacks invoked for each message sent to that agent
        self.subscribers: dict[str, list[Callable[[AgentMessage], None]]] = {}

    def send(self, message: AgentMessage):
        """Send a message and notify the recipient."""
        self.messages.append(message)
        if message.recipient in self.subscribers:
            for callback in self.subscribers[message.recipient]:
                callback(message)

    def subscribe(self, agent_name: str, callback: Callable[[AgentMessage], None]):
        """Register to receive messages."""
        self.subscribers.setdefault(agent_name, []).append(callback)

    def get_conversation(self, agent_a: str, agent_b: str) -> list[AgentMessage]:
        """Get all messages between two agents, in order."""
        # Compare as sets so a message an agent sends to itself is not
        # mistaken for part of the conversation between the two agents.
        return [
            m for m in self.messages
            if {m.sender, m.recipient} == {agent_a, agent_b}
        ]

    def get_inbox(self, agent_name: str, msg_type: Optional[MessageType] = None) -> list[AgentMessage]:
        """Get all messages addressed to an agent, optionally filtered by type."""
        msgs = [m for m in self.messages if m.recipient == agent_name]
        if msg_type is not None:
            msgs = [m for m in msgs if m.msg_type == msg_type]
        return msgs
💡
Apply at work: Use typed messages (MessageType enum) to make agent communication predictable. When every message has a clear type, agents can filter for what they need. A reviewer agent processes only RESULT messages. A supervisor processes only STATUS and QUESTION messages. This prevents agents from being overwhelmed by irrelevant communication.
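The filtering described above can be sketched as a small routing table. This is an illustrative sketch under assumptions: `MsgType` and `Msg` are trimmed-down stand-ins for `MessageType` and `AgentMessage`, and `ACCEPTED_TYPES` and `relevant_messages` are hypothetical names that are not part of the lesson's code.

```python
from dataclasses import dataclass
from enum import Enum

class MsgType(Enum):  # trimmed-down stand-in for MessageType
    RESULT = "result"
    STATUS = "status"
    QUESTION = "question"

@dataclass
class Msg:  # trimmed-down stand-in for AgentMessage
    sender: str
    recipient: str
    msg_type: MsgType
    content: str

# Hypothetical routing table: which message types each role handles.
ACCEPTED_TYPES = {
    "reviewer": {MsgType.RESULT},
    "supervisor": {MsgType.STATUS, MsgType.QUESTION},
}

def relevant_messages(role: str, messages: list[Msg]) -> list[Msg]:
    """Keep only messages addressed to `role` whose type that role handles."""
    accepted = ACCEPTED_TYPES.get(role, set())
    return [m for m in messages if m.recipient == role and m.msg_type in accepted]

log = [
    Msg("coder", "reviewer", MsgType.RESULT, "patch ready"),
    Msg("coder", "reviewer", MsgType.STATUS, "50% done"),
    Msg("coder", "supervisor", MsgType.STATUS, "50% done"),
    Msg("coder", "supervisor", MsgType.QUESTION, "which branch?"),
]

reviewer_inbox = relevant_messages("reviewer", log)
supervisor_inbox = relevant_messages("supervisor", log)
print([m.content for m in reviewer_inbox])    # ['patch ready']
print([m.content for m in supervisor_inbox])  # ['50% done', 'which branch?']
```

Keeping the accepted types in one table means a new role can be added without touching the filtering logic.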

Communication Pattern 2: Shared Memory (Blackboard)

In the blackboard pattern, all agents read from and write to a shared workspace. This is ideal when agents need to build on each other's work or when the order of operations is flexible.

import threading
from typing import Any, Optional
import json
import time

class Blackboard:
    """Shared workspace that all agents can read from and write to.

    Think of it as a shared whiteboard in a meeting room where any team member
    can write findings, and others can read and build on them.
    """

    def __init__(self):
        self._store: dict[str, Any] = {}
        self._history: list[dict] = []
        self._lock = threading.RLock()  # Thread-safe access
        self._watchers: dict[str, list] = {}  # key_pattern -> callbacks

    def write(self, key: str, value: Any, author: str):
        """Write a value to the blackboard."""
        with self._lock:
            self._store[key] = value
            self._history.append({
                "action": "write",
                "key": key,
                "author": author,
                "timestamp": time.time(),
                "value_preview": str(value)[:200],
            })
            # Notify watchers
            for pattern, callbacks in self._watchers.items():
                if pattern in key or pattern == "*":
                    for cb in callbacks:
                        cb(key, value, author)

    def read(self, key: str, default: Any = None) -> Any:
        """Read a value from the blackboard."""
        with self._lock:
            return self._store.get(key, default)

    def read_all(self, prefix: str = "") -> dict:
        """Read all values matching a prefix."""
        with self._lock:
            if not prefix:
                return dict(self._store)
            return {k: v for k, v in self._store.items() if k.startswith(prefix)}

    def watch(self, key_pattern: str, callback):
        """Watch for changes to keys containing key_pattern ("*" matches every key)."""
        # Take the lock so registration cannot race with write() iterating the watchers
        with self._lock:
            self._watchers.setdefault(key_pattern, []).append(callback)

    def get_audit_log(self, author: Optional[str] = None) -> list[dict]:
        """Get history of all writes, optionally filtered by author."""
        with self._lock:
            if author:
                return [h for h in self._history if h["author"] == author]
            return list(self._history)

# Usage: Research workflow with shared blackboard
blackboard = Blackboard()

# Research agent writes findings
blackboard.write("research/source_1", {
    "title": "Agent architectures survey",
    "key_findings": ["ReAct is dominant", "Tool use improves by 40%"],
    "confidence": 0.85,
}, author="researcher_1")

# Another research agent adds more data
blackboard.write("research/source_2", {
    "title": "Production multi-agent deployments",
    "key_findings": ["Supervisor pattern used by 70% of teams"],
    "confidence": 0.90,
}, author="researcher_2")

# Synthesizer reads all research and creates summary
all_research = blackboard.read_all(prefix="research/")
blackboard.write("synthesis/final", {
    "summary": "Combined findings from all sources...",
    "sources": list(all_research.keys()),
}, author="synthesizer")
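The synthesizer above is called by hand once the research is in place. With watchers, the same step can be triggered by the writes themselves. The sketch below is a reduced variation rather than the lesson's `Blackboard`: `MiniBlackboard` matches watchers by key prefix and calls them after releasing the lock, and `EXPECTED_SOURCES` and `synthesize_when_ready` are hypothetical names introduced for the example.

```python
import threading
from typing import Any, Callable

Watcher = Callable[[str, Any, str], None]

class MiniBlackboard:
    """Reduced blackboard: prefix-matched watchers, no history."""

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}
        self._lock = threading.RLock()
        self._watchers: list[tuple[str, Watcher]] = []

    def write(self, key: str, value: Any, author: str) -> None:
        with self._lock:
            self._store[key] = value
            callbacks = [cb for prefix, cb in self._watchers if key.startswith(prefix)]
        # Call watchers after releasing the lock so a slow callback
        # does not block other writers.
        for cb in callbacks:
            cb(key, value, author)

    def read_all(self, prefix: str = "") -> dict[str, Any]:
        with self._lock:
            return {k: v for k, v in self._store.items() if k.startswith(prefix)}

    def watch(self, prefix: str, callback: Watcher) -> None:
        with self._lock:
            self._watchers.append((prefix, callback))

board = MiniBlackboard()
EXPECTED_SOURCES = 2  # assumption: the workflow knows how many sources to wait for

def synthesize_when_ready(key: str, value: Any, author: str) -> None:
    """Write the synthesis once every expected research entry has arrived."""
    research = board.read_all(prefix="research/")
    if len(research) >= EXPECTED_SOURCES:
        board.write("synthesis/final", {"sources": sorted(research)}, author="synthesizer")

board.watch("research/", synthesize_when_ready)
board.write("research/source_1", {"confidence": 0.85}, author="researcher_1")
board.write("research/source_2", {"confidence": 0.90}, author="researcher_2")

final = board.read_all(prefix="synthesis/")
print(final)
```

The synthesis is written under a different prefix than the one being watched, which keeps the watcher from re-triggering itself.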

Communication Pattern 3: Event-Driven

In event-driven communication, agents emit events when they complete work, and other agents react to those events. This decouples agents from each other — they do not need to know who their consumers are.

import asyncio
from dataclasses import dataclass
from typing import Callable, Awaitable

@dataclass
class AgentEvent:
    event_type: str          # e.g., "code.written", "test.passed", "review.failed"
    source_agent: str
    payload: dict
    timestamp: float = 0.0

class EventBroker:
    """Pub/sub event broker for decoupled agent communication."""

    def __init__(self):
        self._handlers: dict[str, list[Callable]] = {}
        self._event_log: list[AgentEvent] = []

    def on(self, event_type: str, handler: Callable[[AgentEvent], Awaitable]):
        """Subscribe to an event type."""
        self._handlers.setdefault(event_type, []).append(handler)

    async def emit(self, event: AgentEvent):
        """Emit an event and trigger all subscribed handlers."""
        import time
        event.timestamp = time.time()
        self._event_log.append(event)

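        # Copy into a new list: applying += to the registered list itself would
        # append the wildcard handlers to it again on every emit.
        handlers = list(self._handlers.get(event.event_type, []))
        # Also trigger wildcard handlers
        handlers += self._handlers.get("*", [])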

        if handlers:
            await asyncio.gather(*[h(event) for h in handlers])

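# Usage: Event-driven code workflow
# Assumes tester_agent and reviewer_agent are agent instances created elsewhere;
# they are not defined in this snippet.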
broker = EventBroker()

# When code is written, automatically trigger testing and review
async def on_code_written(event: AgentEvent):
    code = event.payload["code"]
    file_path = event.payload["file_path"]
    # Tester agent runs automatically
    test_result = await tester_agent.run(f"Write and run tests for {file_path}")
    await broker.emit(AgentEvent(
        event_type="test.completed",
        source_agent="tester",
        payload={"passed": test_result.status == "completed", "file": file_path}
    ))

async def on_code_written_review(event: AgentEvent):
    # Reviewer agent also runs automatically (in parallel with tester)
    review = await reviewer_agent.run(f"Review code in {event.payload['file_path']}")
    await broker.emit(AgentEvent(
        event_type="review.completed",
        source_agent="reviewer",
        payload={"approved": "LGTM" in str(review.messages[-1]), "file": event.payload["file_path"]}
    ))

broker.on("code.written", on_code_written)
broker.on("code.written", on_code_written_review)
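To see the fan-out end to end without real agents, the workflow can be run with stub handlers. This is a self-contained sketch under assumptions: `MiniBroker` and `Event` are cut-down stand-ins for `EventBroker` and `AgentEvent`, and `asyncio.sleep` stands in for the tester and reviewer agent calls.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Awaitable, Callable

@dataclass
class Event:  # cut-down stand-in for AgentEvent
    event_type: str
    source_agent: str
    payload: dict
    timestamp: float = field(default_factory=time.time)

Handler = Callable[[Event], Awaitable[None]]

class MiniBroker:  # cut-down stand-in for EventBroker
    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = {}
        self.log: list[str] = []

    def on(self, event_type: str, handler: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def emit(self, event: Event) -> None:
        self.log.append(event.event_type)
        # list + list builds a new list, so registered handler lists are never mutated.
        handlers = self._handlers.get(event.event_type, []) + self._handlers.get("*", [])
        if handlers:
            await asyncio.gather(*(h(event) for h in handlers))

broker = MiniBroker()

async def run_tests(event: Event) -> None:
    await asyncio.sleep(0.01)  # stand-in for a tester agent call
    await broker.emit(Event("test.completed", "tester", {"passed": True}))

async def run_review(event: Event) -> None:
    await asyncio.sleep(0.01)  # stand-in for a reviewer agent call
    await broker.emit(Event("review.completed", "reviewer", {"approved": True}))

broker.on("code.written", run_tests)
broker.on("code.written", run_review)

# One emitted event fans out to both handlers, which run concurrently.
asyncio.run(broker.emit(Event("code.written", "coder", {"file_path": "app.py"})))
print(sorted(broker.log))  # ['code.written', 'review.completed', 'test.completed']
```

The coder never references the tester or the reviewer. Adding a third consumer is one more `on` call, which is the decoupling this pattern is chosen for.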

Choosing the Right Communication Pattern

| Pattern | Coupling | Best For | Complexity |
| --- | --- | --- | --- |
| Direct messaging | Tight | Sequential pipelines, request-reply between two agents | Low |
| Shared memory | Medium | Collaborative tasks, research, building shared artifacts | Medium |
| Event-driven | Loose | Reactive workflows, CI/CD-style pipelines, extensible systems | High |

Structured Output Contracts Between Agents

The most common failure in multi-agent systems is agents producing output that downstream agents cannot parse. Fix this with explicit contracts.

from pydantic import BaseModel, Field
from typing import Literal

# Contract: What the planner agent MUST output
class PlanOutput(BaseModel):
    steps: list["PlanStep"]
    estimated_time_minutes: int
    risk_level: Literal["low", "medium", "high"]

class PlanStep(BaseModel):
    step_number: int
    description: str
    assigned_agent: Literal["coder", "tester", "reviewer", "researcher"]
    depends_on: list[int] = Field(default_factory=list,
        description="Step numbers that must complete before this step")
    estimated_minutes: int

# Contract: What the coder agent MUST output
class CodeOutput(BaseModel):
    files_modified: list["FileChange"]
    explanation: str
    tests_needed: bool

class FileChange(BaseModel):
    path: str
    action: Literal["create", "modify", "delete"]
    content: str
    language: str

# Contract: What the reviewer agent MUST output
class ReviewOutput(BaseModel):
    approved: bool
    issues: list["ReviewIssue"]
    overall_quality: Literal["excellent", "good", "needs_work", "reject"]

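class ReviewIssue(BaseModel):
    severity: Literal["critical", "major", "minor", "suggestion"]
    file: str
    # "int | None" (Python 3.10+) lets the field be omitted or explicitly null;
    # a plain "int = None" would reject an explicit null during validation.
    line: int | None = None
    description: str
    suggested_fix: str | None = None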

# Enforce contracts using structured output
def validate_agent_output(raw_output: str, contract: type[BaseModel]) -> BaseModel:
    """Parse agent output against its contract. Raises ValidationError if invalid."""
    # model_validate_json parses and validates in one step, so malformed JSON
    # also surfaces as a ValidationError instead of a json.JSONDecodeError.
    return contract.model_validate_json(raw_output)
📝
Production reality: Use Pydantic models (or JSON Schema) as contracts between every pair of agents that communicate. When you add a new agent, define its input and output contracts first. This prevents the most common multi-agent failure: Agent B crashes because Agent A changed its output format without telling anyone.
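When an output breaks its contract, the validation errors themselves are useful feedback for the producing agent. The sketch below shows one way to turn them into readable messages. It assumes Pydantic v2; `ReviewSummary` is a hypothetical, smaller cousin of `ReviewOutput`, and `check_output` is an illustrative helper rather than part of the lesson's code.

```python
from typing import Literal, Optional

from pydantic import BaseModel, ValidationError

class ReviewSummary(BaseModel):  # hypothetical, smaller than ReviewOutput
    approved: bool
    overall_quality: Literal["excellent", "good", "needs_work", "reject"]

def check_output(raw_output: str) -> tuple[Optional[ReviewSummary], list[str]]:
    """Return (model, []) on success, or (None, readable problems) on failure."""
    try:
        return ReviewSummary.model_validate_json(raw_output), []
    except ValidationError as exc:
        problems = []
        for err in exc.errors():
            # "loc" is a tuple such as ("overall_quality",); it is empty
            # when the whole payload is malformed JSON.
            location = ".".join(str(part) for part in err["loc"]) or "<root>"
            problems.append(f"{location}: {err['msg']}")
        return None, problems

good, good_problems = check_output('{"approved": true, "overall_quality": "good"}')
bad, bad_problems = check_output('{"approved": true, "overall_quality": "fine"}')

print(good)
print(bad_problems)
```

One option is to send the problem list back to the producing agent as a FEEDBACK message and ask it to retry, rather than letting the downstream agent fail.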

Conflict Resolution

When multiple agents produce conflicting results, you need a resolution strategy. Here are the three approaches used in production:

class ConflictResolver:
    """Resolve disagreements between agents."""

    async def resolve_by_voting(self, positions: list[dict]) -> dict:
        """Majority vote - best for factual questions."""
        from collections import Counter
        answers = [p["answer"] for p in positions]
        winner = Counter(answers).most_common(1)[0][0]
        agreement_ratio = Counter(answers)[winner] / len(answers)
        return {
            "resolved_answer": winner,
            "agreement": agreement_ratio,
            "method": "majority_vote",
        }

    async def resolve_by_confidence(self, positions: list[dict]) -> dict:
        """Pick the answer from the agent reporting the highest confidence."""
        best = max(positions, key=lambda p: p.get("confidence", 0))
        return {
            "resolved_answer": best["answer"],
            "confidence": best.get("confidence", 0),  # same default as the key above
            "agent": best["agent_name"],
            "method": "highest_confidence",
        }

    async def resolve_by_judge(self, positions: list[dict],
                                 judge_agent: "ReActAgent") -> dict:
        """An independent judge agent evaluates all positions."""
        judge_prompt = "Multiple agents produced different answers. Evaluate each and pick the best.\n\n"
        for i, pos in enumerate(positions):
            judge_prompt += f"Agent {pos['agent_name']}:\n"
            judge_prompt += f"Answer: {pos['answer']}\n"
            judge_prompt += f"Reasoning: {pos.get('reasoning', 'Not provided')}\n\n"
        judge_prompt += "Which answer is correct and why? Respond with the best answer and your reasoning."

        result = await judge_agent.run(judge_prompt)
        return {
            "resolved_answer": result.messages[-1]["content"],
            "method": "judge",
            "judge_reasoning": result.messages[-1]["content"],
        }

    async def resolve(self, positions: list[dict],
                       strategy: str = "judge", judge_agent=None) -> dict:
        """Resolve conflict using the specified strategy."""
        if strategy == "vote":
            return await self.resolve_by_voting(positions)
        elif strategy == "confidence":
            return await self.resolve_by_confidence(positions)
        elif strategy == "judge":
            if judge_agent is None:
                raise ValueError("The 'judge' strategy requires a judge_agent")
            return await self.resolve_by_judge(positions, judge_agent)
        raise ValueError(f"Unknown strategy: {strategy}")
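The strategies can disagree on the same input, which is worth seeing once before picking a default. The sketch below reimplements the voting and confidence rules as standalone functions so it runs on its own. The function names, agent names, answers, and confidence values are all made up for illustration.

```python
from collections import Counter

def majority_vote(positions: list[dict]) -> dict:
    """Pick the most common answer and report how many agents agreed."""
    counts = Counter(p["answer"] for p in positions)
    winner, votes = counts.most_common(1)[0]
    return {
        "resolved_answer": winner,
        "agreement": votes / len(positions),
        "method": "majority_vote",
    }

def highest_confidence(positions: list[dict]) -> dict:
    """Pick the answer from the agent reporting the highest confidence."""
    best = max(positions, key=lambda p: p.get("confidence", 0.0))
    return {
        "resolved_answer": best["answer"],
        "agent": best["agent_name"],
        "method": "highest_confidence",
    }

# Made-up positions: two agents agree, one dissents with high confidence.
positions = [
    {"agent_name": "analyst_a", "answer": "PostgreSQL", "confidence": 0.70},
    {"agent_name": "analyst_b", "answer": "PostgreSQL", "confidence": 0.65},
    {"agent_name": "analyst_c", "answer": "SQLite", "confidence": 0.95},
]

vote = majority_vote(positions)
confident = highest_confidence(positions)

print(vote["resolved_answer"], round(vote["agreement"], 2))  # PostgreSQL 0.67
print(confident["resolved_answer"], confident["agent"])      # SQLite analyst_c
```

Here the vote and the confidence rule return different answers. A split like this is a reasonable signal to hand the decision to the judge strategy instead of trusting either rule alone.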

Key Takeaways

  • Three communication patterns: direct messaging (simple, tight coupling), shared memory/blackboard (collaborative), and event-driven (decoupled, reactive).
  • Use typed messages (task, result, feedback, question, status) so agents can filter relevant communication.
  • Define Pydantic contracts between every pair of agents. This prevents the number one failure mode: output format mismatches.
  • Conflict resolution strategies: majority vote for factual questions, highest-confidence selection for graded answers, judge agent for complex disagreements.
  • Start with direct messaging. Move to shared memory when agents build on each other's work. Use event-driven when you need extensibility.

What Is Next

Agents need tools to interact with the outside world. The next lesson covers tool and action infrastructure — building tool registries, sandboxed execution environments, permission models, rate limiting, and audit logging for production agent systems.