Advanced

Step 6: Monitoring & Retraining

Build a production monitoring system that detects data drift, tracks model performance over time, and automatically triggers retraining when the fraud model degrades. Visualize everything in a Grafana dashboard.

Why Models Degrade

A fraud detection model that works perfectly today will degrade over time for several reasons:

  • Concept drift: Fraud tactics evolve. New attack vectors emerge that the model has never seen.
  • Data drift: The distribution of legitimate transactions changes due to seasonal patterns, new merchants, or economic conditions.
  • Feedback loops: Blocking fraud changes fraudster behavior, which changes the data distribution the model sees.
  • Feature drift: Upstream data pipelines change, causing features to shift in meaning or scale.
**Silent failure:** Unlike a crashed server, a degraded ML model fails silently. It still returns predictions, but those predictions are increasingly wrong. Without monitoring, you will not know until fraud losses spike or customer complaints surge.

Data Drift Detection with Evidently

# src/monitoring/drift.py
import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
from evidently.metrics import (
    DataDriftTable,
    DatasetDriftMetric,
    ColumnDriftMetric
)


class DriftDetector:
    """Detects data drift between training data and production data.

    Compares feature distributions using statistical tests:
    - Kolmogorov-Smirnov test for numerical features
    - Chi-squared test for categorical features
    - Population Stability Index (PSI) for score distribution
    """

    def __init__(self, reference_data: pd.DataFrame, feature_cols: list) -> None:
        """Snapshot the training-time feature distribution as the reference.

        Args:
            reference_data: Training data containing at least `feature_cols`.
            feature_cols: Names of the feature columns to monitor.
        """
        # .copy() decouples the stored baseline from the caller's frame.
        self.reference = reference_data[feature_cols].copy()
        self.feature_cols = feature_cols
        # One report dict per check_drift() call; in-memory only (not persisted).
        self.drift_history = []

    def check_drift(
        self,
        current_data: pd.DataFrame,
        drift_threshold: float = 0.1
    ) -> dict:
        """Run drift detection on a batch of current production data.

        Args:
            current_data: Recent production transactions with features
            drift_threshold: Share of drifted features to flag overall drift

        Returns:
            Drift report with per-feature and dataset-level results
        """
        current = current_data[self.feature_cols].copy()

        # Build Evidently report.
        # DatasetDriftMetric flags the dataset as drifted once the share of
        # drifted columns exceeds drift_threshold; DataDriftTable supplies
        # the per-column test results.
        report = Report(metrics=[
            DatasetDriftMetric(drift_share=drift_threshold),
            DataDriftTable(),
        ])

        report.run(
            reference_data=self.reference,
            current_data=current
        )

        # Extract results.
        # NOTE(review): the dict layout below (metrics[0]/metrics[1] and the
        # nested keys) is tied to the installed Evidently version — confirm
        # when upgrading the library.
        result = report.as_dict()
        metrics = result['metrics']

        # Dataset-level drift
        dataset_drift = metrics[0]['result']
        is_drifted = dataset_drift['dataset_drift']
        drift_share = dataset_drift['share_of_drifted_columns']
        n_drifted = dataset_drift['number_of_drifted_columns']

        # Per-feature drift: keep only the columns whose statistical test fired.
        feature_drift = metrics[1]['result']
        drifted_features = []
        for col_name, col_result in feature_drift['drift_by_columns'].items():
            if col_result['drift_detected']:
                drifted_features.append({
                    'feature': col_name,
                    'drift_score': col_result['drift_score'],
                    'stattest': col_result['stattest_name'],
                    'threshold': col_result['stattest_threshold']
                })

        report_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'n_samples': len(current),
            'dataset_drifted': is_drifted,
            'drift_share': drift_share,
            'n_drifted_features': n_drifted,
            'drifted_features': drifted_features,
            # RetrainingPipeline.should_retrain keys off 'dataset_drifted';
            # 'action' is informational for humans reading the report.
            'action': 'RETRAIN' if is_drifted else 'OK'
        }

        self.drift_history.append(report_entry)

        # Print summary
        status = "DRIFT DETECTED" if is_drifted else "No drift"
        print(f"\n=== Drift Report ({status}) ===")
        print(f"  Samples analyzed:  {len(current):,}")
        print(f"  Drifted features:  {n_drifted}/{len(self.feature_cols)} "
              f"({drift_share*100:.1f}%)")

        if drifted_features:
            print(f"\n  Top drifted features:")
            # NOTE(review): the ascending sort treats drift_score as a p-value
            # (smaller = stronger drift), which holds for KS / chi-squared.
            # For distance-based stattests a LARGER score means more drift, so
            # the ordering would be inverted — confirm the stattest mix.
            for df in sorted(drifted_features,
                           key=lambda x: x['drift_score'])[:10]:
                print(f"    {df['feature']:<25s} "
                      f"score={df['drift_score']:.4f} "
                      f"({df['stattest']})")

        return report_entry

    def compute_psi(
        self,
        reference_scores: np.ndarray,
        current_scores: np.ndarray,
        n_bins: int = 20
    ) -> float:
        """Compute Population Stability Index for score distribution.

        Assumes both score arrays lie in [0, 1] (model probabilities);
        values outside that range fall outside every bin and are silently
        dropped by np.histogram.

        PSI < 0.1:  No significant change
        PSI 0.1-0.2: Moderate change, monitor closely
        PSI > 0.2:  Significant change, investigate / retrain
        """
        # Create bins from reference distribution
        bins = np.linspace(0, 1, n_bins + 1)
        ref_counts = np.histogram(reference_scores, bins=bins)[0]
        cur_counts = np.histogram(current_scores, bins=bins)[0]

        # Normalize to proportions (avoid zeros).
        # Add-one (Laplace) smoothing per bin — hence the +n_bins in the
        # denominator — so empty bins never produce log(0) or a zero divide.
        ref_pct = (ref_counts + 1) / (ref_counts.sum() + n_bins)
        cur_pct = (cur_counts + 1) / (cur_counts.sum() + n_bins)

        # PSI formula
        psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))

        level = "OK" if psi < 0.1 else "MONITOR" if psi < 0.2 else "RETRAIN"
        print(f"  PSI: {psi:.4f} ({level})")

        return psi

