Advanced
Feature Monitoring
Monitor data quality, detect feature drift, and alert on freshness issues.
Feature Monitor
# src/monitor.py
import numpy as np
import pandas as pd
from scipy import stats
from datetime import datetime, timedelta
from typing import Dict, List, Optional
class FeatureMonitor:
    """Monitor feature health for a feature store.

    Tracks per-feature statistical baselines, detects distribution drift
    with a two-sample Kolmogorov-Smirnov test, flags basic data-quality
    problems (nulls, constants, infinities), and performs a simplified
    freshness check against the backing store. Every detected problem is
    appended to ``self.alerts``; ``get_report`` summarizes the state.
    """

    def __init__(self, store):
        # `store` only needs to expose get_feature_view(); it is touched
        # exclusively by check_freshness, so any object (even None) works
        # for purely statistical monitoring.
        self.store = store
        self.baselines: Dict[str, dict] = {}
        self.alerts: List[dict] = []

    def compute_baseline(self, feature_name: str, data) -> None:
        """Compute and store the statistical baseline for one feature.

        Besides summary statistics, a reference sample (NaNs removed,
        capped at 10k points) is retained so that check_drift can run a
        genuine two-sample KS test against the observed baseline
        distribution instead of assuming it is normal.
        """
        arr = np.asarray(data, dtype=float)
        self.baselines[feature_name] = {
            "mean": float(np.mean(arr)),
            "std": float(np.std(arr)),
            "min": float(np.min(arr)),
            "max": float(np.max(arr)),
            "null_rate": float(np.isnan(arr).mean()),
            "computed_at": datetime.now().isoformat(),
            # Stored as a plain list (not ndarray) so get_report() stays
            # json.dumps-serializable.
            "sample": arr[~np.isnan(arr)][:10_000].tolist(),
        }

    def check_drift(self, feature_name: str, current_data,
                    threshold: float = 0.05) -> dict:
        """Detect distribution drift using a two-sample KS test.

        Args:
            feature_name: feature whose baseline to compare against.
            current_data: recent observations for the feature.
            threshold: p-value below which drift is declared.

        Returns:
            dict with the KS statistic, p-value, current/baseline means
            and a ``drift`` flag; when no baseline exists, a dict with
            ``{"drift": False, "reason": "no baseline"}``. A drift alert
            is recorded (severity "high" when p < 0.01) on detection.
        """
        if feature_name not in self.baselines:
            return {"drift": False, "reason": "no baseline"}
        baseline = self.baselines[feature_name]
        current = np.asarray(current_data, dtype=float)
        sample = baseline.get("sample")
        if sample:
            reference = np.asarray(sample, dtype=float)
        else:
            # Fallback for legacy baselines recorded without a sample:
            # approximate the baseline as a normal distribution. Only
            # valid for roughly Gaussian features, and non-deterministic
            # because the reference is freshly drawn each call.
            reference = np.random.normal(
                baseline["mean"], baseline["std"], len(current))
        ks_stat, p_value = stats.ks_2samp(reference, current)
        # Cast numpy bool to Python bool so the result is JSON-clean.
        drifted = bool(p_value < threshold)
        result = {
            "feature": feature_name,
            "drift": drifted,
            "ks_statistic": round(float(ks_stat), 4),
            "p_value": round(float(p_value), 4),
            "current_mean": round(float(np.mean(current)), 4),
            "baseline_mean": baseline["mean"],
        }
        if drifted:
            self.alerts.append({
                "type": "drift",
                "feature": feature_name,
                "severity": "high" if p_value < 0.01 else "medium",
                "timestamp": datetime.now().isoformat(),
                **result,
            })
        return result

    def check_quality(self, feature_name: str, data) -> dict:
        """Check basic data-quality signals for a feature sample.

        Flags a high null rate (>5%), a constant feature, and infinite
        values. Any issue records an alert (severity "high" when the
        null rate exceeds 20%).
        """
        arr = np.asarray(data, dtype=float)
        issues: List[str] = []
        null_rate = float(np.isnan(arr).mean()) if arr.size else 0.0
        if null_rate > 0.05:
            issues.append(f"High null rate: {null_rate:.1%}")
        # Count distinct non-null values; len(set(data)) would treat each
        # NaN as unique (NaN != NaN) and inflate the figure.
        unique_values = int(np.unique(arr[~np.isnan(arr)]).size)
        if arr.size and unique_values == 1:
            issues.append("Single value (constant feature)")
        if np.any(np.isinf(arr)):
            issues.append("Contains infinity values")
        result = {
            "feature": feature_name,
            "null_rate": round(null_rate, 4),
            "unique_values": unique_values,
            "issues": issues,
            "healthy": len(issues) == 0,
        }
        if issues:
            self.alerts.append({
                "type": "quality",
                "feature": feature_name,
                "severity": "high" if null_rate > 0.2 else "medium",
                "issues": issues,
                "timestamp": datetime.now().isoformat(),
            })
        return result

    def check_freshness(self, feature_view_name: str,
                        max_age_hours: int = 24) -> dict:
        """Check whether a feature view's data is stale.

        NOTE(review): the real age comparison is not implemented — the
        get_feature_view call only validates the view exists, and
        ``fresh`` is hard-coded True. TODO: compare the last
        materialization timestamp against max_age_hours.
        """
        try:
            # Raises if the view is unknown; the returned metadata is
            # unused until the real staleness check is implemented.
            self.store.get_feature_view(feature_view_name)
            return {
                "feature_view": feature_view_name,
                "fresh": True,  # simplified check
                "max_age_hours": max_age_hours,
            }
        except Exception as e:
            # Surface lookup/storage failures as data rather than raising
            # so a scheduled monitoring sweep can continue with other views.
            return {"feature_view": feature_view_name,
                    "error": str(e)}

    def get_report(self) -> dict:
        """Return a JSON-serializable snapshot of alerts and baselines."""
        return {
            "timestamp": datetime.now().isoformat(),
            "total_alerts": len(self.alerts),
            "alerts": self.alerts,
            "baselines": self.baselines,
        }
if __name__ == "__main__":
    import json

    from feast import FeatureStore

    fs = FeatureStore(repo_path="feature_repo")
    mon = FeatureMonitor(fs)

    # Synthetic demo data: the "current" sample is deliberately shifted
    # relative to the baseline so drift should be detected.
    reference = np.random.normal(0.5, 0.1, 1000)
    observed = np.random.normal(0.6, 0.15, 1000)

    mon.compute_baseline("conv_rate", reference)

    drift_result = mon.check_drift("conv_rate", observed)
    print(f"Drift detected: {drift_result['drift']}")
    print(f"KS stat: {drift_result['ks_statistic']}, p={drift_result['p_value']}")

    quality_result = mon.check_quality("conv_rate", observed)
    print(f"Quality healthy: {quality_result['healthy']}")

    print(json.dumps(mon.get_report(), indent=2))
Production tip: Run monitoring checks on a schedule (e.g., hourly via cron or Airflow) and send alerts to Slack or PagerDuty when drift or quality issues are detected.
Lilly Tech Systems