Alerting & Incident Response Advanced

Good monitoring is useless without good alerting. ML teams are particularly prone to alert fatigue because ML systems generate more nuanced signals than traditional software. This lesson covers how to design alerts that actually get acted on, build runbooks for ML-specific incidents, and integrate with incident management tools.

Alert Severity Design for ML Systems

| Severity | Response Time | Notification | ML Examples |
| --- | --- | --- | --- |
| P0 - Critical | < 15 minutes | PagerDuty page + phone call | Model serving is down, 100% error rate, data pipeline completely stopped |
| P1 - High | < 1 hour | PagerDuty page + Slack | Prediction collapse (single class), p99 latency > 10s, cost runaway |
| P2 - Medium | < 4 hours | Slack channel | Significant data drift (>25% features), accuracy drop > 5%, ground truth pipeline delayed |
| P3 - Low | Next business day | Email / dashboard | Moderate drift, minor latency increase, feature importance shift |

Production Alert Manager

# Production ML alert manager with routing and deduplication
import hashlib
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable
from enum import Enum
from dataclasses import dataclass, field

class Severity(Enum):
    """Alert severity levels, P0 (worst) through P3 (least urgent).

    Declaration order matters: MLAlertManager.get_active_alerts sorts
    by position in this enum, so CRITICAL must stay first.
    """
    CRITICAL = "P0"  # < 15 min response: PagerDuty page + phone call
    HIGH = "P1"      # < 1 hour: PagerDuty page + Slack
    MEDIUM = "P2"    # < 4 hours: Slack channel
    LOW = "P3"       # next business day: email / dashboard

@dataclass
class Alert:
    """One fired alert instance, routed to notification channels."""
    alert_id: str            # short md5 hash, unique per firing
    title: str               # rendered from AlertRule.title_template
    description: str         # rendered from AlertRule.description_template
    severity: Severity
    source: str              # e.g., "drift_detector", "perf_monitor"
    model_name: str
    timestamp: str           # ISO-8601 UTC string of when the alert fired
    metrics: Dict = field(default_factory=dict)  # metric snapshot at fire time
    runbook_url: str = ""
    dedup_key: str = ""      # "<rule_name>:<model_name>"; one active alert per key
    acknowledged: bool = False
    resolved: bool = False   # set True when the triggering condition clears

@dataclass
class AlertRule:
    """Declarative rule: fire an alert whenever `condition(metrics)` is truthy."""
    name: str                # unique key; re-registering the same name replaces the rule
    condition: Callable      # Function that returns True if alert should fire
    severity: Severity
    title_template: str      # str.format template; receives metrics plus `model`
    description_template: str
    cooldown_minutes: int = 30    # Don't re-fire within this window
    runbook_url: str = ""
    notify_channels: List[str] = field(default_factory=lambda: ["slack"])  # P0/P1 also page