Performance Tracking

# src/monitoring/performance_tracker.py
from datetime import datetime, timedelta
from typing import Optional

import numpy as np
import pandas as pd
from prometheus_client import Gauge
from sklearn.metrics import (
    precision_score, recall_score, f1_score,
    average_precision_score
)


# Prometheus gauges for real-time monitoring.
# Defined at module level so every PerformanceTracker in the process writes
# to the same metric series; Prometheus scrapes them via the /metrics endpoint.
MODEL_RECALL = Gauge('fraud_model_recall', 'Model recall on labeled data')
MODEL_PRECISION = Gauge('fraud_model_precision', 'Model precision')
MODEL_F1 = Gauge('fraud_model_f1', 'Model F1 score')
MODEL_PR_AUC = Gauge('fraud_model_pr_auc', 'Model PR-AUC')
DRIFT_PSI = Gauge('fraud_drift_psi', 'Score distribution PSI')
ALERT_RATE = Gauge('fraud_alert_rate', 'Fraction of transactions flagged')


class PerformanceTracker:
    """Tracks model performance over time using labeled feedback.

    Labels come from:
    - Fraud analyst investigations (confirmed fraud / false alarm)
    - Chargebacks (delayed fraud confirmation, usually 30-90 days)
    - Customer reports of unauthorized transactions
    """

    def __init__(self, threshold: float):
        """
        Args:
            threshold: Fraud-score cutoff that turns a score into an alert.
        """
        self.threshold = threshold
        self.daily_metrics = []        # one metrics dict per evaluated day
        self.predictions_buffer = []   # predictions awaiting ground truth
        self.labels_buffer = []        # labels as investigations close

    def log_prediction(
        self,
        fraud_score: float,
        predicted_fraud: bool,
        transaction_id: Optional[str] = None
    ):
        """Log a prediction for future evaluation when its label arrives.

        Args:
            fraud_score: Model fraud probability for the transaction.
            predicted_fraud: Whether the score was flagged as fraud.
            transaction_id: Optional ID used to join this prediction with a
                later log_label() call; without it the two buffers cannot be
                matched, so supply it whenever available. Defaults to None
                for backward compatibility with existing callers.
        """
        self.predictions_buffer.append({
            'timestamp': datetime.utcnow(),
            'transaction_id': transaction_id,
            'fraud_score': fraud_score,
            'predicted_fraud': predicted_fraud
        })

    def log_label(self, transaction_id: str, actual_fraud: bool):
        """Log a ground truth label from investigation or chargeback."""
        self.labels_buffer.append({
            'transaction_id': transaction_id,
            'actual_fraud': actual_fraud,
            'labeled_at': datetime.utcnow()
        })

    def compute_daily_metrics(
        self,
        y_true: np.ndarray,
        y_scores: np.ndarray,
        date: str
    ) -> dict:
        """Compute metrics for a single day of labeled data.

        Args:
            y_true: Ground-truth labels (1 = fraud) for the day.
            y_scores: Model fraud scores aligned with y_true.
            date: Day identifier stored alongside the metrics.

        Returns:
            Metrics dict; also appended to daily_metrics and pushed to the
            Prometheus gauges.
        """
        y_pred = (y_scores >= self.threshold).astype(int)

        metrics = {
            'date': date,
            'n_labeled': len(y_true),
            'n_fraud': int(y_true.sum()),
            'n_flagged': int(y_pred.sum()),
            # zero_division=0 keeps days with no alerts / no fraud from raising.
            'recall': recall_score(y_true, y_pred, zero_division=0),
            'precision': precision_score(y_true, y_pred, zero_division=0),
            'f1': f1_score(y_true, y_pred, zero_division=0),
            'alert_rate': y_pred.mean(),
        }

        # PR-AUC is undefined when the day has no positive labels at all.
        if y_true.sum() > 0:
            metrics['pr_auc'] = average_precision_score(y_true, y_scores)
        else:
            metrics['pr_auc'] = None

        # Update Prometheus gauges
        MODEL_RECALL.set(metrics['recall'])
        MODEL_PRECISION.set(metrics['precision'])
        MODEL_F1.set(metrics['f1'])
        if metrics['pr_auc'] is not None:
            MODEL_PR_AUC.set(metrics['pr_auc'])
        ALERT_RATE.set(metrics['alert_rate'])

        self.daily_metrics.append(metrics)

        return metrics

    def check_degradation(
        self,
        window_days: int = 7,
        recall_threshold: float = 0.90,
        precision_threshold: float = 0.05
    ) -> dict:
        """Check if model performance has degraded below thresholds.

        Args:
            window_days: Number of most recent daily metrics to average.
            recall_threshold: Minimum acceptable average recall.
            precision_threshold: Minimum acceptable average precision.

        Returns:
            Dict with the window averages, a list of detected issues, and an
            'action' of 'RETRAIN', 'OK', or 'INSUFFICIENT_DATA'.
        """
        if len(self.daily_metrics) < window_days:
            return {'action': 'INSUFFICIENT_DATA',
                    'reason': f'Need {window_days} days of metrics'}

        recent = self.daily_metrics[-window_days:]

        avg_recall = np.mean([m['recall'] for m in recent])
        avg_precision = np.mean([m['precision'] for m in recent])
        avg_f1 = np.mean([m['f1'] for m in recent])

        issues = []
        if avg_recall < recall_threshold:
            issues.append(
                f"Recall {avg_recall:.3f} < {recall_threshold} threshold"
            )
        if avg_precision < precision_threshold:
            issues.append(
                f"Precision {avg_precision:.3f} < {precision_threshold}"
            )

        # Check for downward trend.
        # Deliberately a fixed 14-day week-over-week comparison, independent
        # of window_days.
        if len(self.daily_metrics) >= 14:
            week1 = self.daily_metrics[-14:-7]
            week2 = self.daily_metrics[-7:]
            recall_delta = (
                np.mean([m['recall'] for m in week2]) -
                np.mean([m['recall'] for m in week1])
            )
            if recall_delta < -0.05:
                issues.append(
                    f"Recall dropped {abs(recall_delta):.3f} week-over-week"
                )

        result = {
            'window_days': window_days,
            'avg_recall': avg_recall,
            'avg_precision': avg_precision,
            'avg_f1': avg_f1,
            'issues': issues,
            'action': 'RETRAIN' if issues else 'OK'
        }

        print(f"\n=== Performance Check (last {window_days} days) ===")
        print(f"  Avg Recall:    {avg_recall:.4f}")
        print(f"  Avg Precision: {avg_precision:.4f}")
        print(f"  Avg F1:        {avg_f1:.4f}")
        if issues:
            print(f"  ACTION: RETRAIN RECOMMENDED")
            for issue in issues:
                print(f"    - {issue}")
        else:
            print(f"  Status: OK")

        return result

