Reliability & Scaling
Multi-agent systems have more failure modes than single-agent systems. An agent can fail, get stuck in a loop, exceed its budget, or produce results that conflict with other agents. This lesson covers how to handle every failure mode, implement human oversight, and scale multi-agent workflows for production traffic.
Agent Failure Handling
Every agent in a multi-agent system can fail. The question is not whether failures will happen, but how your system responds. Here are the five failure modes and their handling strategies:
| Failure Mode | Symptom | Detection | Recovery |
|---|---|---|---|
| LLM API error | 429/500/timeout from provider | HTTP status code | Exponential backoff with jitter, failover to backup model |
| Infinite loop | Agent repeats same tool call | Loop detection (3+ identical calls) | Inject "you are looping" message, reduce temperature, force different action |
| Budget exceeded | Cost or step count over limit | Cost tracker / step counter | Graceful shutdown with partial result, notify supervisor |
| Tool failure | Tool returns error or times out | Exception handling + timeout | Retry with backoff, try alternative tool, report to agent for replanning |
| Quality failure | Output does not meet quality bar | Validation against output contract | Retry with feedback, escalate to reviewer agent, fall back to human |
import asyncio
import time
import random
class ResilientOrchestrator:
    """Orchestrator with comprehensive failure handling.

    Enforces three safety rails on every workflow:
      * a cost budget (``max_workflow_cost``, USD),
      * a wall-clock limit (``max_workflow_time``, seconds),
      * per-agent retries with exponential backoff and a per-call timeout.

    Args:
        agents: Mapping of agent name -> agent object exposing
            ``async run(task)`` that returns an object with ``.status``
            and ``.messages`` attributes.
        max_workflow_cost: Abort threshold for estimated cumulative cost (USD).
        max_workflow_time: Abort threshold for total workflow duration (seconds).
        per_agent_timeout: Cap on a single agent call (seconds); the effective
            timeout is the smaller of this and the remaining workflow time.
    """

    def __init__(self, agents: dict, max_workflow_cost: float = 5.0,
                 max_workflow_time: int = 300, per_agent_timeout: int = 60):
        self.agents = agents
        self.max_cost = max_workflow_cost
        self.max_time = max_workflow_time  # seconds
        self.per_agent_timeout = per_agent_timeout  # seconds per agent call
        self.total_cost = 0.0
        self.start_time = None

    async def call_agent_with_resilience(self, agent_name: str, task: str,
                                         max_retries: int = 3) -> dict:
        """Call an agent with retry, timeout, and budget enforcement.

        Returns a dict whose ``status`` is one of: ``success``,
        ``budget_exceeded``, ``timeout``, ``error``, ``max_retries``.
        """
        agent = self.agents[agent_name]
        # Robustness fix: allow direct calls outside run_workflow(), where
        # start_time would otherwise still be None and crash the subtraction.
        if self.start_time is None:
            self.start_time = time.time()
        for attempt in range(max_retries):
            # Check budget
            if self.total_cost >= self.max_cost:
                return {"status": "budget_exceeded",
                        "message": f"Workflow budget ${self.max_cost} exceeded"}
            # Check time
            elapsed = time.time() - self.start_time
            if elapsed >= self.max_time:
                return {"status": "timeout",
                        "message": f"Workflow time limit {self.max_time}s exceeded"}
            try:
                # Run agent with a timeout capped by remaining workflow time.
                remaining_time = self.max_time - elapsed
                result = await asyncio.wait_for(
                    agent.run(task),
                    timeout=min(self.per_agent_timeout, remaining_time)
                )
                # Rough cost estimate: ~4 chars per token, $0.00001 per token.
                tokens_used = sum(len(str(m)) for m in result.messages) / 4
                cost = tokens_used * 0.00001  # Rough estimate
                self.total_cost += cost
                if result.status == "completed":
                    return {"status": "success", "result": result, "cost": cost}
                # Agent failed but did not error - retry with different approach
                task = f"Previous attempt failed. Try a different approach.\n\nOriginal task: {task}"
            except asyncio.TimeoutError:
                if attempt < max_retries - 1:
                    continue
                return {"status": "timeout", "message": f"Agent '{agent_name}' timed out"}
            except Exception as e:
                if attempt < max_retries - 1:
                    wait = (2 ** attempt) + random.uniform(0, 1)  # Exponential backoff
                    await asyncio.sleep(wait)
                    continue
                return {"status": "error", "message": str(e)}
        return {"status": "max_retries", "message": f"Agent '{agent_name}' failed after {max_retries} attempts"}

    async def run_workflow(self, tasks: list[dict]) -> dict:
        """Execute a workflow with full resilience.

        Args:
            tasks: List of dicts with keys ``agent`` (name) and ``instruction``.

        Returns:
            A summary dict; ``workflow_status`` is ``completed`` or ``aborted``
            (the workflow stops early on budget or time exhaustion).
        """
        self.start_time = time.time()
        self.total_cost = 0.0
        results = []
        for task_spec in tasks:
            result = await self.call_agent_with_resilience(
                task_spec["agent"], task_spec["instruction"]
            )
            results.append({"task": task_spec, "result": result})
            # Stop workflow on critical failures
            if result["status"] in ("budget_exceeded", "timeout"):
                return {
                    "workflow_status": "aborted",
                    "reason": result["status"],
                    "completed_tasks": len([r for r in results if r["result"]["status"] == "success"]),
                    "total_tasks": len(tasks),
                    "total_cost": self.total_cost,
                    "results": results,
                }
        return {
            "workflow_status": "completed",
            "total_cost": self.total_cost,
            "duration_seconds": round(time.time() - self.start_time, 1),
            "results": results,
        }
