Intermediate

Feature Serving API

Build a FastAPI service that serves features for real-time and batch model inference.

Implementation (`src/api.py`)

# src/api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from feast import FeatureStore
import joblib
import numpy as np
from typing import List, Dict, Optional

# Application-wide singletons, created once at import time.
app = FastAPI(title="ML Feature Platform API")
# Feast client; assumes a configured feature repo at ./feature_repo — TODO confirm path.
store = FeatureStore(repo_path="feature_repo")
# Model is loaded from disk at import time, so the service fails fast if the
# artifact is missing rather than erroring on the first /predict call.
model = joblib.load("model.joblib")

class FeatureRequest(BaseModel):
    """Request body for the /features and /features/batch endpoints."""

    # Entity keys to look up; handlers treat these as driver_id values.
    entity_ids: List[int]
    # Fully-qualified feature refs ("view:feature"). When omitted, the
    # handlers fall back to a default driver_stats feature set.
    features: Optional[List[str]] = None

class PredictionRequest(BaseModel):
    """Request body for the /predict endpoint."""

    driver_ids: List[int]
    # NOTE(review): accepted but never read by the /predict handler — confirm
    # whether the model should consume trip distances or this field can go.
    trip_distances: Optional[List[float]] = None

@app.get("/health")
def health():
    """Liveness probe; always reports the service as up."""
    payload = {"status": "ok"}
    return payload

@app.post("/features")
def get_features(req: FeatureRequest):
    """Look up online features for the requested entity IDs.

    Falls back to the default driver_stats feature set when the request
    does not name specific features. Store failures surface as HTTP 500.
    """
    default_refs = [
        "driver_stats:conv_rate",
        "driver_stats:acc_rate",
        "driver_stats:avg_daily_trips",
    ]
    refs = req.features if req.features else default_refs
    rows = [{"driver_id": entity_id} for entity_id in req.entity_ids]
    try:
        result = store.get_online_features(
            features=refs,
            entity_rows=rows,
        ).to_dict()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"features": result}

@app.post("/predict")
def predict(req: PredictionRequest):
    """Fetch online features for the requested drivers and run the model.

    Returns per-driver predictions and positive-class probabilities in the
    same order as ``req.driver_ids``.

    Raises:
        HTTPException 500: the feature store lookup failed.
        HTTPException 404: a driver has no features in the online store.
    """
    # NOTE(review): req.trip_distances is accepted but unused — confirm intent.
    entity_rows = [{"driver_id": did} for did in req.driver_ids]
    try:
        features = store.get_online_features(
            features=[
                "driver_stats:conv_rate",
                "driver_stats:acc_rate",
                "driver_stats:avg_daily_trips",
            ],
            entity_rows=entity_rows,
        ).to_dict()
    except Exception as e:
        # Consistent with /features: surface store failures as HTTP 500
        # instead of an unhandled exception.
        raise HTTPException(status_code=500, detail=str(e))

    feature_names = ["conv_rate", "acc_rate", "avg_daily_trips"]
    rows = []
    for i, did in enumerate(req.driver_ids):
        values = [features[name][i] for name in feature_names]
        # Feast returns None for entities missing from the online store;
        # feeding None into the model would fail with an opaque dtype error.
        if any(v is None for v in values):
            raise HTTPException(
                status_code=404,
                detail=f"No features found for driver_id={did}",
            )
        rows.append(values)

    X = np.array(rows)
    predictions = model.predict(X).tolist()
    probabilities = model.predict_proba(X)[:, 1].tolist()

    return {
        "predictions": predictions,
        "probabilities": probabilities,
        "driver_ids": req.driver_ids,
    }

@app.post("/features/batch")
def get_batch_features(req: FeatureRequest):
    """Batch (point-in-time) feature retrieval for training or bulk scoring.

    Builds an entity dataframe stamped with the current UTC time and queries
    the offline store. Returns one record per requested entity ID.

    Raises:
        HTTPException 500: the historical feature retrieval failed.
    """
    import pandas as pd
    from datetime import datetime, timezone

    # Feast event timestamps must be timezone-aware; a naive datetime.now()
    # is rejected (or misinterpreted) by the point-in-time join.
    now = datetime.now(timezone.utc)
    entity_df = pd.DataFrame({
        "driver_id": req.entity_ids,
        "event_timestamp": [now] * len(req.entity_ids),
    })
    try:
        result = store.get_historical_features(
            entity_df=entity_df,
            features=req.features or [
                "driver_stats:conv_rate",
                "driver_stats:acc_rate",
                "driver_stats:avg_daily_trips",
            ],
        ).to_df()
    except Exception as e:
        # Consistent with /features: surface store failures as HTTP 500.
        raise HTTPException(status_code=500, detail=str(e))
    return {"data": result.to_dict(orient="records")}

if __name__ == "__main__":
    # Local development entry point; run under an ASGI server in production.
    import uvicorn

    bind_host, bind_port = "0.0.0.0", 8000
    uvicorn.run(app, host=bind_host, port=bind_port)

Test the API

# Start the development server (auto-reloads on code changes)
uvicorn src.api:app --reload

# Get features
curl -X POST http://localhost:8000/features \
  -H "Content-Type: application/json" \
  -d '{"entity_ids": [1, 2, 3]}'

# Get predictions
curl -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{"driver_ids": [1, 2, 3]}'
💡
Production tip: Add authentication, rate limiting, and request logging. Use Gunicorn with multiple workers for production deployments.