Automated Retraining Pipeline

# src/monitoring/retrain.py
import os
import json
import joblib
import numpy as np
import pandas as pd
from datetime import datetime
from typing import Optional

from ..train import train_xgboost, find_optimal_threshold
from ..features import engineer_features, add_velocity_features
from .drift import DriftDetector
from .performance_tracker import PerformanceTracker


class RetrainingPipeline:
    """Automated retraining triggered by drift or performance degradation.

    Workflow:
    1. Collect new labeled data from production
    2. Combine with historical training data
    3. Re-engineer features
    4. Train new model with same hyperparameters
    5. Evaluate on holdout set
    6. If new model is better, promote to production (shadow first)
    7. Archive old model with metrics
    """

    def __init__(
        self,
        model_dir: str = "models",
        data_dir: str = "data"
    ):
        """
        Args:
            model_dir: Directory holding model pickles and archives.
            data_dir: Directory for training data (not used in this class yet).
        """
        self.model_dir = model_dir
        self.data_dir = data_dir
        # In-memory log of retrain() outcome records; not persisted to disk.
        self.retrain_history = []

    def should_retrain(
        self,
        drift_report: dict,
        performance_report: dict
    ) -> bool:
        """Decide whether to trigger retraining based on monitoring signals.

        Args:
            drift_report: Output of DriftDetector.check_drift (optionally
                augmented with a 'psi' entry by the caller).
            performance_report: Output of PerformanceTracker.check_degradation.

        Returns:
            True if any drift or performance trigger fires.
        """
        triggers = []

        # Drift-based trigger
        if drift_report.get('dataset_drifted', False):
            triggers.append('data_drift')

        # NOTE(review): DriftDetector.check_drift never sets a 'psi' key, so
        # this trigger only fires if the caller merges a compute_psi() result
        # into the report — confirm the intended wiring.
        if drift_report.get('psi', 0) > 0.2:
            triggers.append('score_drift')

        # Performance-based trigger
        if performance_report.get('action') == 'RETRAIN':
            triggers.append('performance_degradation')

        if triggers:
            print(f"Retrain triggers: {', '.join(triggers)}")
            return True

        return False

    def retrain(
        self,
        new_data: Optional[pd.DataFrame] = None,
        reason: str = "scheduled"
    ) -> dict:
        """Execute the full retraining pipeline.

        Args:
            new_data: Fresh labeled production data to fold into training;
                when None, retrains on the existing data only.
            reason: Free-text trigger description stored in the run record.

        Returns:
            Record of the run (comparison metrics, promotion decision,
            duration); also appended to retrain_history.
        """
        start_time = datetime.utcnow()
        print(f"\n{'='*60}")
        print(f"RETRAINING PIPELINE - {reason}")
        print(f"Started: {start_time.isoformat()}")
        print(f"{'='*60}")

        # Step 1: Load existing training data
        # Assumes the artifacts pickle contains the engineered training frame
        # under 'df_engineered' — TODO confirm against the training script.
        artifacts = joblib.load(
            os.path.join(self.model_dir, 'preprocessing_artifacts.pkl')
        )
        existing_data = artifacts['df_engineered']
        print(f"Existing data: {len(existing_data):,} rows")

        # Step 2: Combine with new labeled data
        if new_data is not None:
            combined = pd.concat([existing_data, new_data], ignore_index=True)
            # Remove duplicates
            combined = combined.drop_duplicates()
            print(f"Combined data: {len(combined):,} rows "
                  f"(+{len(combined) - len(existing_data):,} new)")
        else:
            combined = existing_data
            print("No new data provided, retraining on existing data")

        # Step 3: Re-engineer features
        # NOTE(review): existing_data was stored already engineered, so these
        # transforms run on it a second time here — confirm they are
        # idempotent (or store the raw frame instead).
        df_features = engineer_features(combined)
        df_features = add_velocity_features(df_features)

        # Step 4: Prepare train/test split
        from ..features import prepare_training_data
        X_train, X_test, y_train, y_test, feature_cols = (
            prepare_training_data(df_features)
        )

        # Step 5: Train new model
        new_model = train_xgboost(X_train, y_train, X_test, y_test)

        # Step 6: Evaluate
        y_proba = new_model.predict_proba(X_test)[:, 1]
        from sklearn.metrics import average_precision_score
        new_pr_auc = average_precision_score(y_test, y_proba)

        # Find optimal threshold
        # F2 weights recall over precision, matching fraud-detection priorities.
        threshold_result = find_optimal_threshold(
            y_test, y_proba, method='f2'
        )

        # Step 7: Compare with current model
        current = joblib.load(
            os.path.join(self.model_dir, 'fraud_model.pkl')
        )
        current_metrics = current.get('metrics', {})
        # Default of 0 means a model without stored metrics always loses
        # to (or ties with) the challenger.
        current_pr_auc = current_metrics.get('pr_auc', 0)

        improvement = new_pr_auc - current_pr_auc

        print(f"\n=== Model Comparison ===")
        print(f"  Current PR-AUC: {current_pr_auc:.4f}")
        print(f"  New PR-AUC:     {new_pr_auc:.4f}")
        print(f"  Improvement:    {improvement:+.4f}")

        # Step 8: Promote if better (or within tolerance for drift recovery)
        promoted = False
        if improvement > -0.01:  # Allow slight regression if drift-triggered
            # Archive current model
            archive_path = os.path.join(
                self.model_dir,
                f"fraud_model_archive_{start_time.strftime('%Y%m%d_%H%M%S')}.pkl"
            )
            joblib.dump(current, archive_path)
            print(f"  Archived current model to {archive_path}")

            # Save new model
            new_artifacts = {
                'model': new_model,
                'threshold': threshold_result['threshold'],
                'feature_cols': feature_cols,
                'model_name': 'XGBoost',
                'metrics': {
                    'pr_auc': new_pr_auc,
                    'recall': threshold_result['recall'],
                    'precision': threshold_result['precision'],
                    'threshold': threshold_result['threshold']
                },
                'trained_at': start_time.isoformat(),
                'reason': reason,
                'data_size': len(combined)
            }
            joblib.dump(
                new_artifacts,
                os.path.join(self.model_dir, 'fraud_model.pkl')
            )
            print(f"  New model promoted to production!")
            promoted = True
        else:
            print(f"  New model NOT promoted (regression > 0.01)")

        # Log retraining event
        record = {
            'timestamp': start_time.isoformat(),
            'reason': reason,
            'data_size': len(combined),
            'new_pr_auc': new_pr_auc,
            'old_pr_auc': current_pr_auc,
            'improvement': improvement,
            'promoted': promoted,
            'new_threshold': threshold_result['threshold'],
            'duration_seconds': (
                datetime.utcnow() - start_time
            ).total_seconds()
        }
        self.retrain_history.append(record)

        return record

