Step 5: Monitoring & Debugging
Multi-agent systems are complex. Without proper observability, debugging becomes impossible. In this lesson, you will integrate LangSmith for tracing, build a cost tracker, and create structured error handling that makes failures transparent and recoverable.
LangSmith Tracing
LangSmith gives you a visual trace of every step in your multi-agent workflow: which agent ran, what tools it called, what the LLM produced, and how long each step took.
Setup
# monitoring/tracing.py
"""LangSmith tracing integration for the multi-agent workflow."""
import os
from functools import wraps
from datetime import datetime
from langsmith import Client
from langsmith.run_trees import RunTree
def setup_tracing():
    """Configure LangSmith tracing via environment variables and verify it.

    LangChain and LangGraph automatically send traces to LangSmith when
    these environment variables are set, so no code changes are needed in
    the agent or workflow files.

    ``LANGCHAIN_TRACING_V2`` is set to ``"true"`` and ``LANGCHAIN_PROJECT``
    defaults to ``"multi-agent-workflow"`` when not already set. Tracing is
    switched back off (``LANGCHAIN_TRACING_V2="false"``) whenever it cannot
    work: the API key is missing or the connection test fails.

    Returns:
        True if the LangSmith connection test succeeded, False otherwise.
    """
    tracing_env = {
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_API_KEY": os.getenv("LANGCHAIN_API_KEY", ""),
        "LANGCHAIN_PROJECT": os.getenv("LANGCHAIN_PROJECT", "multi-agent-workflow"),
    }
    for key, value in tracing_env.items():
        # Empty values (e.g. a missing API key) are never written back.
        if value:
            os.environ[key] = value

    api_key = os.getenv("LANGCHAIN_API_KEY")
    if not api_key:
        # The flag was enabled above; turn it off again so the environment
        # agrees with the "Tracing disabled" message printed below.
        os.environ["LANGCHAIN_TRACING_V2"] = "false"
        print("LANGCHAIN_API_KEY not set. Tracing disabled.")
        return False

    try:
        client = Client(api_key=api_key)
        # Listing a single project is only a connectivity check; the
        # result itself is not needed.
        list(client.list_projects(limit=1))
        print(f"LangSmith connected. Project: {os.getenv('LANGCHAIN_PROJECT')}")
        return True
    except Exception as e:
        # Best-effort setup: any failure disables tracing instead of
        # aborting the workflow.
        print(f"LangSmith connection failed: {e}. Tracing disabled.")
        os.environ["LANGCHAIN_TRACING_V2"] = "false"
        return False
def trace_agent_run(agent_name: str):
    """Build a decorator that runs an agent node inside a labelled trace.

    Args:
        agent_name: Name of the agent; used for the trace name, its tags
            and its metadata.

    Returns:
        A decorator. The decorated function must accept the workflow
        state (a mapping with ``.get``) as its first positional argument.
    """
    trace_name = f"{agent_name}_agent"

    def decorator(func):
        @wraps(func)
        def wrapper(state, *args, **kwargs):
            # Snapshot the parts of the state that make a trace searchable.
            iteration = state.get("iteration", 0)
            run_metadata = dict(
                agent_name=agent_name,
                iteration=iteration,
                task=state.get("task", "")[:200],
                timestamp=datetime.now().isoformat(),
            )
            run_tags = [agent_name, f"iteration-{iteration}"]

            # Imported lazily so the module loads without langsmith until
            # a decorated agent actually runs.
            import langsmith

            with langsmith.trace(
                name=trace_name,
                metadata=run_metadata,
                tags=run_tags,
            ):
                outcome = func(state, *args, **kwargs)
            return outcome

        return wrapper

    return decorator
