LLM-Specific Monitoring Intermediate
LLMs introduce monitoring challenges that traditional ML models don't have: unpredictable token costs, variable latency, hallucinations, prompt injection attacks, and quality that's hard to quantify. This lesson covers production monitoring patterns specifically designed for LLM-powered applications.
LLM Monitoring Dimensions
| Dimension | What to Track | Why It Matters | Alert Threshold |
|---|---|---|---|
| Cost | Tokens per request, cost per query, daily spend | LLM costs can spike 10x overnight from prompt changes | Cost > 2x daily budget |
| Latency | TTFT, total response time, tokens/sec | User experience degrades sharply above 2-3s TTFT | p95 > 5s or TTFT > 2s |
| Quality | Relevance scores, factual accuracy, format compliance | Quality degradation is silent and hard to detect | Quality score < 0.7 |
| Safety | Guardrail trigger rate, toxic content, PII leakage | Compliance and reputation risk | Guardrail rate > 5% |
| Reliability | Error rate, rate limit hits, timeout rate | Provider outages directly impact your product | Error rate > 1% |
Production LLM Request Logger
# Complete LLM request monitoring system
import hashlib
import json
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional
@dataclass
class LLMRequestLog:
    """Structured log for every LLM API call.

    One record is created per request; the optional fields
    (quality_score, user_feedback) may be filled in after the call
    completes.
    """
    request_id: str  # caller-supplied unique id for the call
    timestamp: str  # ISO-8601 time the log entry was created
    model: str  # provider model name, e.g. "gpt-4o"
    prompt_template: str  # Which template was used
    prompt_hash: str  # Hash of actual prompt (for dedup)
    input_tokens: int  # prompt tokens consumed
    output_tokens: int  # completion tokens generated
    total_tokens: int  # input_tokens + output_tokens
    latency_ms: float  # total wall-clock time of the call
    ttft_ms: float  # Time to first token
    cost_usd: float  # computed API cost in USD
    status: str  # success, error, timeout, rate_limited
    error_message: Optional[str] = None  # provider error text, if any
    quality_score: Optional[float] = None  # quality score, if computed later
    guardrail_triggered: bool = False  # True when a safety filter fired
    guardrail_type: Optional[str] = None  # which guardrail fired, if any
    user_feedback: Optional[str] = None  # thumbs_up, thumbs_down, None
