Advanced

Step 5: Monitoring & Debugging

Multi-agent systems are complex. Without proper observability, debugging becomes impossible. In this lesson, you will integrate LangSmith for tracing, build a cost tracker, and create structured error handling that makes failures transparent and recoverable.

LangSmith Tracing

LangSmith gives you a visual trace of every step in your multi-agent workflow: which agent ran, what tools it called, what the LLM produced, and how long each step took.

Setup

# monitoring/tracing.py
"""LangSmith tracing integration for the multi-agent workflow."""
import os
from functools import wraps
from datetime import datetime
from langsmith import Client
from langsmith.run_trees import RunTree


def setup_tracing():
    """Configure LangSmith tracing via environment variables.

    LangChain and LangGraph automatically send traces to LangSmith
    when these environment variables are set. No code changes needed
    in the agent or workflow files.
    """
    required_vars = {
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_API_KEY": os.getenv("LANGCHAIN_API_KEY", ""),
        "LANGCHAIN_PROJECT": os.getenv("LANGCHAIN_PROJECT", "multi-agent-workflow"),
    }

    for key, value in required_vars.items():
        if value:
            os.environ[key] = value

    # Verify connection
    api_key = os.getenv("LANGCHAIN_API_KEY")
    if api_key:
        try:
            client = Client(api_key=api_key)
            # Test the connection
            projects = list(client.list_projects(limit=1))
            print(f"LangSmith connected. Project: {os.getenv('LANGCHAIN_PROJECT')}")
            return True
        except Exception as e:
            print(f"LangSmith connection failed: {e}. Tracing disabled.")
            os.environ["LANGCHAIN_TRACING_V2"] = "false"
            return False
    else:
        print("LANGCHAIN_API_KEY not set. Tracing disabled.")
        return False


