Advanced
Step 6: Monitoring & Retraining
Build a production monitoring system that detects data drift, tracks model performance over time, and automatically triggers retraining when the fraud model degrades. Visualize everything in a Grafana dashboard.
Why Models Degrade
A fraud detection model that works perfectly today will degrade over time for several reasons:
- Concept drift: Fraud tactics evolve. New attack vectors emerge that the model has never seen.
- Data drift: The distribution of legitimate transactions changes due to seasonal patterns, new merchants, or economic conditions.
- Feedback loops: Blocking fraud changes fraudster behavior, which changes the data distribution the model sees.
- Feature drift: Upstream data pipelines change, causing features to shift in meaning or scale.
Silent failure: Unlike a crashed server, a degraded ML model fails silently. It still returns predictions, but the predictions are increasingly wrong. Without monitoring, you will not know until fraud losses spike or customer complaints surge.
Data Drift Detection with Evidently
# src/monitoring/drift.py
import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
from evidently.metrics import (
DataDriftTable,
DatasetDriftMetric,
ColumnDriftMetric
)
class DriftDetector:
    """Flags data drift by comparing production batches against the
    training-time reference set.

    Per-feature statistical tests (selected automatically by Evidently):
    - Kolmogorov-Smirnov test for numerical features
    - Chi-squared test for categorical features
    - Population Stability Index (PSI) for the score distribution
    """

    def __init__(self, reference_data: pd.DataFrame, feature_cols: list):
        # Keep only the monitored columns; copy so later mutation of the
        # caller's frame cannot corrupt the stored reference.
        self.reference = reference_data[feature_cols].copy()
        self.feature_cols = feature_cols
        self.drift_history = []

    def check_drift(
        self,
        current_data: pd.DataFrame,
        drift_threshold: float = 0.1
    ) -> dict:
        """Run drift detection on a batch of current production data.

        Args:
            current_data: Recent production transactions with features.
            drift_threshold: Share of drifted features that flags
                dataset-level drift.

        Returns:
            Drift report dict with per-feature and dataset-level results.
        """
        batch = current_data[self.feature_cols].copy()

        # One Evidently pass yields both the dataset-level verdict and the
        # per-column breakdown.
        evidently_report = Report(metrics=[
            DatasetDriftMetric(drift_share=drift_threshold),
            DataDriftTable(),
        ])
        evidently_report.run(
            reference_data=self.reference,
            current_data=batch
        )
        results = evidently_report.as_dict()['metrics']

        # Dataset-level verdict from the first metric.
        dataset_result = results[0]['result']
        is_drifted = dataset_result['dataset_drift']
        drift_share = dataset_result['share_of_drifted_columns']
        n_drifted = dataset_result['number_of_drifted_columns']

        # Per-feature breakdown from the second metric.
        per_column = results[1]['result']['drift_by_columns']
        drifted_features = [
            {
                'feature': name,
                'drift_score': info['drift_score'],
                'stattest': info['stattest_name'],
                'threshold': info['stattest_threshold'],
            }
            for name, info in per_column.items()
            if info['drift_detected']
        ]

        report_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'n_samples': len(batch),
            'dataset_drifted': is_drifted,
            'drift_share': drift_share,
            'n_drifted_features': n_drifted,
            'drifted_features': drifted_features,
            'action': 'RETRAIN' if is_drifted else 'OK',
        }
        self.drift_history.append(report_entry)

        # Console summary for operators / cron logs.
        status = "DRIFT DETECTED" if is_drifted else "No drift"
        print(f"\n=== Drift Report ({status}) ===")
        print(f" Samples analyzed: {len(batch):,}")
        print(f" Drifted features: {n_drifted}/{len(self.feature_cols)} ({drift_share*100:.1f}%)")
        if drifted_features:
            print("\n Top drifted features:")
            ranked = sorted(drifted_features, key=lambda item: item['drift_score'])
            for entry in ranked[:10]:
                print(f" {entry['feature']:<25s} score={entry['drift_score']:.4f} ({entry['stattest']})")
        return report_entry

    def compute_psi(
        self,
        reference_scores: np.ndarray,
        current_scores: np.ndarray,
        n_bins: int = 20
    ) -> float:
        """Population Stability Index between two score distributions.

        Interpretation:
            PSI < 0.1     : no significant change
            PSI 0.1 - 0.2 : moderate change, monitor closely
            PSI > 0.2     : significant change, investigate / retrain
        """
        # Fixed [0, 1] bins: fraud scores are probabilities.
        edges = np.linspace(0, 1, n_bins + 1)
        ref_hist, _ = np.histogram(reference_scores, bins=edges)
        cur_hist, _ = np.histogram(current_scores, bins=edges)

        # Add-one smoothing avoids log(0) / division by zero while keeping
        # the proportions normalized.
        ref_prop = (ref_hist + 1) / (ref_hist.sum() + n_bins)
        cur_prop = (cur_hist + 1) / (cur_hist.sum() + n_bins)

        psi = np.sum((cur_prop - ref_prop) * np.log(cur_prop / ref_prop))

        if psi < 0.1:
            level = "OK"
        elif psi < 0.2:
            level = "MONITOR"
        else:
            level = "RETRAIN"
        print(f" PSI: {psi:.4f} ({level})")
        return psi