Grafana Dashboard Configuration

Add Prometheus and Grafana to your Docker Compose for visualization:

# Add to docker-compose.yml
# Both services belong under the existing top-level `services:` key,
# which is why they are indented one level here.
  prometheus:
    image: prom/prometheus:v2.48.0
    ports:
      - "9090:9090"
    volumes:
      # Mount the scrape config (configs/prometheus.yml below) into the container.
      - ./configs/prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana:10.2.0
    ports:
      - "3000:3000"
    environment:
      # Default admin login; change this for anything beyond local development.
      GF_SECURITY_ADMIN_PASSWORD: admin
    depends_on:
      - prometheus
# configs/prometheus.yml
global:
  scrape_interval: 15s  # how often Prometheus pulls each target's metrics

scrape_configs:
  - job_name: 'fraud-api'
    static_configs:
      # host.docker.internal reaches the API running on the host machine;
      # use the Compose service name instead if the API runs in Compose too.
      - targets: ['host.docker.internal:8000']
    metrics_path: '/metrics'

Key Dashboard Panels

| Panel | Metric | Alert Threshold |
|---|---|---|
| Prediction Latency | `fraud_prediction_latency_ms` | p99 > 50 ms |
| Fraud Alert Rate | `fraud_alert_rate` | > 2% or < 0.05% |
| Model Recall | `fraud_model_recall` | < 0.90 |
| Model Precision | `fraud_model_precision` | < 0.05 |
| Score PSI | `fraud_drift_psi` | > 0.2 |
| Prediction Volume | `fraud_predictions_total` | Drop > 50% |