Once the three environment variables are set (LANGCHAIN_TRACING_V2, LANGCHAIN_API_KEY, LANGCHAIN_PROJECT), LangGraph automatically sends traces to LangSmith. The @trace_agent_run decorator adds custom metadata for better filtering and search.
Cost Tracking
Every LLM call costs money. Track token usage and costs per agent, per run, and per workflow:
# monitoring/cost_tracker.py
"""Token usage and cost tracking for the multi-agent workflow."""
import time
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
# OpenAI pricing in USD per 1M tokens (as of 2024)
MODEL_PRICING = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4-turbo": {"input": 10.00, "output": 30.00},
    "gpt-3.5-turbo": {"input": 0.50, "output": 1.50},
}

# Rates applied when a model name matches no MODEL_PRICING entry.
_DEFAULT_PRICING = {"input": 0.15, "output": 0.60}


def _pricing_for(model) -> dict:
    """Return the price table for *model*, tolerating versioned names.

    An exact key match wins. Otherwise the longest MODEL_PRICING key that
    is a dash-separated prefix of *model* is used, so a dated name such as
    ``gpt-4o-2024-08-06`` is billed as ``gpt-4o`` rather than at the
    default rate. Anything else gets ``_DEFAULT_PRICING``.
    """
    exact = MODEL_PRICING.get(model)
    if exact is not None:
        return exact
    if isinstance(model, str):
        candidates = [
            name for name in MODEL_PRICING if model.startswith(name + "-")
        ]
        if candidates:
            # Longest prefix: "gpt-4o-mini-..." must not be billed as "gpt-4o".
            return MODEL_PRICING[max(candidates, key=len)]
    return _DEFAULT_PRICING


@dataclass
class AgentUsage:
    """Token usage for a single agent run."""

    agent_name: str
    model: str
    input_tokens: int = 0
    output_tokens: int = 0
    tool_calls: int = 0
    duration_seconds: float = 0.0
    # ISO-8601 creation time of this record (local clock).
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    @property
    def total_tokens(self) -> int:
        """Input plus output tokens."""
        return self.input_tokens + self.output_tokens

    @property
    def cost_usd(self) -> float:
        """Cost of this run in USD, from the per-1M-token rates for ``model``."""
        pricing = _pricing_for(self.model)
        input_cost = (self.input_tokens / 1_000_000) * pricing["input"]
        output_cost = (self.output_tokens / 1_000_000) * pricing["output"]
        return input_cost + output_cost
class CostTracker:
    """Accumulate the usage of every agent run in one workflow run.

    Usage:
        tracker = CostTracker()
        tracker.record("researcher", "gpt-4o-mini", 500, 200, tool_calls=2, duration=1.5)
        tracker.record("coder", "gpt-4o-mini", 800, 400, tool_calls=3, duration=2.1)
        print(tracker.summary())
    """

    def __init__(self):
        # Recorded runs in call order, plus the wall-clock creation time
        # that total_duration is measured from.
        self.runs: list[AgentUsage] = []
        self.start_time: float = time.time()

    def record(
        self,
        agent_name: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        tool_calls: int = 0,
        duration: float = 0.0,
    ):
        """Append the token usage of one agent run."""
        self.runs.append(
            AgentUsage(
                agent_name=agent_name,
                model=model,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                tool_calls=tool_calls,
                duration_seconds=duration,
            )
        )

    @property
    def total_cost(self) -> float:
        """Sum of ``cost_usd`` over all recorded runs."""
        running = 0
        for usage in self.runs:
            running = running + usage.cost_usd
        return running

    @property
    def total_tokens(self) -> int:
        """Sum of ``total_tokens`` over all recorded runs."""
        running = 0
        for usage in self.runs:
            running = running + usage.total_tokens
        return running

    @property
    def total_duration(self) -> float:
        """Seconds elapsed since this tracker was created."""
        now = time.time()
        return now - self.start_time

    def summary(self) -> str:
        """Build a human-readable report: totals, then one line per agent."""
        heavy_rule = "=" * 60
        report = [
            heavy_rule,
            "WORKFLOW COST SUMMARY",
            heavy_rule,
            f"Total cost: ${self.total_cost:.6f}",
            f"Total tokens: {self.total_tokens:,}",
            f"Total duration: {self.total_duration:.1f}s",
            f"Agent runs: {len(self.runs)}",
            "-" * 60,
        ]

        # Fold the runs into one bucket of counters per agent name.
        per_agent = {}
        for usage in self.runs:
            bucket = per_agent.setdefault(
                usage.agent_name,
                {"cost": 0, "tokens": 0, "calls": 0, "tool_calls": 0},
            )
            bucket["cost"] += usage.cost_usd
            bucket["tokens"] += usage.total_tokens
            bucket["calls"] += 1
            bucket["tool_calls"] += usage.tool_calls

        # Agents are listed alphabetically.
        for name in sorted(per_agent):
            stats = per_agent[name]
            report.append(
                f" {name:15s} ${stats['cost']:.6f} "
                f"{stats['tokens']:>8,} tokens "
                f"{stats['calls']} runs "
                f"{stats['tool_calls']} tool calls"
            )

        report.append(heavy_rule)
        return "\n".join(report)

    def to_dict(self) -> dict:
        """Export the totals and each run as plain dicts for JSON serialization."""
        return {
            "total_cost_usd": self.total_cost,
            "total_tokens": self.total_tokens,
            "total_duration_seconds": self.total_duration,
            "runs": [
                dict(
                    agent=usage.agent_name,
                    model=usage.model,
                    input_tokens=usage.input_tokens,
                    output_tokens=usage.output_tokens,
                    cost_usd=usage.cost_usd,
                    tool_calls=usage.tool_calls,
                    duration_seconds=usage.duration_seconds,
                )
                for usage in self.runs
            ],
        }