Performance Tracking
# src/monitoring/performance_tracker.py
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from sklearn.metrics import (
precision_score, recall_score, f1_score,
average_precision_score
)
from prometheus_client import Gauge
# Prometheus gauges for real-time monitoring.
# These are process-global and exposed via the API's /metrics endpoint,
# where Prometheus scrapes them (see configs/prometheus.yml).
MODEL_RECALL = Gauge('fraud_model_recall', 'Model recall on labeled data')
MODEL_PRECISION = Gauge('fraud_model_precision', 'Model precision')
MODEL_F1 = Gauge('fraud_model_f1', 'Model F1 score')
MODEL_PR_AUC = Gauge('fraud_model_pr_auc', 'Model PR-AUC')
DRIFT_PSI = Gauge('fraud_drift_psi', 'Score distribution PSI')
ALERT_RATE = Gauge('fraud_alert_rate', 'Fraction of transactions flagged')
class PerformanceTracker:
    """Tracks model performance over time using labeled feedback.

    Labels come from:
    - Fraud analyst investigations (confirmed fraud / false alarm)
    - Chargebacks (delayed fraud confirmation, usually 30-90 days)
    - Customer reports of unauthorized transactions
    """

    def __init__(self, threshold: float):
        # Score cutoff at or above which a transaction is flagged as fraud.
        self.threshold = threshold
        self.daily_metrics = []       # one metrics dict per evaluated day
        self.predictions_buffer = []  # predictions awaiting ground truth
        self.labels_buffer = []       # labels as investigations conclude

    def log_prediction(
        self,
        fraud_score: float,
        predicted_fraud: bool,
        transaction_id=None
    ):
        """Log a prediction for future evaluation when its label arrives.

        Args:
            fraud_score: Model fraud score for the transaction.
            predicted_fraud: Whether the score crossed the alert threshold.
            transaction_id: Identifier used to join this prediction with a
                later ground-truth label from log_label(). Optional for
                backward compatibility, but without it a buffered prediction
                can never be matched to its label.
        """
        self.predictions_buffer.append({
            'transaction_id': transaction_id,
            'timestamp': datetime.utcnow(),
            'fraud_score': fraud_score,
            'predicted_fraud': predicted_fraud
        })

    def log_label(self, transaction_id: str, actual_fraud: bool):
        """Log a ground truth label from investigation or chargeback."""
        self.labels_buffer.append({
            'transaction_id': transaction_id,
            'actual_fraud': actual_fraud,
            'labeled_at': datetime.utcnow()
        })

    def compute_daily_metrics(
        self,
        y_true: np.ndarray,
        y_scores: np.ndarray,
        date: str
    ) -> dict:
        """Compute metrics for a single day of labeled data.

        Args:
            y_true: Binary ground-truth labels (1 = fraud).
            y_scores: Model fraud scores for the same transactions.
            date: Date string used to key this metrics row.

        Returns:
            Metrics dict; 'pr_auc' is None when the day has no positive
            labels (average precision is undefined without positives).
        """
        y_pred = (y_scores >= self.threshold).astype(int)
        metrics = {
            'date': date,
            'n_labeled': len(y_true),
            'n_fraud': int(y_true.sum()),
            'n_flagged': int(y_pred.sum()),
            # zero_division=0 keeps days with no alerts/no fraud from raising.
            'recall': recall_score(y_true, y_pred, zero_division=0),
            'precision': precision_score(y_true, y_pred, zero_division=0),
            'f1': f1_score(y_true, y_pred, zero_division=0),
            'alert_rate': y_pred.mean(),
        }
        if y_true.sum() > 0:
            metrics['pr_auc'] = average_precision_score(y_true, y_scores)
        else:
            metrics['pr_auc'] = None

        # Push to Prometheus so the Grafana dashboard updates in real time.
        MODEL_RECALL.set(metrics['recall'])
        MODEL_PRECISION.set(metrics['precision'])
        MODEL_F1.set(metrics['f1'])
        if metrics['pr_auc'] is not None:
            MODEL_PR_AUC.set(metrics['pr_auc'])
        ALERT_RATE.set(metrics['alert_rate'])

        self.daily_metrics.append(metrics)
        return metrics

    def check_degradation(
        self,
        window_days: int = 7,
        recall_threshold: float = 0.90,
        precision_threshold: float = 0.05
    ) -> dict:
        """Check if model performance has degraded below thresholds.

        Args:
            window_days: Number of trailing days to average over.
            recall_threshold: Minimum acceptable average recall.
            precision_threshold: Minimum acceptable average precision.

        Returns:
            Dict with averaged metrics, the list of detected issues, and an
            'action' of 'RETRAIN', 'OK', or 'INSUFFICIENT_DATA'.
        """
        if len(self.daily_metrics) < window_days:
            return {'action': 'INSUFFICIENT_DATA',
                    'reason': f'Need {window_days} days of metrics'}

        recent = self.daily_metrics[-window_days:]
        avg_recall = np.mean([m['recall'] for m in recent])
        avg_precision = np.mean([m['precision'] for m in recent])
        avg_f1 = np.mean([m['f1'] for m in recent])

        issues = []
        if avg_recall < recall_threshold:
            issues.append(
                f"Recall {avg_recall:.3f} < {recall_threshold} threshold"
            )
        if avg_precision < precision_threshold:
            issues.append(
                f"Precision {avg_precision:.3f} < {precision_threshold}"
            )

        # Check for downward trend: compare the last week against the week
        # before it; a drop > 0.05 in recall is flagged even if still above
        # the absolute threshold.
        if len(self.daily_metrics) >= 14:
            week1 = self.daily_metrics[-14:-7]
            week2 = self.daily_metrics[-7:]
            recall_delta = (
                np.mean([m['recall'] for m in week2]) -
                np.mean([m['recall'] for m in week1])
            )
            if recall_delta < -0.05:
                issues.append(
                    f"Recall dropped {abs(recall_delta):.3f} week-over-week"
                )

        result = {
            'window_days': window_days,
            'avg_recall': avg_recall,
            'avg_precision': avg_precision,
            'avg_f1': avg_f1,
            'issues': issues,
            'action': 'RETRAIN' if issues else 'OK'
        }

        print(f"\n=== Performance Check (last {window_days} days) ===")
        print(f" Avg Recall: {avg_recall:.4f}")
        print(f" Avg Precision: {avg_precision:.4f}")
        print(f" Avg F1: {avg_f1:.4f}")
        if issues:
            print(f" ACTION: RETRAIN RECOMMENDED")
            for issue in issues:
                print(f" - {issue}")
        else:
            print(f" Status: OK")
        return result