Scheduled Monitoring Job

# src/monitoring/scheduled_check.py
"""Run this as a daily cron job or Airflow DAG."""
import joblib
import pandas as pd
from .drift import DriftDetector
from .performance_tracker import PerformanceTracker
from .retrain import RetrainingPipeline


def daily_monitoring_check():
    """Daily monitoring: drift check + performance check + retrain decision."""

    # Reference artifacts from the most recent training run.
    preprocessing = joblib.load('models/preprocessing_artifacts.pkl')
    model_bundle = joblib.load('models/fraud_model.pkl')
    monitored_features = model_bundle['feature_cols']

    # Latest production batch, exported from the data warehouse.
    production_batch = pd.read_csv('data/recent_production_data.csv')

    # 1. Data drift versus the training reference
    detector = DriftDetector(preprocessing['df_engineered'], monitored_features)
    drift_signal = detector.check_drift(production_batch)

    # 2. Performance on whatever labeled feedback has accumulated
    perf_signal = PerformanceTracker(model_bundle['threshold']).check_degradation()

    # 3. Retrain decision — guard clause for the common "all healthy" path
    orchestrator = RetrainingPipeline()
    if not orchestrator.should_retrain(drift_signal, perf_signal):
        print("\nNo retraining needed.")
        return

    outcome = orchestrator.retrain(
        new_data=production_batch,
        reason="automated_monitoring"
    )
    print(f"\nRetraining complete: {outcome['promoted']}")


if __name__ == "__main__":
    daily_monitoring_check()
💡 **Retraining frequency:** For fraud detection, weekly retraining is a good starting point. During high-activity periods (Black Friday, holiday season), switch to daily retraining. The monitoring pipeline will tell you when the model actually needs it, rather than retraining on a fixed schedule.

What's Next

We now have a complete, monitored fraud detection system. In the final lesson, we will explore enhancements: human-in-the-loop review for borderline cases, feedback integration for continuous learning, graph-based fraud detection, and answers to frequently asked questions.