Human-in-the-Loop Checkpoints
For high-stakes operations, agents should pause and request human approval before proceeding. This is especially important for destructive actions (file deletion, deployments, sending emails).
import asyncio
from enum import Enum
from dataclasses import dataclass
class ApprovalStatus(Enum):
    """Lifecycle states of a human approval request."""
    PENDING = "pending"    # awaiting a reviewer decision
    APPROVED = "approved"  # reviewer signed off on the action as-is
    REJECTED = "rejected"  # reviewer blocked the action (also set on approval timeout)
    MODIFIED = "modified"  # reviewer approved a changed version of the action
@dataclass
class ApprovalRequest:
    """One agent action awaiting human review.

    Mutated in place by the reviewer-facing methods (approve/reject/modify);
    the waiting workflow observes the change via ``status``.
    """
    id: str           # unique identifier for this request
    workflow_id: str  # workflow this request belongs to
    agent_name: str   # agent that proposed the action
    action: str  # What the agent wants to do
    details: str  # Context for the human reviewer
    risk_level: str  # low, medium, high, critical
    status: ApprovalStatus = ApprovalStatus.PENDING
    reviewer_comment: str | None = None  # set by the reviewer (or timeout auto-reject)
    modified_action: str | None = None  # If reviewer modified the action
class HumanCheckpoint:
    """Manages human-in-the-loop approval for agent actions.

    A workflow awaits ``request_approval``; a human (via some external
    channel) later calls ``approve``/``reject``/``modify`` to resolve it.
    """

    def __init__(self, approval_callback=None):
        # request id -> ApprovalRequest still awaiting a decision
        self.pending: dict[str, ApprovalRequest] = {}
        self.approval_callback = approval_callback  # e.g., send Slack message

    async def request_approval(self, request: ApprovalRequest,
                               timeout_seconds: int = 300) -> ApprovalRequest:
        """Block the workflow until a human approves, rejects, or modifies.

        Polls once per second; if no decision arrives within
        ``timeout_seconds`` the request is auto-rejected.
        """
        self.pending[request.id] = request
        # Notify human (Slack, email, dashboard, etc.)
        if self.approval_callback:
            await self.approval_callback(request)
        # Poll for a decision until the deadline passes.
        deadline = asyncio.get_event_loop().time() + timeout_seconds
        while request.status == ApprovalStatus.PENDING:
            if asyncio.get_event_loop().time() > deadline:
                request.status = ApprovalStatus.REJECTED
                request.reviewer_comment = "Auto-rejected: approval timeout"
                break
            await asyncio.sleep(1)
        self.pending.pop(request.id)
        return request

    def approve(self, request_id: str, comment: str = None):
        """Human approves the action."""
        req = self.pending.get(request_id)
        if req is not None:
            req.status = ApprovalStatus.APPROVED
            req.reviewer_comment = comment

    def reject(self, request_id: str, comment: str):
        """Human rejects the action."""
        req = self.pending.get(request_id)
        if req is not None:
            req.status = ApprovalStatus.REJECTED
            req.reviewer_comment = comment

    def modify(self, request_id: str, modified_action: str, comment: str = None):
        """Human approves but modifies the action."""
        req = self.pending.get(request_id)
        if req is not None:
            req.status = ApprovalStatus.MODIFIED
            req.modified_action = modified_action
            req.reviewer_comment = comment