Automated Retraining Pipeline
# src/monitoring/retrain.py
import os
import json
import joblib
import numpy as np
import pandas as pd
from datetime import datetime
from typing import Optional
from ..train import train_xgboost, find_optimal_threshold
from ..features import engineer_features, add_velocity_features
from .drift import DriftDetector
from .performance_tracker import PerformanceTracker
class RetrainingPipeline:
    """Automated retraining triggered by drift or performance degradation.

    Workflow:
    1. Collect new labeled data from production
    2. Combine with historical training data
    3. Re-engineer features
    4. Train new model with same hyperparameters
    5. Evaluate on holdout set
    6. If new model is better, promote to production (shadow first)
    7. Archive old model with metrics
    """

    def __init__(
        self,
        model_dir: str = "models",
        data_dir: str = "data"
    ):
        # Directories holding serialized model artifacts and training data.
        self.model_dir = model_dir
        self.data_dir = data_dir
        # In-memory audit log: one record dict appended per retrain() run.
        self.retrain_history = []

    def should_retrain(
        self,
        drift_report: dict,
        performance_report: dict
    ) -> bool:
        """Decide whether to trigger retraining based on monitoring signals.

        Args:
            drift_report: Output of DriftDetector.check_drift(), optionally
                augmented with a 'psi' value.
            performance_report: Output of
                PerformanceTracker.check_degradation().

        Returns:
            True when any drift or performance trigger fires.
        """
        triggers = []
        # Drift-based trigger
        if drift_report.get('dataset_drifted', False):
            triggers.append('data_drift')
        # NOTE(review): check_drift() does not put a 'psi' key in its report;
        # the caller must merge the PSI value in for this trigger to ever
        # fire — confirm upstream wiring.
        if drift_report.get('psi', 0) > 0.2:
            triggers.append('score_drift')
        # Performance-based trigger
        if performance_report.get('action') == 'RETRAIN':
            triggers.append('performance_degradation')
        if triggers:
            print(f"Retrain triggers: {', '.join(triggers)}")
            return True
        return False

    def retrain(
        self,
        new_data: Optional[pd.DataFrame] = None,
        reason: str = "scheduled"
    ) -> dict:
        """Execute the full retraining pipeline.

        Args:
            new_data: Fresh labeled production data to fold into training;
                when None, retrains on the existing dataset only.
            reason: Free-text tag recorded in the retrain history.

        Returns:
            Record dict describing the run: PR-AUC comparison, promotion
            decision, new threshold, and duration.
        """
        start_time = datetime.utcnow()
        print(f"\n{'='*60}")
        print(f"RETRAINING PIPELINE - {reason}")
        print(f"Started: {start_time.isoformat()}")
        print(f"{'='*60}")
        # Step 1: Load existing training data
        # The engineered training frame is persisted alongside the
        # preprocessing artifacts at initial training time.
        artifacts = joblib.load(
            os.path.join(self.model_dir, 'preprocessing_artifacts.pkl')
        )
        existing_data = artifacts['df_engineered']
        print(f"Existing data: {len(existing_data):,} rows")
        # Step 2: Combine with new labeled data
        if new_data is not None:
            combined = pd.concat([existing_data, new_data], ignore_index=True)
            # Remove duplicates (rows present in both historical and new data)
            combined = combined.drop_duplicates()
            print(f"Combined data: {len(combined):,} rows "
                  f"(+{len(combined) - len(existing_data):,} new)")
        else:
            combined = existing_data
            print("No new data provided, retraining on existing data")
        # Step 3: Re-engineer features on the full combined dataset
        df_features = engineer_features(combined)
        df_features = add_velocity_features(df_features)
        # Step 4: Prepare train/test split
        from ..features import prepare_training_data
        X_train, X_test, y_train, y_test, feature_cols = (
            prepare_training_data(df_features)
        )
        # Step 5: Train new model (same hyperparameters as original training)
        new_model = train_xgboost(X_train, y_train, X_test, y_test)
        # Step 6: Evaluate on the holdout set
        y_proba = new_model.predict_proba(X_test)[:, 1]
        from sklearn.metrics import average_precision_score
        new_pr_auc = average_precision_score(y_test, y_proba)
        # Find optimal threshold (F2 weights recall over precision)
        threshold_result = find_optimal_threshold(
            y_test, y_proba, method='f2'
        )
        # Step 7: Compare with current production model
        current = joblib.load(
            os.path.join(self.model_dir, 'fraud_model.pkl')
        )
        current_metrics = current.get('metrics', {})
        # Default of 0 means a model with no recorded metrics always loses
        # the comparison, so the first retrain effectively always promotes.
        current_pr_auc = current_metrics.get('pr_auc', 0)
        improvement = new_pr_auc - current_pr_auc
        print(f"\n=== Model Comparison ===")
        print(f" Current PR-AUC: {current_pr_auc:.4f}")
        print(f" New PR-AUC: {new_pr_auc:.4f}")
        print(f" Improvement: {improvement:+.4f}")
        # Step 8: Promote if better (or within tolerance for drift recovery)
        promoted = False
        if improvement > -0.01:  # Allow slight regression if drift-triggered
            # Archive current model before overwriting it, so any promotion
            # can be rolled back.
            archive_path = os.path.join(
                self.model_dir,
                f"fraud_model_archive_{start_time.strftime('%Y%m%d_%H%M%S')}.pkl"
            )
            joblib.dump(current, archive_path)
            print(f" Archived current model to {archive_path}")
            # Save new model under the production filename
            new_artifacts = {
                'model': new_model,
                'threshold': threshold_result['threshold'],
                'feature_cols': feature_cols,
                'model_name': 'XGBoost',
                'metrics': {
                    'pr_auc': new_pr_auc,
                    'recall': threshold_result['recall'],
                    'precision': threshold_result['precision'],
                    'threshold': threshold_result['threshold']
                },
                'trained_at': start_time.isoformat(),
                'reason': reason,
                'data_size': len(combined)
            }
            joblib.dump(
                new_artifacts,
                os.path.join(self.model_dir, 'fraud_model.pkl')
            )
            print(f" New model promoted to production!")
            promoted = True
        else:
            print(f" New model NOT promoted (regression > 0.01)")
        # Log retraining event for audit / later analysis
        record = {
            'timestamp': start_time.isoformat(),
            'reason': reason,
            'data_size': len(combined),
            'new_pr_auc': new_pr_auc,
            'old_pr_auc': current_pr_auc,
            'improvement': improvement,
            'promoted': promoted,
            'new_threshold': threshold_result['threshold'],
            'duration_seconds': (
                datetime.utcnow() - start_time
            ).total_seconds()
        }
        self.retrain_history.append(record)
        return record