class LLMMonitor:
    """Production LLM monitoring system.

    Tracks cost, latency, quality, and safety across all LLM calls
    in your application.  All timestamps are timezone-aware UTC so
    report cutoffs compare cleanly.

    Attributes:
        logs: Every logged request, in arrival order.
        daily_cost: Running USD totals keyed by "YYYY-MM-DD" (UTC).
        prompt_performance: Per-template list of per-request stats.
    """

    # Pricing per 1M tokens (update as prices change)
    PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "gpt-4-turbo": {"input": 10.00, "output": 30.00},
        "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
        "claude-3-haiku": {"input": 0.25, "output": 1.25},
    }

    # Conservative fallback for models missing from PRICING, so an
    # unknown model over-reports rather than under-reports cost.
    DEFAULT_PRICING = {"input": 5.0, "output": 15.0}

    def __init__(self):
        self.logs: List["LLMRequestLog"] = []
        self.daily_cost: Dict[str, float] = {}
        self.prompt_performance: Dict[str, List[dict]] = {}

    @staticmethod
    def _utcnow() -> datetime:
        """Timezone-aware UTC now (datetime.utcnow() is deprecated)."""
        return datetime.now(timezone.utc)

    def calculate_cost(self, model: str, input_tokens: int,
                       output_tokens: int) -> float:
        """Return the USD cost of a call, rounded to 6 decimal places.

        Args:
            model: Model identifier; unknown models fall back to
                DEFAULT_PRICING.
            input_tokens: Prompt tokens consumed.
            output_tokens: Completion tokens generated.
        """
        pricing = self.PRICING.get(model, self.DEFAULT_PRICING)
        cost = (input_tokens * pricing["input"] / 1_000_000 +
                output_tokens * pricing["output"] / 1_000_000)
        return round(cost, 6)

    def log_request(self, request_id: str, model: str,
                    prompt_template: str, prompt_text: str,
                    input_tokens: int, output_tokens: int,
                    latency_ms: float, ttft_ms: float,
                    status: str = "success",
                    error_message: Optional[str] = None,
                    guardrail_triggered: bool = False,
                    guardrail_type: Optional[str] = None) -> "LLMRequestLog":
        """Log an LLM request with all monitoring dimensions.

        Args:
            request_id: Unique id for this call.
            model: Model name (key into PRICING).
            prompt_template: Template name, used for per-template stats.
            prompt_text: Full rendered prompt; only its hash is stored.
            input_tokens: Prompt tokens reported by the API.
            output_tokens: Completion tokens reported by the API.
            latency_ms: Total wall-clock time of the call.
            ttft_ms: Time to first token.
            status: "success", "error", "timeout" or "rate_limited".
            error_message: Provider error text, if any.
            guardrail_triggered: Whether a safety filter fired.
            guardrail_type: Which guardrail fired, if any.

        Returns:
            The stored LLMRequestLog entry.
        """
        cost = self.calculate_cost(model, input_tokens, output_tokens)
        # md5 is fine here: the hash is for dedup/grouping, not security.
        prompt_hash = hashlib.md5(prompt_text.encode()).hexdigest()[:12]
        entry = LLMRequestLog(
            request_id=request_id,
            timestamp=self._utcnow().isoformat(),
            model=model,
            prompt_template=prompt_template,
            prompt_hash=prompt_hash,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            total_tokens=input_tokens + output_tokens,
            latency_ms=latency_ms,
            ttft_ms=ttft_ms,
            cost_usd=cost,
            status=status,
            error_message=error_message,
            guardrail_triggered=guardrail_triggered,
            guardrail_type=guardrail_type
        )
        self.logs.append(entry)
        # Track daily cost
        date_key = self._utcnow().strftime("%Y-%m-%d")
        self.daily_cost[date_key] = self.daily_cost.get(date_key, 0) + cost
        # Track per-prompt performance
        self.prompt_performance.setdefault(prompt_template, []).append({
            "latency_ms": latency_ms,
            "tokens": input_tokens + output_tokens,
            "cost": cost,
            "status": status
        })
        return entry

    def _recent(self, hours: int,
                successes_only: bool = False) -> List["LLMRequestLog"]:
        """Return logs newer than `hours` ago, optionally successes only."""
        cutoff = self._utcnow() - timedelta(hours=hours)
        return [entry for entry in self.logs
                if datetime.fromisoformat(entry.timestamp) > cutoff
                and (not successes_only or entry.status == "success")]

    def get_cost_report(self, hours: int = 24) -> dict:
        """Generate cost report for the last N hours."""
        recent = self._recent(hours)
        if not recent:
            return {"status": "no_data", "period_hours": hours}
        total_cost = sum(entry.cost_usd for entry in recent)
        by_model: Dict[str, dict] = {}
        for entry in recent:
            stats = by_model.setdefault(
                entry.model, {"cost": 0, "requests": 0, "tokens": 0})
            stats["cost"] += entry.cost_usd
            stats["requests"] += 1
            stats["tokens"] += entry.total_tokens
        return {
            "period_hours": hours,
            "total_cost_usd": round(total_cost, 4),
            "total_requests": len(recent),
            "avg_cost_per_request": round(total_cost / len(recent), 6),
            # Linear extrapolation of the observed window to a full day.
            "projected_daily_cost": round(total_cost * 24 / hours, 2),
            "by_model": {model: {k: round(v, 4) if isinstance(v, float) else v
                                 for k, v in stats.items()}
                         for model, stats in by_model.items()}
        }

    def get_latency_report(self, hours: int = 1) -> dict:
        """Generate latency report with percentiles (successful calls only)."""
        recent = self._recent(hours, successes_only=True)
        if not recent:
            return {"status": "no_data"}
        import numpy as np  # local import keeps numpy optional for cost-only use
        latencies = [entry.latency_ms for entry in recent]
        ttfts = [entry.ttft_ms for entry in recent]
        return {
            "period_hours": hours,
            "request_count": len(recent),
            "latency": {
                "p50_ms": round(np.percentile(latencies, 50), 1),
                "p95_ms": round(np.percentile(latencies, 95), 1),
                "p99_ms": round(np.percentile(latencies, 99), 1),
                "mean_ms": round(np.mean(latencies), 1)
            },
            "time_to_first_token": {
                "p50_ms": round(np.percentile(ttfts, 50), 1),
                "p95_ms": round(np.percentile(ttfts, 95), 1),
                "mean_ms": round(np.mean(ttfts), 1)
            }
        }

    def get_quality_report(self, hours: int = 24) -> dict:
        """Report on quality scores and user feedback."""
        recent = self._recent(hours)
        total = len(recent)
        if total == 0:
            return {"status": "no_data"}
        guardrail_triggered = sum(
            1 for entry in recent if entry.guardrail_triggered)
        errors = sum(1 for entry in recent if entry.status != "success")
        thumbs_up = sum(
            1 for entry in recent if entry.user_feedback == "thumbs_up")
        thumbs_down = sum(
            1 for entry in recent if entry.user_feedback == "thumbs_down")
        feedback_total = thumbs_up + thumbs_down
        return {
            "period_hours": hours,
            "total_requests": total,
            "error_rate": round(errors / total, 4),
            "guardrail_trigger_rate": round(guardrail_triggered / total, 4),
            # None (not 0) when no feedback was collected at all.
            "user_satisfaction": round(thumbs_up / feedback_total, 4)
            if feedback_total > 0 else None,
            "feedback_coverage": round(feedback_total / total, 4)
        }