Horizontal Scaling
When one orchestrator cannot handle your traffic, you need to scale horizontally. Here are the patterns for scaling multi-agent systems:
| Scaling Pattern | How It Works | When to Use |
|---|---|---|
| Worker pool | Multiple orchestrator instances pull from a shared task queue (Redis, SQS) | High throughput, independent workflows |
| Agent pool | Multiple instances of each agent type, orchestrator routes to least-loaded | Bottleneck on specific agent types (e.g., coder agents) |
| Sharded workflows | Workflows partitioned by customer/tenant, each shard has its own orchestrator | Multi-tenant SaaS, data isolation requirements |
import asyncio
from collections import deque
class AgentWorkerPool:
    """Fixed-size pool of agent workers fed by a shared asyncio queue.

    ``start()`` spawns the workers, ``submit()`` enqueues work, and
    ``wait_for_result()`` polls until a given task's result is available.
    """

    def __init__(self, agent_factory, pool_size: int = 5):
        self.agent_factory = agent_factory  # callable producing a fresh agent per worker
        self.pool_size = pool_size
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.results: dict[str, dict] = {}  # task_id -> outcome record
        self.workers: list[asyncio.Task] = []
        self.active_count = 0  # workers currently executing a task

    async def _worker(self, worker_id: int):
        """Run forever: pull (task_id, instruction) pairs and execute them."""
        runner = self.agent_factory()
        while True:
            job_id, prompt = await self.task_queue.get()
            self.active_count += 1
            try:
                outcome = await runner.run(prompt)
                self.results[job_id] = {
                    "status": "completed",
                    "result": outcome.messages[-1].get("content", ""),
                    "worker_id": worker_id,
                }
            except Exception as exc:
                self.results[job_id] = {
                    "status": "error",
                    "error": str(exc),
                    "worker_id": worker_id,
                }
            finally:
                self.active_count -= 1
                self.task_queue.task_done()

    async def start(self):
        """Spawn ``pool_size`` worker tasks."""
        self.workers.extend(
            asyncio.create_task(self._worker(idx)) for idx in range(self.pool_size)
        )

    async def submit(self, task_id: str, instruction: str):
        """Enqueue one task for the pool to execute."""
        await self.task_queue.put((task_id, instruction))

    async def wait_for_result(self, task_id: str, timeout: int = 120) -> dict:
        """Poll until ``task_id`` has a result, or return a timeout record."""
        deadline = asyncio.get_event_loop().time() + timeout
        while task_id not in self.results:
            if asyncio.get_event_loop().time() > deadline:
                return {"status": "timeout"}
            await asyncio.sleep(0.1)
        # Remove the result so the dict does not grow without bound.
        return self.results.pop(task_id)

    def get_stats(self) -> dict:
        """Snapshot of pool load for monitoring dashboards."""
        busy = self.active_count
        return {
            "pool_size": self.pool_size,
            "queued_tasks": self.task_queue.qsize(),
            "active_workers": busy,
            "idle_workers": self.pool_size - busy,
        }