def trace_agent_run(agent_name: str):
    """Decorator that adds custom metadata to agent traces.

    Args:
        agent_name: Name of the agent for trace labeling.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(state, *args, **kwargs):
            # Add metadata to the run
            metadata = {
                "agent_name": agent_name,
                "iteration": state.get("iteration", 0),
                "task": state.get("task", "")[:200],
                "timestamp": datetime.now().isoformat(),
            }

            # LangSmith will automatically pick up this metadata
            # through the LangChain callback system
            import langsmith
            with langsmith.trace(
                name=f"{agent_name}_agent",
                metadata=metadata,
                tags=[agent_name, f"iteration-{state.get('iteration', 0)}"]
            ):
                result = func(state, *args, **kwargs)
            return result
        return wrapper
    return decorator
💡
Zero-code tracing. Once you set the three environment variables (LANGCHAIN_TRACING_V2, LANGCHAIN_API_KEY, LANGCHAIN_PROJECT), LangGraph automatically sends traces to LangSmith. The @trace_agent_run decorator adds custom metadata for better filtering and search.

Cost Tracking

Every LLM call costs money. Track token usage and costs per agent, per run, and per workflow:

# monitoring/cost_tracker.py
"""Token usage and cost tracking for the multi-agent workflow."""
import time
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime


# OpenAI pricing per 1M tokens (as of 2024)
MODEL_PRICING = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4-turbo": {"input": 10.00, "output": 30.00},
    "gpt-3.5-turbo": {"input": 0.50, "output": 1.50},
}


@dataclass
class AgentUsage:
    """Token usage for a single agent run."""
    agent_name: str
    model: str
    input_tokens: int = 0
    output_tokens: int = 0
    tool_calls: int = 0
    duration_seconds: float = 0.0
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    @property
    def total_tokens(self) -> int:
        return self.input_tokens + self.output_tokens

    @property
    def cost_usd(self) -> float:
        pricing = MODEL_PRICING.get(self.model, {"input": 0.15, "output": 0.60})
        input_cost = (self.input_tokens / 1_000_000) * pricing["input"]
        output_cost = (self.output_tokens / 1_000_000) * pricing["output"]
        return input_cost + output_cost


class CostTracker:
    """Track costs across an entire workflow run.

    Usage:
        tracker = CostTracker()
        tracker.record("researcher", "gpt-4o-mini", 500, 200, tool_calls=2, duration=1.5)
        tracker.record("coder", "gpt-4o-mini", 800, 400, tool_calls=3, duration=2.1)
        print(tracker.summary())
    """

    def __init__(self):
        self.runs: list[AgentUsage] = []
        self.start_time: float = time.time()

    def record(
        self,
        agent_name: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        tool_calls: int = 0,
        duration: float = 0.0,
    ):
        """Record token usage for an agent run."""
        usage = AgentUsage(
            agent_name=agent_name,
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            tool_calls=tool_calls,
            duration_seconds=duration,
        )
        self.runs.append(usage)

    @property
    def total_cost(self) -> float:
        return sum(r.cost_usd for r in self.runs)

    @property
    def total_tokens(self) -> int:
        return sum(r.total_tokens for r in self.runs)

    @property
    def total_duration(self) -> float:
        return time.time() - self.start_time

    def summary(self) -> str:
        """Generate a human-readable cost summary."""
        lines = [
            "=" * 60,
            "WORKFLOW COST SUMMARY",
            "=" * 60,
            f"Total cost:     ${self.total_cost:.6f}",
            f"Total tokens:   {self.total_tokens:,}",
            f"Total duration: {self.total_duration:.1f}s",
            f"Agent runs:     {len(self.runs)}",
            "-" * 60,
        ]

        # Per-agent breakdown
        agent_costs = {}
        for run in self.runs:
            if run.agent_name not in agent_costs:
                agent_costs[run.agent_name] = {
                    "cost": 0, "tokens": 0, "calls": 0, "tool_calls": 0
                }
            agent_costs[run.agent_name]["cost"] += run.cost_usd
            agent_costs[run.agent_name]["tokens"] += run.total_tokens
            agent_costs[run.agent_name]["calls"] += 1
            agent_costs[run.agent_name]["tool_calls"] += run.tool_calls

        for agent, data in sorted(agent_costs.items()):
            lines.append(
                f"  {agent:15s}  ${data['cost']:.6f}  "
                f"{data['tokens']:>8,} tokens  "
                f"{data['calls']} runs  "
                f"{data['tool_calls']} tool calls"
            )

        lines.append("=" * 60)
        return "\n".join(lines)

    def to_dict(self) -> dict:
        """Export as a dictionary for JSON serialization."""
        return {
            "total_cost_usd": self.total_cost,
            "total_tokens": self.total_tokens,
            "total_duration_seconds": self.total_duration,
            "runs": [
                {
                    "agent": r.agent_name,
                    "model": r.model,
                    "input_tokens": r.input_tokens,
                    "output_tokens": r.output_tokens,
                    "cost_usd": r.cost_usd,
                    "tool_calls": r.tool_calls,
                    "duration_seconds": r.duration_seconds,
                }
                for r in self.runs
            ],
        }

Integrating the Cost Tracker

Add cost tracking to agent nodes using a LangChain callback:

# monitoring/callbacks.py
"""LangChain callbacks for cost tracking."""
from langchain_core.callbacks import BaseCallbackHandler
from monitoring.cost_tracker import CostTracker


class CostTrackingCallback(BaseCallbackHandler):
    """Callback that records token usage for every LLM call."""

    def __init__(self, tracker: CostTracker, agent_name: str):
        self.tracker = tracker
        self.agent_name = agent_name
        self._start_time = None

    def on_llm_start(self, serialized, prompts, **kwargs):
        import time
        self._start_time = time.time()

    def on_llm_end(self, response, **kwargs):
        import time
        duration = time.time() - self._start_time if self._start_time else 0

        # Extract token usage from the response
        usage = response.llm_output.get("token_usage", {}) if response.llm_output else {}
        input_tokens = usage.get("prompt_tokens", 0)
        output_tokens = usage.get("completion_tokens", 0)

        # Count tool calls in the response
        tool_calls = 0
        for gen in response.generations:
            for g in gen:
                if hasattr(g, "message") and hasattr(g.message, "tool_calls"):
                    tool_calls += len(g.message.tool_calls)

        model = ""
        if response.llm_output:
            model = response.llm_output.get("model_name", "gpt-4o-mini")

        self.tracker.record(
            agent_name=self.agent_name,
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            tool_calls=tool_calls,
            duration=duration,
        )


# Usage in agent creation:
# tracker = CostTracker()
# callback = CostTrackingCallback(tracker, "researcher")
# llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[callback])

Structured Error Handling

Build an error handler that categorizes failures and provides recovery strategies:

# monitoring/error_handler.py
"""Structured error handling for multi-agent workflows."""
import traceback
import logging
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable
from functools import wraps

logger = logging.getLogger(__name__)


class ErrorSeverity(Enum):
    LOW = "low"          # Retry will likely fix it
    MEDIUM = "medium"    # Needs attention but not critical
    HIGH = "high"        # Workflow may need to abort
    CRITICAL = "critical"  # Immediate human intervention needed


class ErrorCategory(Enum):
    API_ERROR = "api_error"           # OpenAI, Tavily API failures
    TOOL_ERROR = "tool_error"         # Tool execution failures
    ROUTING_ERROR = "routing_error"   # Supervisor routing issues
    TIMEOUT = "timeout"               # Operation timed out
    STATE_ERROR = "state_error"       # Invalid state transitions
    RATE_LIMIT = "rate_limit"         # API rate limiting
    AUTH_ERROR = "auth_error"         # Authentication failures


@dataclass
class AgentError:
    """Structured error with context for debugging."""
    category: ErrorCategory
    severity: ErrorSeverity
    agent_name: str
    message: str
    original_error: Optional[Exception] = None
    traceback_str: Optional[str] = None
    recovery_suggestion: str = ""
    iteration: int = 0

    def to_dict(self) -> dict:
        return {
            "category": self.category.value,
            "severity": self.severity.value,
            "agent": self.agent_name,
            "message": self.message,
            "recovery": self.recovery_suggestion,
            "iteration": self.iteration,
        }


def classify_error(error: Exception) -> tuple[ErrorCategory, ErrorSeverity, str]:
    """Classify an exception into a category with recovery suggestion."""
    error_str = str(error).lower()
    error_type = type(error).__name__

    # Rate limiting
    if "rate_limit" in error_str or "429" in error_str:
        return (
            ErrorCategory.RATE_LIMIT,
            ErrorSeverity.LOW,
            "Wait 30-60 seconds and retry. Consider using a lower-tier model."
        )

    # Authentication
    if "auth" in error_str or "api_key" in error_str or "401" in error_str:
        return (
            ErrorCategory.AUTH_ERROR,
            ErrorSeverity.CRITICAL,
            "Check your API keys in the .env file."
        )

    # Timeout
    if "timeout" in error_str or "timed out" in error_str:
        return (
            ErrorCategory.TIMEOUT,
            ErrorSeverity.MEDIUM,
            "Increase the timeout or break the task into smaller steps."
        )

    # API errors
    if "openai" in error_type.lower() or "api" in error_str:
        return (
            ErrorCategory.API_ERROR,
            ErrorSeverity.MEDIUM,
            "Retry the request. If persistent, check the API status page."
        )

    # Default
    return (
        ErrorCategory.TOOL_ERROR,
        ErrorSeverity.MEDIUM,
        "Check the tool input parameters and retry."
    )


def with_error_handling(agent_name: str, max_retries: int = 2):
    """Decorator that adds structured error handling to agent nodes.

    Args:
        agent_name: Name of the agent for error context.
        max_retries: Maximum number of retries before failing.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(state, *args, **kwargs):
            last_error = None

            for attempt in range(max_retries + 1):
                try:
                    return func(state, *args, **kwargs)
                except Exception as e:
                    last_error = e
                    category, severity, recovery = classify_error(e)

                    error = AgentError(
                        category=category,
                        severity=severity,
                        agent_name=agent_name,
                        message=str(e),
                        original_error=e,
                        traceback_str=traceback.format_exc(),
                        recovery_suggestion=recovery,
                        iteration=state.get("iteration", 0),
                    )

                    logger.error(
                        f"[{agent_name}] Attempt {attempt + 1}/{max_retries + 1} failed: "
                        f"{error.category.value} ({error.severity.value}): {error.message}"
                    )

                    # Do not retry critical errors
                    if severity == ErrorSeverity.CRITICAL:
                        break

                    # Wait before retrying (exponential backoff)
                    if attempt < max_retries:
                        import time
                        time.sleep(2 ** attempt)

            # All retries exhausted
            from langchain_core.messages import AIMessage
            return {
                "messages": [AIMessage(content=(
                    f"Agent '{agent_name}' failed after {max_retries + 1} attempts. "
                    f"Error: {str(last_error)}. "
                    f"Suggestion: {recovery}"
                ))],
                "status": "error",
                "results": state.get("results", {}),
            }

        return wrapper
    return decorator