Hallucination Detection
# Hallucination detection strategies for production LLMs
import re
from typing import List, Dict
class HallucinationDetector:
    """Detect potential hallucinations in LLM outputs.

    Three detection strategies:
    1. Factual grounding: check claims against provided context
    2. Self-consistency: ask the same question multiple times
    3. Structural validation: check format, citations, numbers
    """

    # Heuristic marker of a "factual" sentence: digits, month names,
    # money/percent symbols, or an explicit attribution.
    _SPECIFICS_RE = re.compile(
        r'\d+|January|February|March|April|May|June|July|August|'
        r'September|October|November|December|\$|%|according to',
        re.IGNORECASE
    )

    def check_grounding(self, response: str, context: str,
                        claim_extractor=None) -> dict:
        """Check if response claims are grounded in the provided context.

        Used for RAG systems where context is retrieved documents.

        Args:
            response: The LLM output to audit.
            context: Retrieved source text the response should rely on.
            claim_extractor: Reserved for a pluggable claim extractor;
                currently unused.

        Returns:
            Dict with a grounding score in [0, 1], claim counts, up to
            five ungrounded sentences, and an `alert` flag when the
            score drops below 0.7.
        """
        # Simple approach: a factual-looking sentence counts as
        # "grounded" when enough of its words also appear in the context.
        candidate_sentences = [s.strip() for s in response.split('.')
                               if len(s.strip()) > 20]
        grounded = []
        ungrounded = []
        context_words = set(context.lower().split())
        for sentence in candidate_sentences:
            # Only audit sentences that look like factual claims
            # (numbers, dates, money, percentages, attributions).
            if not self._SPECIFICS_RE.search(sentence):
                continue
            words = set(sentence.lower().split())
            overlap = len(words & context_words) / len(words)
            if overlap > 0.4:
                grounded.append(sentence)
            else:
                ungrounded.append(sentence)
        total_claims = len(grounded) + len(ungrounded)
        grounding_score = (len(grounded) / total_claims
                           if total_claims > 0 else 1.0)
        return {
            "grounding_score": round(grounding_score, 3),
            "total_claims_checked": total_claims,
            "grounded_claims": len(grounded),
            "potentially_hallucinated": len(ungrounded),
            "ungrounded_sentences": ungrounded[:5],  # Top 5
            "alert": grounding_score < 0.7
        }

    def check_self_consistency(self, responses: List[str],
                               similarity_threshold: float = 0.6) -> dict:
        """Check if multiple responses to the same query are consistent.

        High inconsistency suggests hallucination.
        Generate 3-5 responses with temperature > 0 and compare.

        Args:
            responses: Two or more answers to the same prompt.
            similarity_threshold: Average Jaccard similarity below which
                the answers are flagged as a likely hallucination.
        """
        if len(responses) < 2:
            return {"status": "need_multiple_responses"}

        def word_overlap(a: str, b: str) -> float:
            """Jaccard similarity over lowercase word sets."""
            words_a = set(a.lower().split())
            words_b = set(b.lower().split())
            if not words_a or not words_b:
                return 0.0
            return len(words_a & words_b) / len(words_a | words_b)

        # Compare all unordered pairs of responses.
        similarities = [word_overlap(responses[i], responses[j])
                        for i in range(len(responses))
                        for j in range(i + 1, len(responses))]
        avg_similarity = sum(similarities) / len(similarities)
        return {
            "avg_consistency": round(avg_similarity, 3),
            "min_consistency": round(min(similarities), 3),
            "num_responses": len(responses),
            "likely_hallucination": avg_similarity < similarity_threshold,
            "confidence": "high" if len(responses) >= 5 else "medium"
        }

    def check_format_compliance(self, response: str,
                                expected_format: dict) -> dict:
        """Validate LLM output matches the expected format.

        Catches structural hallucinations (wrong JSON, missing fields).

        `expected_format` keys (all optional):
            type: "json" to require parseable JSON.
            required_fields: top-level keys that must be present.
            max_length: maximum allowed response length in characters.
            forbidden_patterns: regexes that must not match (IGNORECASE).
        """
        # Local import: this snippet's header only imports `re`, and
        # json is needed nowhere else in the class.
        import json

        issues = []
        # Check JSON format if expected
        if expected_format.get("type") == "json":
            try:
                parsed = json.loads(response)
            except json.JSONDecodeError:
                issues.append("Response is not valid JSON")
            else:
                # field_name (not `field`) to avoid shadowing dataclasses.field
                for field_name in expected_format.get("required_fields", []):
                    if field_name not in parsed:
                        issues.append(f"Missing required field: {field_name}")
        # Check length constraints
        max_length = expected_format.get("max_length")
        if max_length and len(response) > max_length:
            issues.append(f"Response exceeds max length ({len(response)} > {max_length})")
        # Check for forbidden patterns
        for pattern in expected_format.get("forbidden_patterns", []):
            if re.search(pattern, response, re.IGNORECASE):
                issues.append(f"Contains forbidden pattern: {pattern}")
        return {
            "compliant": len(issues) == 0,
            "issues": issues,
            "issue_count": len(issues)
        }
