Advanced

Reliability & Scaling

Multi-agent systems have more failure modes than single-agent systems. An agent can fail, get stuck in a loop, exceed its budget, or produce results that conflict with other agents. This lesson covers how to handle every failure mode, implement human oversight, and scale multi-agent workflows for production traffic.

Agent Failure Handling

Every agent in a multi-agent system can fail. The question is not whether failures will happen, but how your system responds. Here are the five failure modes and their handling strategies:

Failure ModeSymptomDetectionRecovery
LLM API error 429/500/timeout from provider HTTP status code Exponential backoff with jitter, failover to backup model
Infinite loop Agent repeats same tool call Loop detection (3+ identical calls) Inject "you are looping" message, reduce temperature, force different action
Budget exceeded Cost or step count over limit Cost tracker / step counter Graceful shutdown with partial result, notify supervisor
Tool failure Tool returns error or times out Exception handling + timeout Retry with backoff, try alternative tool, report to agent for replanning
Quality failure Output does not meet quality bar Validation against output contract Retry with feedback, escalate to reviewer agent, fall back to human
import asyncio
import time
import random

class ResilientOrchestrator:
    """Orchestrator with comprehensive failure handling."""

    def __init__(self, agents: dict, max_workflow_cost: float = 5.0,
                 max_workflow_time: int = 300):
        self.agents = agents
        self.max_cost = max_workflow_cost
        self.max_time = max_workflow_time  # seconds
        self.total_cost = 0.0
        self.start_time = None

    async def call_agent_with_resilience(self, agent_name: str, task: str,
                                          max_retries: int = 3) -> dict:
        """Call an agent with retry, timeout, and budget enforcement."""
        agent = self.agents[agent_name]

        for attempt in range(max_retries):
            # Check budget
            if self.total_cost >= self.max_cost:
                return {"status": "budget_exceeded",
                        "message": f"Workflow budget ${self.max_cost} exceeded"}

            # Check time
            elapsed = time.time() - self.start_time
            if elapsed >= self.max_time:
                return {"status": "timeout",
                        "message": f"Workflow time limit {self.max_time}s exceeded"}

            try:
                # Run agent with timeout
                remaining_time = self.max_time - elapsed
                result = await asyncio.wait_for(
                    agent.run(task),
                    timeout=min(60, remaining_time)  # Per-agent timeout of 60s
                )

                # Estimate and track cost
                tokens_used = sum(len(str(m)) for m in result.messages) / 4
                cost = tokens_used * 0.00001  # Rough estimate
                self.total_cost += cost

                if result.status == "completed":
                    return {"status": "success", "result": result, "cost": cost}

                # Agent failed but did not error - retry with different approach
                task = f"Previous attempt failed. Try a different approach.\n\nOriginal task: {task}"

            except asyncio.TimeoutError:
                if attempt < max_retries - 1:
                    continue
                return {"status": "timeout", "message": f"Agent '{agent_name}' timed out"}

            except Exception as e:
                if attempt < max_retries - 1:
                    wait = (2 ** attempt) + random.uniform(0, 1)  # Exponential backoff
                    await asyncio.sleep(wait)
                    continue
                return {"status": "error", "message": str(e)}

        return {"status": "max_retries", "message": f"Agent '{agent_name}' failed after {max_retries} attempts"}

    async def run_workflow(self, tasks: list[dict]) -> dict:
        """Execute a workflow with full resilience."""
        self.start_time = time.time()
        self.total_cost = 0.0
        results = []

        for task_spec in tasks:
            result = await self.call_agent_with_resilience(
                task_spec["agent"], task_spec["instruction"]
            )
            results.append({"task": task_spec, "result": result})

            # Stop workflow on critical failures
            if result["status"] in ("budget_exceeded", "timeout"):
                return {
                    "workflow_status": "aborted",
                    "reason": result["status"],
                    "completed_tasks": len([r for r in results if r["result"]["status"] == "success"]),
                    "total_tasks": len(tasks),
                    "total_cost": self.total_cost,
                    "results": results,
                }

        return {
            "workflow_status": "completed",
            "total_cost": self.total_cost,
            "duration_seconds": round(time.time() - self.start_time, 1),
            "results": results,
        }