Apply Error Handling to Agents

# Update agent nodes with error handling decorators:

# agents/researcher.py - add at the top:
from monitoring.error_handler import with_error_handling

@with_error_handling("researcher", max_retries=2)
def researcher_node(state: AgentState) -> dict:
    # ... existing code ...

# agents/coder.py:
@with_error_handling("coder", max_retries=1)  # Fewer retries for code execution
def coder_node(state: AgentState) -> dict:
    # ... existing code ...

# agents/analyst.py:
@with_error_handling("analyst", max_retries=2)
def analyst_node(state: AgentState) -> dict:
    # ... existing code ...

Debug Utilities

Helper functions to inspect workflow state and diagnose issues:

# monitoring/debug.py
"""Debugging utilities for multi-agent workflows."""
from rich.console import Console
from rich.table import Table
from rich.tree import Tree
from agents.state import AgentState

console = Console()


def print_state(state: AgentState, title: str = "Current State"):
    """Pretty-print the current workflow state."""
    table = Table(title=title)
    table.add_column("Field", style="cyan")
    table.add_column("Value", style="white")

    table.add_row("Status", state.get("status", "unknown"))
    table.add_row("Next Agent", state.get("next_agent", "none"))
    table.add_row("Iteration", str(state.get("iteration", 0)))
    table.add_row("Task", state.get("task", "")[:100])
    table.add_row("Messages", str(len(state.get("messages", []))))
    table.add_row("Results", ", ".join(state.get("results", {}).keys()) or "none")

    console.print(table)