class MLAlertManager:
    """Production alert manager for ML systems.

    Features:
    - Rule-based alert generation
    - Deduplication (don't spam the same alert)
    - Severity-based routing
    - Cooldown periods
    - Alert grouping
    """

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.rules: Dict[str, AlertRule] = {}
        # Keyed by dedup_key ("<rule>:<model>") -> at most one live alert per key.
        self.active_alerts: Dict[str, Alert] = {}
        self.alert_history: List[Alert] = []
        # Last fire time per dedup_key, used to enforce cooldowns.
        self.last_fired: Dict[str, datetime] = {}
        self.notification_handlers: Dict[str, Callable] = {}

    def register_rule(self, rule: AlertRule):
        """Register an alert rule (keyed by rule.name; re-registering replaces)."""
        self.rules[rule.name] = rule

    def register_notification_handler(self, channel: str,
                                       handler: Callable):
        """Register a notification handler (Slack, PagerDuty, etc.)."""
        self.notification_handlers[channel] = handler

    def evaluate_rules(self, metrics: dict, model_name: str):
        """Evaluate all rules against current metrics.

        Call this periodically (e.g., every minute). Fires new alerts,
        enforces per-rule cooldowns, and auto-resolves active alerts
        whose condition has cleared.
        """
        now = datetime.utcnow()

        for rule_name, rule in self.rules.items():
            try:
                should_fire = rule.condition(metrics)
            except Exception as e:
                # A broken rule must not block evaluation of the others.
                print(f"Rule {rule_name} evaluation failed: {e}")
                continue

            dedup_key = f"{rule_name}:{model_name}"

            if not should_fire:
                # Condition cleared -> auto-resolve any active alert.
                if dedup_key in self.active_alerts:
                    self._resolve_alert(dedup_key)
                continue

            # Check cooldown.
            # BUG FIX: timedelta.seconds only holds the sub-day remainder
            # (it wraps every 24h); total_seconds() is the true elapsed time.
            last = self.last_fired.get(dedup_key)
            if last and (now - last).total_seconds() < rule.cooldown_minutes * 60:
                continue  # Still in cooldown

            # Merge into one dict so a "model" key in metrics cannot collide
            # with the model_name kwarg (which would raise TypeError);
            # the explicit model_name wins.
            fmt_args = {**metrics, "model": model_name}

            # Fire alert
            alert = Alert(
                alert_id=hashlib.md5(
                    f"{dedup_key}:{now.isoformat()}".encode()
                ).hexdigest()[:12],
                title=rule.title_template.format(**fmt_args),
                description=rule.description_template.format(**fmt_args),
                severity=rule.severity,
                source=rule_name,
                model_name=model_name,
                timestamp=now.isoformat(),
                metrics=metrics,
                runbook_url=rule.runbook_url,
                dedup_key=dedup_key
            )

            self.active_alerts[dedup_key] = alert
            self.alert_history.append(alert)
            self.last_fired[dedup_key] = now
            self._route_alert(alert, rule)

    def _route_alert(self, alert: Alert, rule: AlertRule):
        """Route alert to the rule's channels; P0/P1 always page PagerDuty."""
        channels = list(rule.notify_channels)

        # P0/P1 always go to PagerDuty
        if alert.severity in (Severity.CRITICAL, Severity.HIGH):
            if "pagerduty" not in channels:
                channels.append("pagerduty")

        for channel in channels:
            handler = self.notification_handlers.get(channel)
            if handler:
                try:
                    handler(alert)
                except Exception as e:
                    # One failing channel must not stop delivery to the rest.
                    print(f"Failed to send alert to {channel}: {e}")

    def _resolve_alert(self, dedup_key: str):
        """Auto-resolve an alert when its condition clears."""
        if dedup_key in self.active_alerts:
            alert = self.active_alerts.pop(dedup_key)
            alert.resolved = True

    def get_active_alerts(self) -> List[Alert]:
        """Return all active (unresolved) alerts, most severe (P0) first."""
        return sorted(
            self.active_alerts.values(),
            key=lambda a: list(Severity).index(a.severity)
        )


# --- Setup Example ---

# Notification handlers
def slack_handler(alert: Alert):
    """Send alert to Slack via webhook."""
    icons = {
        Severity.CRITICAL: "🔴",
        Severity.HIGH: "🟠",
        Severity.MEDIUM: "🟡",
        Severity.LOW: "🔵",
    }
    text = (
        f"{icons[alert.severity]} [{alert.severity.value}] {alert.title}\n"
        f"{alert.description}\nRunbook: {alert.runbook_url}"
    )
    message = {"text": text}
    # requests.post(SLACK_WEBHOOK_URL, json=message)
    print(f"Slack: {json.dumps(message)}")

def pagerduty_handler(alert: Alert):
    """Create PagerDuty incident."""
    # Map internal severity onto PagerDuty's level names.
    if alert.severity == Severity.CRITICAL:
        pd_severity = "critical"
    elif alert.severity == Severity.HIGH:
        pd_severity = "error"
    else:
        pd_severity = "warning"

    event = {
        "routing_key": "YOUR_PAGERDUTY_INTEGRATION_KEY",
        "event_action": "trigger",
        "dedup_key": alert.dedup_key,
        "payload": {
            "summary": f"[{alert.severity.value}] {alert.title}",
            "source": alert.source,
            "severity": pd_severity,
            "custom_details": alert.metrics,
        },
    }
    # requests.post("https://events.pagerduty.com/v2/enqueue", json=event)
    print(f"PagerDuty: {event['payload']['summary']}")

