Advanced

Feature Monitoring

Monitor data quality, detect feature drift, and alert on freshness issues.

Feature Monitor

# src/monitor.py
import numpy as np
import pandas as pd
from scipy import stats
from datetime import datetime, timedelta
from typing import Dict, List, Optional

class FeatureMonitor:
    """Monitor feature data for distribution drift, quality issues, and staleness.

    Per-feature statistical baselines are stored in ``self.baselines``;
    every detected problem is appended to ``self.alerts`` and summarized
    by :meth:`get_report`.
    """

    def __init__(self, store):
        # `store` only needs to expose get_feature_view() (used by
        # check_freshness); pass None for pure-statistics use.
        self.store = store
        self.baselines: Dict[str, dict] = {}
        self.alerts: List[dict] = []

    def compute_baseline(self, feature_name: str, data) -> None:
        """Compute and store a statistical baseline for a feature.

        NaN values are excluded from the summary statistics (they are
        reported separately as ``null_rate``); the original np.mean/np.std
        would have returned NaN for any data containing missing values.
        A bounded reference sample of the real data is kept so later
        drift checks compare empirical distributions instead of a
        synthetic normal approximation.
        """
        arr = np.asarray(data, dtype=float)
        clean = arr[~np.isnan(arr)]
        has_values = clean.size > 0
        self.baselines[feature_name] = {
            "mean": float(np.mean(clean)) if has_values else float("nan"),
            "std": float(np.std(clean)) if has_values else float("nan"),
            "min": float(np.min(clean)) if has_values else float("nan"),
            "max": float(np.max(clean)) if has_values else float("nan"),
            "null_rate": float(np.isnan(arr).mean()) if arr.size else 0.0,
            # Cap the stored sample so baselines stay small in memory.
            "sample": clean[:10000].copy(),
            "computed_at": datetime.now().isoformat(),
        }

    def check_drift(self, feature_name: str, current_data,
                    threshold: float = 0.05) -> dict:
        """Detect distribution drift using a two-sample KS test.

        Compares `current_data` against the stored baseline sample.
        A p-value below `threshold` is treated as drift and raises an
        alert (severity "high" when p < 0.01, else "medium").

        Returns a dict with the KS statistic, p-value, and means; or
        ``{"drift": False, "reason": "no baseline"}`` when no baseline
        exists for the feature.
        """
        if feature_name not in self.baselines:
            return {"drift": False, "reason": "no baseline"}

        baseline = self.baselines[feature_name]
        current = np.asarray(current_data, dtype=float)
        current = current[~np.isnan(current)]

        sample = baseline.get("sample")
        if sample is None or len(sample) == 0:
            # Fallback for externally loaded baselines that carry only
            # summary stats: reconstruct a normal approximation. The RNG
            # is seeded so repeated checks are deterministic (the old
            # unseeded np.random.normal could flip the verdict per run).
            rng = np.random.default_rng(0)
            sample = rng.normal(baseline["mean"], baseline["std"],
                                max(len(current), 1))

        ks_stat, p_value = stats.ks_2samp(sample, current)
        drifted = bool(p_value < threshold)

        result = {
            "feature": feature_name,
            "drift": drifted,
            "ks_statistic": round(float(ks_stat), 4),
            "p_value": round(float(p_value), 4),
            "current_mean": round(float(np.mean(current)), 4)
            if current.size else float("nan"),
            "baseline_mean": baseline["mean"],
        }

        if drifted:
            self.alerts.append({
                "type": "drift",
                "feature": feature_name,
                "severity": "high" if p_value < 0.01 else "medium",
                "timestamp": datetime.now().isoformat(),
                **result,
            })
        return result

    def check_quality(self, feature_name: str, data,
                      max_null_rate: float = 0.05) -> dict:
        """Check data quality: null rate, constant values, infinities.

        `max_null_rate` (default 0.05, matching the previous hard-coded
        threshold) is the tolerated fraction of NaN values. Any issue
        appends an alert (severity "high" when null rate > 0.2).
        """
        arr = np.asarray(data, dtype=float)
        null_rate = float(np.isnan(arr).mean()) if arr.size else 0.0
        # np.unique over finite values: the old len(set(data)) treated
        # every NaN as distinct and mixed inf into the uniqueness count.
        finite = arr[np.isfinite(arr)]
        unique_values = int(np.unique(finite).size)

        issues = []
        if null_rate > max_null_rate:
            issues.append(f"High null rate: {null_rate:.1%}")
        if unique_values == 1:
            issues.append("Single value (constant feature)")
        if np.any(np.isinf(arr)):
            issues.append("Contains infinity values")

        result = {
            "feature": feature_name,
            "null_rate": round(null_rate, 4),
            "unique_values": unique_values,
            "issues": issues,
            "healthy": len(issues) == 0,
        }

        if issues:
            self.alerts.append({
                "type": "quality",
                "feature": feature_name,
                "severity": "high" if null_rate > 0.2 else "medium",
                "issues": issues,
                "timestamp": datetime.now().isoformat(),
            })
        return result

    def check_freshness(self, feature_view_name: str,
                        max_age_hours: int = 24) -> dict:
        """Check if features are stale.

        NOTE(review): this is a placeholder — it fetches the feature-view
        metadata but always reports fresh=True. The store API for reading
        the last materialization timestamp isn't visible here; wire it in
        before relying on freshness alerts. Errors from the store are
        returned as an "error" entry rather than raised.
        """
        try:
            meta = self.store.get_feature_view(feature_view_name)
            # TODO: compare meta's last materialization time against
            # max_age_hours once the store exposes it.
            fresh = True  # simplified check
            result = {
                "feature_view": feature_view_name,
                "fresh": fresh,
                "max_age_hours": max_age_hours,
            }
            return result
        except Exception as e:
            return {"feature_view": feature_view_name,
                    "error": str(e)}

    def get_report(self) -> dict:
        """Return a JSON-serializable summary of alerts and baselines."""
        return {
            "timestamp": datetime.now().isoformat(),
            "total_alerts": len(self.alerts),
            "alerts": self.alerts,
            # Strip the raw sample arrays: they are large and not
            # JSON-serializable.
            "baselines": {
                name: {k: v for k, v in b.items() if k != "sample"}
                for name, b in self.baselines.items()
            },
        }

if __name__ == "__main__":
    import json

    from feast import FeatureStore

    store = FeatureStore(repo_path="feature_repo")
    monitor = FeatureMonitor(store)

    # Synthetic demo data: "current" is deliberately shifted relative to
    # the baseline so the drift check fires.
    baseline_sample = np.random.normal(0.5, 0.1, 1000)
    current_sample = np.random.normal(0.6, 0.15, 1000)  # shifted

    monitor.compute_baseline("conv_rate", baseline_sample)
    drift_result = monitor.check_drift("conv_rate", current_sample)
    print(f"Drift detected: {drift_result['drift']}")
    print(f"KS stat: {drift_result['ks_statistic']}, p={drift_result['p_value']}")

    quality_result = monitor.check_quality("conv_rate", current_sample)
    print(f"Quality healthy: {quality_result['healthy']}")

    print(json.dumps(monitor.get_report(), indent=2))
💡 Production tip: Run monitoring checks on a schedule (e.g., hourly via cron or Airflow) and send alerts to Slack or PagerDuty when drift or quality issues are detected.