Grafana Dashboard Configuration
Add Prometheus and Grafana to your Docker Compose for visualization:
# Add to docker-compose.yml
# Prometheus scrapes the fraud API's /metrics endpoint; Grafana reads
# from Prometheus to render the dashboard panels.
prometheus:
  image: prom/prometheus:v2.48.0
  ports:
    - "9090:9090"
  volumes:
    # Mount the scrape configuration defined in configs/prometheus.yml.
    - ./configs/prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
  image: grafana/grafana:10.2.0
  ports:
    - "3000:3000"
  environment:
    # NOTE(review): default admin password for local use only — change it
    # (or use secrets) before any shared or production deployment.
    GF_SECURITY_ADMIN_PASSWORD: admin
  depends_on:
    - prometheus
# configs/prometheus.yml
global:
  # How often Prometheus pulls metrics from each target.
  scrape_interval: 15s
scrape_configs:
  - job_name: 'fraud-api'
    static_configs:
      # host.docker.internal reaches the API process running on the host
      # (the API is assumed to listen on port 8000).
      - targets: ['host.docker.internal:8000']
    metrics_path: '/metrics'
Key Dashboard Panels
| Panel | Metric | Alert Threshold |
|---|---|---|
| Prediction Latency | fraud_prediction_latency_ms | p99 > 50ms |
| Fraud Alert Rate | fraud_alert_rate | > 2% or < 0.05% |
| Model Recall | fraud_model_recall | < 0.90 |
| Model Precision | fraud_model_precision | < 0.05 |
| Score PSI | fraud_drift_psi | > 0.2 |
| Prediction Volume | fraud_predictions_total | Drop > 50% |
Scheduled Monitoring Job
# src/monitoring/scheduled_check.py
"""Run this as a daily cron job or Airflow DAG."""
import joblib
import pandas as pd
from .drift import DriftDetector
from .performance_tracker import PerformanceTracker
from .retrain import RetrainingPipeline
def daily_monitoring_check():
    """Daily monitoring: drift check + performance check + retrain decision."""
    # Load the training-time reference data and the deployed model artifacts.
    preprocessing = joblib.load('models/preprocessing_artifacts.pkl')
    deployed = joblib.load('models/fraud_model.pkl')
    monitored_features = deployed['feature_cols']

    # Pull the latest production transactions (from your data warehouse).
    production_batch = pd.read_csv('data/recent_production_data.csv')

    # 1. Data drift against the training reference.
    drift_detector = DriftDetector(
        preprocessing['df_engineered'], monitored_features
    )
    drift_report = drift_detector.check_drift(production_batch)

    # 2. Performance degradation (only meaningful once labels accumulate).
    perf_report = PerformanceTracker(
        deployed['threshold']
    ).check_degradation()

    # 3. Retrain only if either monitoring signal fires.
    pipeline = RetrainingPipeline()
    if not pipeline.should_retrain(drift_report, perf_report):
        print("\nNo retraining needed.")
        return
    outcome = pipeline.retrain(
        new_data=production_batch,
        reason="automated_monitoring"
    )
    print(f"\nRetraining complete: {outcome['promoted']}")


if __name__ == "__main__":
    daily_monitoring_check()
Retraining frequency: For fraud detection, weekly retraining is a good starting point. During high-activity periods (Black Friday, holiday season), switch to daily retraining. Better still, let the monitoring pipeline tell you when the model actually needs retraining, rather than relying on a fixed schedule alone.
What Is Next
We now have a complete, monitored fraud detection system. In the final lesson, we will explore enhancements: human-in-the-loop review for borderline cases, feedback integration for continuous learning, graph-based fraud detection, and answers to frequently asked questions.
Lilly Tech Systems