# Wire it all up: one manager instance for the platform, with both
# notification channels registered so severity routing has somewhere to send.
manager = MLAlertManager("ml-platform")
manager.register_notification_handler("slack", slack_handler)
manager.register_notification_handler("pagerduty", pagerduty_handler)

Alert Rules for Common ML Scenarios

# Common ML alert rules - copy and adapt for your system

# Rule 1: Prediction volume drop — usually means a dead upstream pipeline.
manager.register_rule(AlertRule(
    name="prediction_volume_drop",
    severity=Severity.HIGH,
    condition=lambda metrics: (
        metrics.get("predictions_last_hour", 0)
        < 0.5 * metrics.get("expected_predictions_per_hour", 100)
    ),
    title_template="Prediction volume dropped 50%+ for {model}",
    description_template=(
        "Expected ~{expected_predictions_per_hour}/hr, got {predictions_last_hour}. "
        "Check if upstream data pipeline is running."
    ),
    cooldown_minutes=30,
    runbook_url="https://wiki.internal/runbooks/low-prediction-volume",
    notify_channels=["slack", "pagerduty"],
))

# Rule 2: Data drift — fires when a quarter or more of the features drift.
manager.register_rule(AlertRule(
    name="significant_data_drift",
    severity=Severity.MEDIUM,
    condition=lambda metrics: (
        metrics.get("drifted_feature_count", 0)
        >= 0.25 * metrics.get("total_features", 1)
    ),
    title_template="Significant data drift: {drifted_feature_count} features for {model}",
    description_template=(
        "{drifted_feature_count}/{total_features} features show significant drift "
        "(PSI > 0.2). Top drifted: {top_drifted_features}. "
        "Review drift dashboard and upstream data sources."
    ),
    cooldown_minutes=60,
    runbook_url="https://wiki.internal/runbooks/data-drift",
    notify_channels=["slack"],
))

# Rule 3: Model accuracy degradation — current accuracy below 95% of baseline.
manager.register_rule(AlertRule(
    name="accuracy_degradation",
    severity=Severity.HIGH,
    condition=lambda metrics: (
        metrics.get("current_accuracy", 1.0)
        < 0.95 * metrics.get("baseline_accuracy", 1.0)
    ),
    title_template="Model accuracy dropped >5% for {model}",
    description_template=(
        "Current accuracy: {current_accuracy:.3f}, baseline: {baseline_accuracy:.3f}. "
        "Check data drift, feature pipeline, and recent deployments."
    ),
    cooldown_minutes=60,
    runbook_url="https://wiki.internal/runbooks/accuracy-drop",
    notify_channels=["slack", "pagerduty"],
))

# Rule 4: Latency spike — hard p99 threshold of 5 seconds.
manager.register_rule(AlertRule(
    name="latency_spike",
    severity=Severity.HIGH,
    condition=lambda metrics: metrics.get("p99_latency_ms", 0) > 5000,
    title_template="P99 latency >5s for {model}",
    description_template=(
        "P99 latency: {p99_latency_ms}ms (threshold: 5000ms). "
        "Check model serving infrastructure, GPU memory, batch sizes."
    ),
    cooldown_minutes=15,
    runbook_url="https://wiki.internal/runbooks/latency-spike",
    notify_channels=["slack", "pagerduty"],
))

# Rule 5: LLM cost runaway — warn at 80% of the daily budget, before overrun.
manager.register_rule(AlertRule(
    name="llm_cost_runaway",
    severity=Severity.HIGH,
    condition=lambda metrics: (
        metrics.get("daily_cost_usd", 0)
        > 0.8 * metrics.get("daily_budget_usd", 500)
    ),
    title_template="LLM cost approaching budget for {model}",
    description_template=(
        "Daily cost: ${daily_cost_usd:.2f} / ${daily_budget_usd:.2f} budget "
        "(80%+ consumed). "
        "Check for retry loops, prompt bloat, or traffic spikes."
    ),
    cooldown_minutes=60,
    runbook_url="https://wiki.internal/runbooks/llm-cost",
    notify_channels=["slack", "pagerduty"],
))