Prompt Performance Tracking
# Track and compare prompt template performance
from collections import defaultdict
import numpy as np
class PromptPerformanceTracker:
    """Track how different prompt templates perform in production.

    Helps answer: "Did our prompt change improve things?"
    """

    def __init__(self):
        # Per-template aggregates; the defaultdict creates a zeroed
        # record the first time a template is logged.
        self.prompt_metrics = defaultdict(lambda: {
            "requests": 0,
            "total_tokens": 0,
            "total_cost": 0,
            "latencies": [],
            "quality_scores": [],
            "guardrail_triggers": 0,
            "errors": 0,
            "thumbs_up": 0,
            "thumbs_down": 0
        })

    def log(self, prompt_template: str, tokens: int, cost: float,
            latency_ms: float, quality_score: float = None,
            guardrail_triggered: bool = False, error: bool = False,
            feedback: str = None):
        """Log metrics for a single request against a prompt template.

        Args:
            prompt_template: Name of the template that produced the call.
            tokens: Total tokens used by the call.
            cost: USD cost of the call.
            latency_ms: Wall-clock latency of the call.
            quality_score: Optional quality score for this response.
            guardrail_triggered: Whether a safety filter fired.
            error: Whether the call failed.
            feedback: "thumbs_up", "thumbs_down", or None.
        """
        m = self.prompt_metrics[prompt_template]
        m["requests"] += 1
        m["total_tokens"] += tokens
        m["total_cost"] += cost
        m["latencies"].append(latency_ms)
        if quality_score is not None:
            m["quality_scores"].append(quality_score)
        if guardrail_triggered:
            m["guardrail_triggers"] += 1
        if error:
            m["errors"] += 1
        if feedback == "thumbs_up":
            m["thumbs_up"] += 1
        elif feedback == "thumbs_down":
            m["thumbs_down"] += 1

    def compare_prompts(self, template_a: str, template_b: str) -> dict:
        """Compare two prompt templates head-to-head.

        Returns a summary per template plus a per-metric winner map;
        lower is better for cost/latency/error/guardrail, higher is
        better for quality/satisfaction. Ties produce no winner.
        """
        # .get() deliberately avoids the defaultdict factory so a
        # never-logged template stays absent instead of being created.
        a = self.prompt_metrics.get(template_a, {})
        b = self.prompt_metrics.get(template_b, {})
        if not a or not b:
            return {"status": "insufficient_data"}

        def summarize(metrics):
            """Collapse raw aggregates into per-request summary stats."""
            reqs = metrics["requests"]
            fb_total = metrics["thumbs_up"] + metrics["thumbs_down"]
            return {
                "requests": reqs,
                "avg_tokens": round(metrics["total_tokens"] / max(reqs, 1)),
                "avg_cost": round(metrics["total_cost"] / max(reqs, 1), 6),
                "avg_latency_ms": round(np.mean(metrics["latencies"]), 1)
                if metrics["latencies"] else 0,
                "p95_latency_ms": round(np.percentile(metrics["latencies"], 95), 1)
                if metrics["latencies"] else 0,
                "avg_quality": round(np.mean(metrics["quality_scores"]), 3)
                if metrics["quality_scores"] else None,
                "error_rate": round(metrics["errors"] / max(reqs, 1), 4),
                "guardrail_rate": round(metrics["guardrail_triggers"] / max(reqs, 1), 4),
                "satisfaction": round(metrics["thumbs_up"] / max(fb_total, 1), 3)
                if fb_total > 0 else None
            }

        summary_a = summarize(a)
        summary_b = summarize(b)
        # Determine per-metric winner
        winners = {}
        for metric in ["avg_cost", "avg_latency_ms", "error_rate", "guardrail_rate"]:
            if summary_a[metric] < summary_b[metric]:
                winners[metric] = template_a
            elif summary_b[metric] < summary_a[metric]:
                winners[metric] = template_b
        for metric in ["avg_quality", "satisfaction"]:
            # "is not None", not truthiness: a legitimate 0.0 score must
            # still participate in the comparison, only missing data is
            # excluded.
            if summary_a[metric] is not None and summary_b[metric] is not None:
                if summary_a[metric] > summary_b[metric]:
                    winners[metric] = template_a
                elif summary_b[metric] > summary_a[metric]:
                    winners[metric] = template_b
        return {
            template_a: summary_a,
            template_b: summary_b,
            "winners_by_metric": winners
        }
Cost Monitoring and Budget Alerts
Cost Control Strategies:
- Set daily/hourly budget caps with automatic model fallback (e.g., GPT-4o to GPT-4o-mini when budget is 80% consumed)
- Track cost per user/feature to identify which parts of your app are most expensive
- Monitor token efficiency: a persistently low output_tokens / input_tokens ratio suggests your prompts or retrieved context are bloated relative to the useful output they produce
- Cache identical queries: semantic caching can reduce costs by 20-40% for many applications
- Alert on cost anomalies: a runaway loop or prompt injection can generate thousands of dollars in API costs in minutes
Real Cost Incident: A production app had a retry loop bug that resubmitted failed LLM calls up to 100 times per request. Combined with a provider outage that returned 500 errors, this generated $12,000 in API costs in 2 hours before the budget alert fired. Always implement exponential backoff AND hard per-request cost caps.
Lilly Tech Systems