Integrating the Cost Tracker
Add cost tracking to agent nodes using a LangChain callback:
# monitoring/callbacks.py
"""LangChain callbacks for cost tracking."""
from langchain_core.callbacks import BaseCallbackHandler
from monitoring.cost_tracker import CostTracker
class CostTrackingCallback(BaseCallbackHandler):
    """Callback that forwards the token usage of each LLM call to a tracker."""

    def __init__(self, tracker: CostTracker, agent_name: str):
        self.tracker = tracker
        self.agent_name = agent_name
        # Start time of the LLM call currently in flight (None before the
        # first call).
        self._start_time = None

    def on_llm_start(self, serialized, prompts, **kwargs):
        """Remember when the LLM call started."""
        import time

        self._start_time = time.time()

    def on_llm_end(self, response, **kwargs):
        """Record tokens, tool calls, model and duration for a finished call."""
        import time

        if self._start_time:
            elapsed = time.time() - self._start_time
        else:
            elapsed = 0

        # Token counts live under llm_output["token_usage"]; both levels
        # may be absent, in which case the counts are recorded as 0.
        token_usage = (
            response.llm_output.get("token_usage", {})
            if response.llm_output
            else {}
        )
        prompt_count = token_usage.get("prompt_tokens", 0)
        completion_count = token_usage.get("completion_tokens", 0)

        # Count the tool calls requested across every generation that
        # carries a message with a tool_calls attribute.
        requested_tools = sum(
            len(generation.message.tool_calls)
            for batch in response.generations
            for generation in batch
            if hasattr(generation, "message")
            and hasattr(generation.message, "tool_calls")
        )

        model_name = ""
        if response.llm_output:
            model_name = response.llm_output.get("model_name", "gpt-4o-mini")

        self.tracker.record(
            agent_name=self.agent_name,
            model=model_name,
            input_tokens=prompt_count,
            output_tokens=completion_count,
            tool_calls=requested_tools,
            duration=elapsed,
        )
# Usage in agent creation:
# tracker = CostTracker()
# callback = CostTrackingCallback(tracker, "researcher")
# llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[callback])
Structured Error Handling
Build an error handler that categorizes failures and provides recovery strategies:
# monitoring/error_handler.py
"""Structured error handling for multi-agent workflows."""
import traceback
import logging
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable
from functools import wraps
logger = logging.getLogger(__name__)
class ErrorSeverity(Enum):
    """How serious a failure is, listed from least to most severe."""

    # Retry will likely fix it
    LOW = "low"
    # Needs attention but not critical
    MEDIUM = "medium"
    # Workflow may need to abort
    HIGH = "high"
    # Immediate human intervention needed
    CRITICAL = "critical"