ML Incident Runbook Template

# ML Incident Runbook Template - copy for each model.
# Rendered with str.format, so every {placeholder} must be supplied by the
# caller; a missing key raises KeyError at render time.
RUNBOOK_TEMPLATE = """
# ML Incident Runbook: {model_name}
## Owner: {team} | Escalation: {escalation_contact}

## Quick Reference
- Model endpoint: {endpoint}
- Dashboard: {dashboard_url}
- Feature store: {feature_store_url}
- Training pipeline: {training_pipeline_url}

## Triage Steps (do these FIRST, in order)

### 1. Is the model serving traffic? (30 seconds)
   - Check: `curl -s {endpoint}/health | jq .status`
   - If DOWN: escalate to infra team immediately (P0)
   - If UP: continue to step 2

### 2. Is prediction volume normal? (1 minute)
   - Check dashboard: {dashboard_url}
   - Compare current QPS to same time yesterday
   - If volume dropped >50%: check upstream data pipeline
   - If volume is normal: continue to step 3

### 3. Are predictions reasonable? (2 minutes)
   - Check prediction distribution on dashboard
   - If one class >95%: PREDICTION COLLAPSE
     -> Rollback to previous model version: {rollback_command}
     -> Investigate feature pipeline
   - If distribution looks normal: continue to step 4

### 4. Check data quality (5 minutes)
   - Review drift dashboard for top drifted features
   - Check null rates: any feature >5% nulls?
   - Check feature freshness: any features stale >30 min?
   - If data issues found: fix upstream, then evaluate model

### 5. Check model performance (5 minutes)
   - Review accuracy/precision/recall on dashboard
   - Compare to baseline (accuracy: {baseline_accuracy})
   - If degraded >5%: evaluate if retrain is needed

## Rollback Procedure
```bash
# Quick rollback to previous model version
kubectl set image deployment/{deployment_name} \\
  model={previous_model_image}

# Verify rollback
kubectl rollout status deployment/{deployment_name}
curl -s {endpoint}/health | jq .model_version
```

## Escalation
- L1 (ML Engineer on-call): {l1_contact}
- L2 (ML Platform team): {l2_contact}
- L3 (VP Engineering): {l3_contact} (P0 only, after 30 min)
"""

# Render the template for one concrete model deployment.
runbook_context = {
    "model_name": "fraud-detector-v2",
    "team": "ml-platform",
    "escalation_contact": "#ml-incidents Slack channel",
    "endpoint": "https://ml.internal/fraud/v2",
    "dashboard_url": "https://grafana.internal/d/fraud-v2",
    "feature_store_url": "https://feast.internal/fraud-features",
    "training_pipeline_url": "https://airflow.internal/fraud-train",
    "rollback_command": "kubectl rollout undo deployment/fraud-v2",
    "baseline_accuracy": 0.95,
    "deployment_name": "fraud-v2",
    "previous_model_image": "ml-registry/fraud:v2.2",
    "l1_contact": "@ml-oncall",
    "l2_contact": "@ml-platform-team",
    "l3_contact": "@vp-eng",
}
print(RUNBOOK_TEMPLATE.format(**runbook_context))

Reducing Alert Fatigue

The Alert Fatigue Problem: When engineers get too many alerts, they start ignoring all of them — including the critical ones. ML systems are especially prone to alert fatigue because drift and performance metrics generate constant noise.
| Strategy | How It Helps | Implementation |
| --- | --- | --- |
| Deduplication | One alert per issue, not one per minute | Use dedup_key; alert only on state change (fire/resolve) |
| Cooldown periods | Prevent re-alerting during investigation | 30-60 min cooldown per alert type |
| Alert grouping | Related alerts become one notification | Group by model or data pipeline |
| Progressive severity | Start low, escalate if unresolved | P3 at 15 min, P2 at 30 min, P1 at 60 min |
| Weekly review | Identify and fix noisy alerts | Track alert-to-action ratio; delete alerts that are never acted on |
| Business hours routing | Only page for things that truly can't wait | P2/P3 go to Slack only; P0/P1 page |