ML Pipeline

Build end-to-end ML pipelines from data ingestion to production deployment with FastAPI, Docker, MLflow, and CI/CD.

End-to-End Pipeline Overview

  1. Data Ingestion

    Load data from databases, APIs, or files. Validate schema and data quality.

  2. Feature Engineering

    Create, transform, and select features. Build reusable feature pipelines.

  3. Model Training

    Train models with cross-validation. Track experiments with MLflow.

  4. Model Evaluation

    Compare models against baselines. Validate on held-out test set.

  5. Deployment

    Serve via REST API. Containerize with Docker. Set up monitoring.
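The schema and data-quality checks mentioned in step 1 can be sketched with plain pandas. The column names and rules below are illustrative assumptions, not part of any fixed schema:

```python
import pandas as pd

# Hypothetical expected schema for an HR-style dataset
EXPECTED_COLUMNS = {"age", "department"}

def validate(df: pd.DataFrame) -> list[str]:
    """Return a list of human-readable validation errors (empty = OK)."""
    errors = []
    # Schema check: required columns present
    for col in EXPECTED_COLUMNS:
        if col not in df.columns:
            errors.append(f"missing column: {col}")
    # Quality checks on a sample column
    if "age" in df.columns:
        if df["age"].isna().mean() > 0.1:
            errors.append("age: more than 10% missing")
        if (df["age"].dropna() < 0).any():
            errors.append("age: negative values")
    return errors

df = pd.DataFrame({"age": [25, -3, 40], "department": ["eng", "sales", "eng"]})
print(validate(df))  # flags the negative age
```

Failing ingestion loudly on bad data is cheaper than debugging a model trained on it.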

Feature Engineering Pipeline

Python
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier

numeric_features = ["age", "salary", "experience"]
categorical_features = ["department", "education"]

preprocessor = ColumnTransformer([
    ("num", Pipeline([
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler())
    ]), numeric_features),
    ("cat", Pipeline([
        ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
        ("encoder", OneHotEncoder(handle_unknown="ignore"))
    ]), categorical_features)
])

full_pipeline = Pipeline([
    ("preprocessor", preprocessor),
    ("classifier", RandomForestClassifier())
])

Model Deployment with FastAPI

app.py
from fastapi import FastAPI
from pydantic import BaseModel
import joblib
import numpy as np

app = FastAPI()
model = joblib.load("model.joblib")

class PredictionRequest(BaseModel):
    features: list[float]

@app.post("/predict")
def predict(request: PredictionRequest):
    X = np.array(request.features).reshape(1, -1)
    prediction = model.predict(X)[0]
    probability = model.predict_proba(X)[0].tolist()
    return {"prediction": int(prediction), "probability": probability}

# Run: uvicorn app:app --reload

Docker Containerization

Dockerfile
FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
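The requirements.txt copied into the image is assumed to list the service's dependencies (e.g. fastapi, uvicorn, scikit-learn, joblib, numpy). A typical build-and-run sequence, with ml-api as a hypothetical image name, looks like:

```
docker build -t ml-api .
docker run -p 8000:8000 ml-api
```

The API is then reachable at http://localhost:8000, and interactive docs at http://localhost:8000/docs.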

MLflow Experiment Tracking

Python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score

mlflow.set_experiment("my_classification_project")

model = RandomForestClassifier(n_estimators=100, max_depth=10)

with mlflow.start_run():
    # Log parameters
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 10)

    # Train model (X_train, y_train, X_test, y_test come from your own split)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    # Log metrics
    mlflow.log_metric("accuracy", accuracy_score(y_test, y_pred))
    mlflow.log_metric("f1", f1_score(y_test, y_pred))

    # Log model
    mlflow.sklearn.log_model(model, "model")

# View UI: mlflow ui --port 5000

Production Monitoring

Models degrade over time. Monitor for data drift (the input distribution changes), concept drift (the relationship between inputs and outputs changes), and plain performance degradation. Set up alerts that fire when live metrics drop below agreed thresholds, and retrain on a schedule or whenever drift is detected.
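A simple data-drift check can be sketched with a two-sample Kolmogorov-Smirnov test from scipy. The feature, window sizes, and the 0.05 significance threshold below are illustrative choices, not universal defaults:

```python
import numpy as np
from scipy.stats import ks_2samp

rng = np.random.default_rng(0)
train_ages = rng.normal(40, 10, 1000)  # reference (training) distribution
live_ages = rng.normal(48, 10, 1000)   # shifted mean: simulated drift

# KS test: small p-value => the two samples likely come
# from different distributions, i.e. the feature has drifted
stat, p_value = ks_2samp(train_ages, live_ages)
drifted = p_value < 0.05
print(drifted)
```

Run a check like this per feature on a rolling window of production inputs; a drift alarm is a prompt to investigate and possibly retrain, not an automatic retrain trigger.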