💡
Apply at work: Set three budget limits for every workflow: cost (USD), time (seconds), and steps (LLM calls). Without all three, an agent can silently drain your API budget. A typical starting point is $5 per workflow, 5 minutes, and 50 LLM calls. Adjust based on your use case.

Human-in-the-Loop Checkpoints

For high-stakes operations, agents should pause and request human approval before proceeding. This is especially important for destructive actions (file deletion, deployments, sending emails).

import asyncio
from enum import Enum
from dataclasses import dataclass

class ApprovalStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    MODIFIED = "modified"

@dataclass
class ApprovalRequest:
    id: str
    workflow_id: str
    agent_name: str
    action: str                    # What the agent wants to do
    details: str                   # Context for the human reviewer
    risk_level: str                # low, medium, high, critical
    status: ApprovalStatus = ApprovalStatus.PENDING
    reviewer_comment: str = None
    modified_action: str = None    # If reviewer modified the action

class HumanCheckpoint:
    """Manages human-in-the-loop approval for agent actions."""

    def __init__(self, approval_callback=None):
        self.pending: dict[str, ApprovalRequest] = {}
        self.approval_callback = approval_callback  # e.g., send Slack message

    async def request_approval(self, request: ApprovalRequest,
                                timeout_seconds: int = 300) -> ApprovalRequest:
        """Block the workflow until a human approves, rejects, or modifies."""
        self.pending[request.id] = request

        # Notify human (Slack, email, dashboard, etc.)
        if self.approval_callback:
            await self.approval_callback(request)

        # Wait for approval
        start = asyncio.get_event_loop().time()
        while request.status == ApprovalStatus.PENDING:
            if asyncio.get_event_loop().time() - start > timeout_seconds:
                request.status = ApprovalStatus.REJECTED
                request.reviewer_comment = "Auto-rejected: approval timeout"
                break
            await asyncio.sleep(1)

        del self.pending[request.id]
        return request

    def approve(self, request_id: str, comment: str = None):
        """Human approves the action."""
        if request_id in self.pending:
            self.pending[request_id].status = ApprovalStatus.APPROVED
            self.pending[request_id].reviewer_comment = comment

    def reject(self, request_id: str, comment: str):
        """Human rejects the action."""
        if request_id in self.pending:
            self.pending[request_id].status = ApprovalStatus.REJECTED
            self.pending[request_id].reviewer_comment = comment

    def modify(self, request_id: str, modified_action: str, comment: str = None):
        """Human approves but modifies the action."""
        if request_id in self.pending:
            self.pending[request_id].status = ApprovalStatus.MODIFIED
            self.pending[request_id].modified_action = modified_action
            self.pending[request_id].reviewer_comment = comment

Horizontal Scaling

When one orchestrator cannot handle your traffic, you need to scale horizontally. Here are the patterns for scaling multi-agent systems:

Scaling PatternHow It WorksWhen to Use
Worker pool Multiple orchestrator instances pull from a shared task queue (Redis, SQS) High throughput, independent workflows
Agent pool Multiple instances of each agent type, orchestrator routes to least-loaded Bottleneck on specific agent types (e.g., coder agents)
Sharded workflows Workflows partitioned by customer/tenant, each shard has its own orchestrator Multi-tenant SaaS, data isolation requirements
import asyncio
from collections import deque

class AgentWorkerPool:
    """Pool of agent workers that processes tasks from a queue."""

    def __init__(self, agent_factory, pool_size: int = 5):
        self.agent_factory = agent_factory
        self.pool_size = pool_size
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.results: dict[str, dict] = {}
        self.workers: list[asyncio.Task] = []
        self.active_count = 0

    async def _worker(self, worker_id: int):
        """Worker loop: pull tasks and execute them."""
        agent = self.agent_factory()
        while True:
            task_id, instruction = await self.task_queue.get()
            self.active_count += 1
            try:
                result = await agent.run(instruction)
                self.results[task_id] = {
                    "status": "completed",
                    "result": result.messages[-1].get("content", ""),
                    "worker_id": worker_id,
                }
            except Exception as e:
                self.results[task_id] = {
                    "status": "error",
                    "error": str(e),
                    "worker_id": worker_id,
                }
            finally:
                self.active_count -= 1
                self.task_queue.task_done()

    async def start(self):
        """Start the worker pool."""
        for i in range(self.pool_size):
            worker = asyncio.create_task(self._worker(i))
            self.workers.append(worker)

    async def submit(self, task_id: str, instruction: str):
        """Submit a task to the pool."""
        await self.task_queue.put((task_id, instruction))

    async def wait_for_result(self, task_id: str, timeout: int = 120) -> dict:
        """Wait for a specific task to complete."""
        start = asyncio.get_event_loop().time()
        while task_id not in self.results:
            if asyncio.get_event_loop().time() - start > timeout:
                return {"status": "timeout"}
            await asyncio.sleep(0.1)
        return self.results.pop(task_id)

    def get_stats(self) -> dict:
        return {
            "pool_size": self.pool_size,
            "queued_tasks": self.task_queue.qsize(),
            "active_workers": self.active_count,
            "idle_workers": self.pool_size - self.active_count,
        }