class ErrorCategory(Enum):
    """Kind of failure an agent run can hit."""

    # OpenAI, Tavily API failures
    API_ERROR = "api_error"
    # Tool execution failures
    TOOL_ERROR = "tool_error"
    # Supervisor routing issues
    ROUTING_ERROR = "routing_error"
    # Operation timed out
    TIMEOUT = "timeout"
    # Invalid state transitions
    STATE_ERROR = "state_error"
    # API rate limiting
    RATE_LIMIT = "rate_limit"
    # Authentication failures
    AUTH_ERROR = "auth_error"
@dataclass
class AgentError:
    """One agent failure, with the context needed to debug it."""

    category: ErrorCategory
    severity: ErrorSeverity
    agent_name: str
    message: str
    # The caught exception and its formatted traceback, when available.
    original_error: Optional[Exception] = None
    traceback_str: Optional[str] = None
    recovery_suggestion: str = ""
    iteration: int = 0

    def to_dict(self) -> dict:
        """Return the error as a dict of plain values.

        Enum members are reduced to their string values; the exception
        object and the traceback text are not included.
        """
        payload = dict(
            category=self.category.value,
            severity=self.severity.value,
            agent=self.agent_name,
            message=self.message,
            recovery=self.recovery_suggestion,
            iteration=self.iteration,
        )
        return payload
def classify_error(error: Exception) -> tuple[ErrorCategory, ErrorSeverity, str]:
    """Classify an exception into a category, severity and recovery suggestion.

    Both the lower-cased message and the exception's class are inspected.
    The class matters because a message can be empty or uninformative:
    ``str(TimeoutError())`` is ``""``, so a message-only check would file
    it under the TOOL_ERROR default.

    Rules are tried in this order and the first match wins: rate limit,
    authentication, timeout, API error, then the TOOL_ERROR default.

    Args:
        error: The exception to classify.

    Returns:
        A ``(category, severity, recovery_suggestion)`` tuple.
    """
    error_str = str(error).lower()
    error_cls = type(error)
    type_name = error_cls.__name__.lower()
    # Top-level package that defines the exception, e.g. "openai".
    root_module = (getattr(error_cls, "__module__", "") or "").split(".")[0].lower()

    # Rate limiting
    if (
        "ratelimit" in type_name
        or "rate_limit" in error_str
        or "rate limit" in error_str
        or "429" in error_str
    ):
        return (
            ErrorCategory.RATE_LIMIT,
            ErrorSeverity.LOW,
            "Wait 30-60 seconds and retry. Consider using a lower-tier model."
        )

    # Authentication
    if (
        "authentication" in type_name
        or "auth" in error_str
        or "api_key" in error_str
        or "401" in error_str
    ):
        return (
            ErrorCategory.AUTH_ERROR,
            ErrorSeverity.CRITICAL,
            "Check your API keys in the .env file."
        )

    # Timeout: also recognised by class, since timeout exceptions are
    # frequently raised without any message text.
    if (
        isinstance(error, TimeoutError)
        or "timeout" in type_name
        or "timeout" in error_str
        or "timed out" in error_str
    ):
        return (
            ErrorCategory.TIMEOUT,
            ErrorSeverity.MEDIUM,
            "Increase the timeout or break the task into smaller steps."
        )

    # API errors: the defining package is checked as well as the class
    # name, because a class name such as "APIError" does not contain
    # "openai".
    if "openai" in type_name or root_module == "openai" or "api" in error_str:
        return (
            ErrorCategory.API_ERROR,
            ErrorSeverity.MEDIUM,
            "Retry the request. If persistent, check the API status page."
        )

    # Default
    return (
        ErrorCategory.TOOL_ERROR,
        ErrorSeverity.MEDIUM,
        "Check the tool input parameters and retry."
    )
