Intermediate
Feature Serving API
Build a FastAPI service that serves features for real-time and batch model inference.
Feature Serving API
# src/api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from feast import FeatureStore
import joblib
import numpy as np
from typing import List, Dict, Optional
# FastAPI application exposing feature-retrieval and prediction endpoints.
app = FastAPI(title="ML Feature Platform API")
# Feast feature store client; "feature_repo" must contain feature_store.yaml.
store = FeatureStore(repo_path="feature_repo")
# Model loaded once at import time; presumably a scikit-learn classifier,
# since predict_proba is called in /predict — TODO confirm.
model = joblib.load("model.joblib")
class FeatureRequest(BaseModel):
    """Request body for the /features and /features/batch endpoints."""

    # IDs of the entities (drivers) to fetch features for.
    entity_ids: List[int]
    # Explicit feature references ("view:feature"); when omitted the
    # endpoint falls back to its default driver_stats set.
    features: Optional[List[str]] = None
class PredictionRequest(BaseModel):
    """Request body for the /predict endpoint."""

    # Drivers to score, in the order predictions are returned.
    driver_ids: List[int]
    # Declared but not consumed by /predict — TODO confirm intent.
    trip_distances: Optional[List[float]] = None
@app.get("/health")
def health():
    """Liveness probe: report that the service process is up."""
    return dict(status="ok")
@app.post("/features")
def get_features(req: FeatureRequest):
    """Return online feature values for the requested entity IDs.

    When the request does not name explicit features, a default
    driver_stats feature set is served. Store failures surface as
    HTTP 500 with the underlying error message.
    """
    default_features = [
        "driver_stats:conv_rate",
        "driver_stats:acc_rate",
        "driver_stats:avg_daily_trips",
    ]
    rows = [{"driver_id": entity_id} for entity_id in req.entity_ids]
    try:
        result = store.get_online_features(
            features=req.features or default_features,
            entity_rows=rows,
        ).to_dict()
    except Exception as exc:
        # Boundary handler: map any store error to a 500 response.
        raise HTTPException(status_code=500, detail=str(exc))
    return {"features": result}
@app.post("/predict")
def predict(req: PredictionRequest):
    """Fetch online features and run the model in a single call.

    Returns predictions, positive-class probabilities, and the echoed
    driver IDs, all index-aligned with the request order.

    Raises:
        HTTPException 500: the feature store lookup failed.
        HTTPException 404: a driver has no features in the online store.
    """
    feature_refs = [
        "driver_stats:conv_rate",
        "driver_stats:acc_rate",
        "driver_stats:avg_daily_trips",
    ]
    entity_rows = [{"driver_id": did} for did in req.driver_ids]
    try:
        features = store.get_online_features(
            features=feature_refs,
            entity_rows=entity_rows,
        ).to_dict()
    except Exception as e:
        # Consistent with /features: surface store failures as HTTP 500.
        raise HTTPException(status_code=500, detail=str(e))

    # Assemble the (n_drivers, 3) feature matrix. Entities missing from
    # the online store come back as None; reject those explicitly with a
    # 404 instead of crashing inside numpy / the model with an opaque 500.
    rows = []
    for i, did in enumerate(req.driver_ids):
        row = [
            features["conv_rate"][i],
            features["acc_rate"][i],
            features["avg_daily_trips"][i],
        ]
        if any(v is None for v in row):
            raise HTTPException(
                status_code=404,
                detail=f"No features found for driver_id {did}",
            )
        rows.append(row)
    X = np.array(rows)

    predictions = model.predict(X).tolist()
    # Probability of the positive class (column 1 of predict_proba).
    probabilities = model.predict_proba(X)[:, 1].tolist()
    return {
        "predictions": predictions,
        "probabilities": probabilities,
        "driver_ids": req.driver_ids,
    }
@app.post("/features/batch")
def get_batch_features(req: FeatureRequest):
    """Point-in-time feature retrieval from the offline store.

    Uses the current time as the event timestamp for every entity, i.e.
    returns the latest features as of "now" for each entity_id.

    Raises:
        HTTPException 500: the historical retrieval failed.
    """
    # Heavy deps imported lazily: this endpoint is the only user here.
    import pandas as pd
    from datetime import datetime, timezone

    default_features = [
        "driver_stats:conv_rate",
        "driver_stats:acc_rate",
        "driver_stats:avg_daily_trips",
    ]
    # Feast expects timezone-aware event timestamps; a naive
    # datetime.now() can be rejected or misinterpreted by the offline store.
    now = datetime.now(timezone.utc)
    entity_df = pd.DataFrame({
        "driver_id": req.entity_ids,
        "event_timestamp": [now] * len(req.entity_ids),
    })
    try:
        result = store.get_historical_features(
            entity_df=entity_df,
            features=req.features or default_features,
        ).to_df()
    except Exception as e:
        # Consistent with /features: surface store failures as HTTP 500.
        raise HTTPException(status_code=500, detail=str(e))
    return {"data": result.to_dict(orient="records")}
if __name__ == "__main__":
    # Development entry point only; run behind a process manager in prod.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
Test the API
# Start server
uvicorn src.api:app --reload
# Get features
curl -X POST http://localhost:8000/features \
-H "Content-Type: application/json" \
-d '{"entity_ids": [1, 2, 3]}'
# Get predictions
curl -X POST http://localhost:8000/predict \
-H "Content-Type: application/json" \
-d '{"driver_ids": [1, 2, 3]}'
Production tip: Add authentication, rate limiting, and request logging. For production deployments, run Gunicorn as the process manager with multiple Uvicorn worker processes (e.g. `gunicorn -k uvicorn.workers.UvicornWorker -w 4 src.api:app`).
Lilly Tech Systems