Monitoring Agent Workflows

You cannot operate what you cannot observe. Every production multi-agent system needs these metrics:

from dataclasses import dataclass, field
import time

@dataclass
class WorkflowMetrics:
    """Metrics for a single workflow execution."""
    workflow_id: str
    start_time: float = field(default_factory=time.time)
    end_time: float = 0
    total_llm_calls: int = 0
    total_tool_calls: int = 0
    total_tokens: int = 0
    total_cost_usd: float = 0
    agents_used: list[str] = field(default_factory=list)
    failures: list[dict] = field(default_factory=list)
    human_approvals: int = 0
    status: str = "running"

class MetricsDashboard:
    """Aggregated metrics across all workflows."""

    def __init__(self):
        self.workflows: list[WorkflowMetrics] = []

    def record(self, metrics: WorkflowMetrics):
        self.workflows.append(metrics)

    def get_summary(self, last_n_hours: int = 24) -> dict:
        cutoff = time.time() - (last_n_hours * 3600)
        recent = [w for w in self.workflows if w.start_time > cutoff]

        if not recent:
            return {"message": "No workflows in the last {last_n_hours} hours"}

        completed = [w for w in recent if w.status == "completed"]
        failed = [w for w in recent if w.status in ("failed", "aborted")]

        return {
            "total_workflows": len(recent),
            "success_rate": len(completed) / len(recent) if recent else 0,
            "avg_duration_seconds": sum(w.end_time - w.start_time for w in completed) / len(completed) if completed else 0,
            "avg_cost_usd": sum(w.total_cost_usd for w in recent) / len(recent),
            "total_cost_usd": sum(w.total_cost_usd for w in recent),
            "avg_llm_calls": sum(w.total_llm_calls for w in recent) / len(recent),
            "failure_reasons": [f["reason"] for w in failed for f in w.failures],
            "most_used_agents": self._count_agents(recent),
        }

    def _count_agents(self, workflows: list[WorkflowMetrics]) -> dict:
        counts = {}
        for w in workflows:
            for agent in w.agents_used:
                counts[agent] = counts.get(agent, 0) + 1
        return dict(sorted(counts.items(), key=lambda x: -x[1])[:10])
📝
Production reality: The four metrics that matter most for multi-agent systems: (1) success rate by workflow type, (2) cost per workflow (p50, p95, p99), (3) latency per workflow, and (4) agent failure rate by agent type. Track these daily and alert on regressions. A 5% drop in success rate usually means a tool broke or an LLM model changed behavior.

Key Takeaways

  • Every multi-agent system has five failure modes: LLM API errors, infinite loops, budget overruns, tool failures, and quality failures. Build handling for all five.
  • Set three budget limits on every workflow: cost (USD), time (seconds), and steps (LLM calls). All three are needed to prevent runaway agents.
  • Use human-in-the-loop checkpoints for high-stakes actions. Agents should pause for approval before destructive operations.
  • Scale horizontally with worker pools (multiple orchestrators), agent pools (multiple agent instances), or sharded workflows (per-tenant isolation).
  • Monitor success rate, cost, latency, and failure rate per agent type. Alert on regressions immediately.

What Is Next

The final lesson brings everything together with a production checklist, debugging guide, and FAQ — including when NOT to use multi-agent systems and the most common mistakes teams make.