def with_error_handling(agent_name: str, max_retries: int = 2):
    """Decorator that adds structured error handling to agent nodes.

    The wrapped node is called up to ``max_retries + 1`` times. Each
    failure is classified and logged; non-critical failures are retried
    after an exponential backoff (1s, 2s, 4s, ...), while a CRITICAL
    failure stops retrying immediately.

    Args:
        agent_name: Name of the agent for error context.
        max_retries: Maximum number of retries before failing. Negative
            values are treated as 0, so the node always runs at least once.

    Returns:
        A decorator. The wrapped node returns its normal result on
        success; when every attempt fails it returns a state update with
        an explanatory AIMessage, ``"status": "error"`` and the existing
        ``"results"`` instead of raising.
    """
    total_attempts = max(0, max_retries) + 1

    def decorator(func):
        @wraps(func)
        def wrapper(state, *args, **kwargs):
            last_error = None
            recovery = ""
            attempts_made = 0
            for attempt in range(total_attempts):
                attempts_made = attempt + 1
                try:
                    return func(state, *args, **kwargs)
                except Exception as e:
                    last_error = e
                    category, severity, recovery = classify_error(e)
                    error = AgentError(
                        category=category,
                        severity=severity,
                        agent_name=agent_name,
                        message=str(e),
                        original_error=e,
                        traceback_str=traceback.format_exc(),
                        recovery_suggestion=recovery,
                        iteration=state.get("iteration", 0),
                    )
                    logger.error(
                        "[%s] Attempt %d/%d failed: %s (%s): %s",
                        agent_name,
                        attempts_made,
                        total_attempts,
                        error.category.value,
                        error.severity.value,
                        error.message,
                    )
                    # Do not retry critical errors
                    if severity == ErrorSeverity.CRITICAL:
                        break
                    # Wait before retrying (exponential backoff); no
                    # sleep after the final attempt.
                    if attempts_made < total_attempts:
                        import time
                        time.sleep(2 ** attempt)

            # Every attempt failed, or a critical error ended the loop early.
            from langchain_core.messages import AIMessage

            # Report the attempts actually made: a critical error stops
            # after fewer than total_attempts.
            noun = "attempt" if attempts_made == 1 else "attempts"
            return {
                "messages": [AIMessage(content=(
                    f"Agent '{agent_name}' failed after {attempts_made} {noun}. "
                    f"Error: {str(last_error)}. "
                    f"Suggestion: {recovery}"
                ))],
                "status": "error",
                "results": state.get("results", {}),
            }

        return wrapper

    return decorator
Apply Error Handling to Agents
# Update agent nodes with error handling decorators:
# agents/researcher.py - add at the top:
from monitoring.error_handler import with_error_handling
@with_error_handling("researcher", max_retries=2)
def researcher_node(state: AgentState) -> dict:
# ... existing code ...
# agents/coder.py:
@with_error_handling("coder", max_retries=1) # Fewer retries for code execution
def coder_node(state: AgentState) -> dict:
# ... existing code ...
# agents/analyst.py:
@with_error_handling("analyst", max_retries=2)
def analyst_node(state: AgentState) -> dict:
# ... existing code ...
Debug Utilities
Helper functions to inspect workflow state and diagnose issues:
# monitoring/debug.py
"""Debugging utilities for multi-agent workflows."""
from rich.console import Console
from rich.table import Table
from rich.tree import Tree
from agents.state import AgentState
console = Console()
def print_state(state: AgentState, title: str = "Current State"):
    """Pretty-print the current workflow state as a two-column table.

    Args:
        state: Workflow state mapping; missing keys fall back to
            placeholder values.
        title: Heading shown above the table.
    """
    # rich is already a dependency of this module; escape() makes square
    # brackets print literally instead of being parsed as console markup.
    from rich.markup import escape

    table = Table(title=title)
    table.add_column("Field", style="cyan")
    table.add_column("Value", style="white")
    table.add_row("Status", state.get("status", "unknown"))
    table.add_row("Next Agent", state.get("next_agent", "none"))
    table.add_row("Iteration", str(state.get("iteration", 0)))
    # The task is free-form text, so it is truncated and then escaped; a
    # None task is shown as empty rather than crashing the slice.
    task_text = str(state.get("task") or "")[:100]
    table.add_row("Task", escape(task_text))
    table.add_row("Messages", str(len(state.get("messages") or [])))
    result_names = ", ".join(str(key) for key in (state.get("results") or {}))
    table.add_row("Results", escape(result_names) or "none")
    console.print(table)