def print_message_history(state: AgentState):
    """Print the full message history as a tree."""
    tree = Tree("Message History")

    for msg in state.get("messages", []):
        msg_type = type(msg).__name__
        content = msg.content[:150] + "..." if len(msg.content) > 150 else msg.content

        if msg_type == "HumanMessage":
            tree.add(f"[green]Human:[/green] {content}")
        elif msg_type == "AIMessage":
            tree.add(f"[blue]AI:[/blue] {content}")
        elif msg_type == "ToolMessage":
            tree.add(f"[yellow]Tool:[/yellow] {content}")
        else:
            tree.add(f"[dim]{msg_type}:[/dim] {content}")

    console.print(tree)


def stream_workflow_debug(task: str):
    """Run the workflow with step-by-step debug output.

    This streams each node execution so you can see the workflow
    progress in real time.
    """
    from graph.workflow import build_workflow

    app = build_workflow()

    initial_state = {
        "messages": [],
        "next_agent": "",
        "task": task,
        "results": {},
        "status": "in_progress",
        "iteration": 0,
    }

    console.print(f"\n[bold]Debug run: {task}[/bold]\n")

    # Stream each step
    for step in app.stream(initial_state):
        for node_name, node_output in step.items():
            console.print(f"\n[cyan]--- Node: {node_name} ---[/cyan]")

            if "next_agent" in node_output:
                console.print(f"  Next: {node_output['next_agent']}")
            if "status" in node_output:
                console.print(f"  Status: {node_output['status']}")
            if "messages" in node_output:
                for msg in node_output["messages"]:
                    content = msg.content[:200]
                    console.print(f"  Message: {content}")

    console.print("\n[green]Workflow complete.[/green]")
📝
Checkpoint: With monitoring in place, you should be able to see every agent step in LangSmith, track costs per run, and get clear error messages when things fail. Run stream_workflow_debug("your task") to see the workflow execute step by step in your terminal.

Key Takeaways

  • LangSmith tracing is enabled with three environment variables — no code changes needed in agents or workflows.
  • Cost tracking per agent helps identify which agents consume the most tokens and where to optimize.
  • Structured error handling with retry logic, error classification, and recovery suggestions makes failures recoverable.
  • Debug utilities like stream_workflow_debug and print_state make it easy to inspect workflow behavior step by step.
  • Logging every agent run with metadata (agent name, iteration, duration) creates an audit trail for production debugging.

What Is Next

In the final lesson, you will explore enhancements and best practices — parallel agent execution, streaming output, deployment strategies, scaling patterns, and a comprehensive FAQ.