Monitoring Agent Workflows
You cannot operate what you cannot observe. Every production multi-agent system needs these metrics:
from dataclasses import dataclass, field
import time
@dataclass
class WorkflowMetrics:
    """Metrics for a single workflow execution."""
    workflow_id: str
    start_time: float = field(default_factory=time.time)  # epoch seconds, captured at creation
    end_time: float = 0  # epoch seconds; 0 until a finish time is recorded
    total_llm_calls: int = 0
    total_tool_calls: int = 0
    total_tokens: int = 0
    total_cost_usd: float = 0
    agents_used: list[str] = field(default_factory=list)  # agent names, may repeat per invocation
    failures: list[dict] = field(default_factory=list)  # each dict carries at least a "reason" key
    human_approvals: int = 0  # count of human-in-the-loop checkpoints hit
    status: str = "running"  # e.g. "running", "completed", "failed", "aborted"
class MetricsDashboard:
    """Aggregated metrics across all workflows."""

    def __init__(self):
        # Full history of recorded workflow metrics, in insertion order.
        self.workflows: list["WorkflowMetrics"] = []

    def record(self, metrics: "WorkflowMetrics"):
        """Append one workflow's metrics to the history."""
        self.workflows.append(metrics)

    def get_summary(self, last_n_hours: int = 24) -> dict:
        """Summarize workflows that started within the last ``last_n_hours``.

        Returns aggregate counts/rates, or a ``{"message": ...}`` dict when
        no workflows fall inside the window.
        """
        cutoff = time.time() - (last_n_hours * 3600)
        recent = [w for w in self.workflows if w.start_time > cutoff]
        if not recent:
            # BUG FIX: the original string lacked the f-prefix, so callers got
            # the literal text "{last_n_hours}" instead of the number.
            return {"message": f"No workflows in the last {last_n_hours} hours"}
        completed = [w for w in recent if w.status == "completed"]
        failed = [w for w in recent if w.status in ("failed", "aborted")]
        return {
            "total_workflows": len(recent),
            # `recent` is non-empty here, so no division-by-zero guard needed.
            "success_rate": len(completed) / len(recent),
            "avg_duration_seconds": (
                sum(w.end_time - w.start_time for w in completed) / len(completed)
                if completed else 0
            ),
            "avg_cost_usd": sum(w.total_cost_usd for w in recent) / len(recent),
            "total_cost_usd": sum(w.total_cost_usd for w in recent),
            "avg_llm_calls": sum(w.total_llm_calls for w in recent) / len(recent),
            "failure_reasons": [f["reason"] for w in failed for f in w.failures],
            "most_used_agents": self._count_agents(recent),
        }

    def _count_agents(self, workflows: list["WorkflowMetrics"]) -> dict:
        """Return the top-10 agents by number of appearances in ``agents_used``."""
        counts = {}
        for w in workflows:
            for agent in w.agents_used:
                counts[agent] = counts.get(agent, 0) + 1
        return dict(sorted(counts.items(), key=lambda kv: -kv[1])[:10])
Key Takeaways
- Every multi-agent system has five failure modes: LLM API errors, infinite loops, budget overruns, tool failures, and quality failures. Build handling for all five.
- Set three budget limits on every workflow: cost (USD), time (seconds), and steps (LLM calls). All three are needed to prevent runaway agents.
- Use human-in-the-loop checkpoints for high-stakes actions. Agents should pause for approval before destructive operations.
- Scale horizontally with worker pools (multiple orchestrators), agent pools (multiple agent instances), or sharded workflows (per-tenant isolation).
- Monitor success rate, cost, latency, and failure rate per agent type. Alert on regressions immediately.
What Is Next
The final lesson brings everything together with a production checklist, debugging guide, and FAQ — including when NOT to use multi-agent systems and the most common mistakes teams make.
Lilly Tech Systems