def print_message_history(state: AgentState):
    """Print the message history as a tree, one node per message.

    Each message is labelled by its class (Human / AI / Tool, or the raw
    class name for anything else) and its content is cut to 150
    characters followed by "...".
    """
    # rich is already a dependency of this module; escape() makes square
    # brackets in message text print literally instead of being parsed as
    # console markup.
    from rich.markup import escape

    labels = {
        "HumanMessage": "[green]Human:[/green]",
        "AIMessage": "[blue]AI:[/blue]",
        "ToolMessage": "[yellow]Tool:[/yellow]",
    }

    tree = Tree("Message History")
    for msg in state.get("messages", []):
        msg_type = type(msg).__name__
        # NOTE(review): content is presumably not always a plain string
        # (e.g. a list of content blocks) - stringify so truncation and
        # the "..." suffix cannot fail.
        text = msg.content if isinstance(msg.content, str) else str(msg.content)
        if len(text) > 150:
            text = text[:150] + "..."
        label = labels.get(msg_type, f"[dim]{escape(msg_type)}:[/dim]")
        tree.add(f"{label} {escape(text)}")
    console.print(tree)
def stream_workflow_debug(task: str):
    """Run the workflow with step-by-step debug output.

    Streams each node execution so the workflow's progress can be
    followed in real time: for every node update it prints the node name
    and, when present, the next agent, the status and the first 200
    characters of each message.

    Args:
        task: The task text to seed the initial state with.
    """
    # rich is already a dependency of this module; escape() makes square
    # brackets in dynamic text print literally instead of being parsed as
    # console markup.
    from rich.markup import escape

    from graph.workflow import build_workflow

    app = build_workflow()
    initial_state = {
        "messages": [],
        "next_agent": "",
        "task": task,
        "results": {},
        "status": "in_progress",
        "iteration": 0,
    }
    console.print(f"\n[bold]Debug run: {escape(str(task))}[/bold]\n")

    # Stream each step
    for step in app.stream(initial_state):
        for node_name, node_output in step.items():
            console.print(f"\n[cyan]--- Node: {escape(str(node_name))} ---[/cyan]")
            # NOTE(review): presumably a node can yield no update (None);
            # there is then nothing further to show for it - confirm.
            if not isinstance(node_output, dict):
                continue
            if "next_agent" in node_output:
                console.print(f" Next: {escape(str(node_output['next_agent']))}")
            if "status" in node_output:
                console.print(f" Status: {escape(str(node_output['status']))}")
            if "messages" in node_output:
                for msg in node_output["messages"]:
                    content = str(msg.content)[:200]
                    console.print(f" Message: {escape(content)}")

    console.print("\n[green]Workflow complete.[/green]")
Call stream_workflow_debug("your task") to see the workflow execute step by step in your terminal.
Key Takeaways
- LangSmith tracing is enabled with three environment variables — no code changes needed in agents or workflows.
- Cost tracking per agent helps identify which agents consume the most tokens and where to optimize.
- Structured error handling with retry logic, error classification, and recovery suggestions makes failures recoverable.
- Debug utilities like stream_workflow_debug and print_state make it easy to inspect workflow behavior step by step.
- Logging every agent run with metadata (agent name, iteration, duration) creates an audit trail for production debugging.
What Is Next
In the final lesson, you will explore enhancements and best practices — parallel agent execution, streaming output, deployment strategies, scaling patterns, and a comprehensive FAQ.