
Building Production Pipelines

Moving multi-model AI systems from prototype to production. Learn pipeline architecture, workflow orchestration, async processing, monitoring, CI/CD strategies, and disaster recovery for systems that coordinate multiple AI models.

From Prototype to Production

A prototype that chains two API calls in a Jupyter notebook is very different from a production system handling thousands of requests per minute across multiple models. Production pipelines must handle failures gracefully, scale under load, provide observability, and keep costs predictable. This lesson covers the engineering practices that bridge that gap.

  • Prototype mindset: "Make it work" — synchronous calls, no error handling, hardcoded keys, single user
  • Production mindset: "Make it reliable, observable, and scalable" — async processing, retries, monitoring, multi-tenant
  • Key differences: Error handling, authentication, rate limiting, logging, deployment automation, cost tracking, and SLA guarantees

Pipeline Architecture

A production multi-model pipeline typically follows this flow:

Client Request
    |
API Gateway (auth, rate limiting, validation)
    |
Router (classify request, select pipeline)
    |
Model Services (parallel or sequential)
    |-- Text Model (Claude, GPT-4o)
    |-- Vision Model (GPT-4o Vision, Claude Vision)
    |-- Embedding Model (text-embedding-3-small)
    |-- Custom Model (fine-tuned classifier)
    |
Aggregator (merge results, post-process)
    |
Response (formatted, cached, logged)

Each component runs as an independent service, communicating through APIs or message queues. This separation allows you to scale each model service independently, deploy updates without downtime, and swap models without changing the pipeline logic.

Workflow Orchestration Tools

For complex multi-step pipelines, a workflow orchestrator manages execution order, retries, and dependencies between tasks.

Tool             | Best For                 | Key Feature                                   | Learning Curve
Apache Airflow   | Batch pipelines, ETL     | DAG-based scheduling, huge ecosystem          | Medium
Prefect          | Modern data pipelines    | Python-native, automatic retries, cloud UI    | Low
Dagster          | Data-aware orchestration | Asset-based, type checking, dev experience    | Medium
Temporal         | Long-running workflows   | Durable execution, built-in state management  | High

When to use what: Airflow and Prefect are ideal for scheduled batch pipelines (e.g., nightly document processing). Temporal excels at long-running, stateful workflows (e.g., multi-step agent tasks that may run for minutes). For real-time request-response pipelines, a custom FastAPI service with async processing is often simpler than introducing a full orchestrator.
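
To make that concrete, here is a minimal Prefect sketch of a scheduled batch pipeline. The task bodies are illustrative placeholders, and the retry settings are assumptions you would tune:

# Minimal Prefect sketch (Prefect 2.x+). fetch_document and summarize
# are illustrative placeholders, not part of any real library.
from prefect import flow, task

@task(retries=3, retry_delay_seconds=10)
def fetch_document(url: str) -> str:
    # Transient network failures are retried automatically by Prefect
    return f"contents of {url}"  # placeholder for a real fetch + parse

@task(retries=2, retry_delay_seconds=30)
def summarize(text: str) -> str:
    # A model call (e.g., client.messages.create) would go here
    return text[:100]  # placeholder summary

@flow(name="nightly-doc-pipeline")
def nightly_pipeline(urls: list[str]):
    for url in urls:
        summarize(fetch_document(url))

if __name__ == "__main__":
    nightly_pipeline(["https://example.com/doc-1"])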

Message Queues for Async Processing

When a pipeline involves slow model calls, message queues decouple the request from the processing. The client gets an immediate acknowledgment, and results are delivered asynchronously.

Queue System      | Throughput | Persistence       | Best For
Redis Queue (RQ)  | Medium     | Optional          | Simple task queues, prototyping
Celery            | High       | Yes (with broker) | Python task processing, chains, groups
RabbitMQ          | High       | Yes               | Complex routing, multi-language systems
Apache Kafka      | Very High  | Yes (log-based)   | Event streaming, audit trails, replay
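
Here is the enqueue-and-acknowledge pattern as a minimal Redis Queue (RQ) sketch; "tasks.process_document" is an assumed dotted path to a function in your own codebase:

# Enqueue-and-acknowledge sketch with Redis Queue (RQ)
from redis import Redis
from rq import Queue

queue = Queue("pipeline", connection=Redis())

def submit(document_url: str) -> str:
    # The slow model work runs later in a separate `rq worker pipeline`
    # process; this function returns as soon as the job is queued.
    job = queue.enqueue("tasks.process_document", document_url)
    return job.id  # client polls a status endpoint (or gets a webhook)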

Code Example: FastAPI Multi-Model Service

This example shows a production-grade FastAPI service that routes requests to multiple models with async processing, error handling, and structured logging.

import asyncio
import time
import uuid
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import anthropic
import openai
import structlog

logger = structlog.get_logger()
app = FastAPI(title="Multi-Model Pipeline")

class PipelineRequest(BaseModel):
    text: str
    pipeline: str = "analyze"  # analyze, summarize, translate

class PipelineResponse(BaseModel):
    request_id: str
    results: dict
    latency_ms: float
    models_used: list[str]

# Model clients
claude_client = anthropic.AsyncAnthropic()
openai_client = openai.AsyncOpenAI()

async def classify_text(text: str, request_id: str) -> dict:
    """Step 1: Classify the input text using a fast model."""
    start = time.perf_counter()
    response = await claude_client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=100,
        messages=[{"role": "user", "content": f"Classify this text into one category "
                   f"(technical, business, creative, legal): {text}"}]
    )
    latency = (time.perf_counter() - start) * 1000
    logger.info("classify_complete", request_id=request_id,
                latency_ms=round(latency, 2))
    return {"category": response.content[0].text, "latency_ms": latency}

async def generate_embedding(text: str, request_id: str) -> dict:
    """Step 2: Generate embeddings in parallel with analysis."""
    start = time.perf_counter()
    response = await openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    latency = (time.perf_counter() - start) * 1000
    logger.info("embedding_complete", request_id=request_id,
                latency_ms=round(latency, 2))
    return {"dimensions": len(response.data[0].embedding),
            "latency_ms": latency}

async def deep_analysis(text: str, category: str, request_id: str) -> dict:
    """Step 3: Deep analysis using the most capable model."""
    start = time.perf_counter()
    response = await claude_client.messages.create(
        model="claude-opus-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user",
                   "content": f"Provide a detailed {category} analysis "
                              f"of: {text}"}]
    )
    latency = (time.perf_counter() - start) * 1000
    logger.info("analysis_complete", request_id=request_id,
                latency_ms=round(latency, 2))
    return {"analysis": response.content[0].text, "latency_ms": latency}

@app.post("/pipeline/analyze", response_model=PipelineResponse)
async def analyze_pipeline(req: PipelineRequest):
    request_id = str(uuid.uuid4())
    start = time.perf_counter()
    logger.info("pipeline_start", request_id=request_id,
                pipeline=req.pipeline)

    try:
        # Step 1: Classify (must complete before step 3)
        classification = await classify_text(req.text, request_id)

        # Step 2 + 3: Run embedding and deep analysis in parallel
        embedding_task = generate_embedding(req.text, request_id)
        analysis_task = deep_analysis(
            req.text, classification["category"], request_id
        )
        embedding, analysis = await asyncio.gather(
            embedding_task, analysis_task
        )

        total_latency = (time.perf_counter() - start) * 1000
        logger.info("pipeline_complete", request_id=request_id,
                    total_latency_ms=round(total_latency, 2))

        return PipelineResponse(
            request_id=request_id,
            results={
                "classification": classification,
                "embedding": embedding,
                "analysis": analysis
            },
            latency_ms=round(total_latency, 2),
            models_used=["claude-sonnet-4-20250514",
                         "text-embedding-3-small",
                         "claude-opus-4-20250514"]
        )
    except Exception as e:
        logger.error("pipeline_failed", request_id=request_id,
                     error=str(e))
        raise HTTPException(status_code=500,
                            detail=f"Pipeline failed: {str(e)}")

Code Example: Celery Task Chain for Document Processing

This example demonstrates a Celery-based document processing pipeline that chains multiple model calls with automatic retries and error handling.

from celery import Celery, chain
from celery.utils.log import get_task_logger
import anthropic
import json

app = Celery("doc_pipeline", broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")
logger = get_task_logger(__name__)
client = anthropic.Anthropic()

@app.task(bind=True, max_retries=3, default_retry_delay=5)
def extract_text(self, document_url: str) -> dict:
    """Step 1: Extract and clean text from document."""
    try:
        # In production, use a document parser like Unstructured
        raw_text = fetch_and_parse_document(document_url)
        return {"text": raw_text, "source": document_url,
                "char_count": len(raw_text)}
    except Exception as exc:
        logger.warning(f"Extract failed, retrying: {exc}")
        raise self.retry(exc=exc)

@app.task(bind=True, max_retries=3, default_retry_delay=10)
def summarize_document(self, extracted: dict) -> dict:
    """Step 2: Generate summary using Claude."""
    try:
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=500,
            messages=[{"role": "user",
                       "content": f"Summarize this document in 3-5 "
                                  f"sentences:\n\n{extracted['text'][:8000]}"}]
        )
        extracted["summary"] = response.content[0].text
        return extracted
    except anthropic.RateLimitError as exc:
        raise self.retry(exc=exc, countdown=30)

@app.task(bind=True, max_retries=3)
def extract_entities(self, doc_data: dict) -> dict:
    """Step 3: Extract named entities."""
    try:
        response = client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=300,
            messages=[{"role": "user",
                       "content": f"Extract key entities (people, orgs, "
                                  f"dates, amounts) as JSON from:\n\n"
                                  f"{doc_data['text'][:4000]}"}]
        )
        # json.loads assumes the model returned bare JSON; a malformed
        # reply raises here and triggers a retry
        doc_data["entities"] = json.loads(response.content[0].text)
        return doc_data
    except Exception as exc:
        raise self.retry(exc=exc)

@app.task
def store_results(doc_data: dict) -> dict:
    """Step 4: Store processed results in database."""
    # In production, write to PostgreSQL, Elasticsearch, etc.
    logger.info(f"Storing results for: {doc_data['source']}")
    save_to_database(doc_data)
    return {"status": "complete", "source": doc_data["source"]}

# Build the pipeline chain
def process_document(document_url: str):
    """Execute the full document processing pipeline."""
    pipeline = chain(
        extract_text.s(document_url),
        summarize_document.s(),
        extract_entities.s(),
        store_results.s()
    )
    result = pipeline.apply_async()
    return result.id

Monitoring and Observability

You cannot manage what you cannot measure. Production multi-model systems require comprehensive monitoring across several dimensions.

What to Monitor

Metric             | Why It Matters                            | Alert Threshold
Latency per model  | Identifies slow models in the pipeline    | p95 > 2x baseline
Error rate         | Catches model failures and API issues     | > 5% in 5-minute window
Token usage        | Tracks consumption and detects anomalies  | > 150% of daily average
Cost per request   | Budget tracking and optimization          | > $0.50 per request
Queue depth        | Indicates processing backlog              | > 1000 pending tasks
Model availability | Detects provider outages                  | Any model unreachable
Cache hit rate     | Measures caching effectiveness            | < 30% (investigate)

Observability Stack

  • Prometheus: Collect metrics from each service. Use histograms for latency, counters for requests and errors, gauges for queue depth
  • Grafana: Build dashboards showing pipeline health, per-model performance, cost trends, and SLA compliance
  • OpenTelemetry: Distributed tracing across the entire pipeline. Trace a single request from API gateway through every model call to the final response
  • Custom metrics: Track AI-specific metrics like token usage per model, prompt lengths, output quality scores, and hallucination detection rates
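
A minimal sketch of the Prometheus side using prometheus_client; the metric names and port are conventions assumed for illustration:

from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Metric names below are illustrative conventions, not a fixed schema
MODEL_LATENCY = Histogram("model_call_latency_seconds",
                          "Latency of individual model calls", ["model"])
MODEL_ERRORS = Counter("model_call_errors_total",
                       "Failed model calls", ["model", "error_type"])
QUEUE_DEPTH = Gauge("pipeline_queue_depth", "Pending tasks in the queue")

start_http_server(9100)  # expose /metrics on an assumed port

def observed_model_call(model: str, call):
    # `call` is a zero-arg callable performing the actual API request
    with MODEL_LATENCY.labels(model=model).time():
        try:
            return call()
        except Exception as exc:
            MODEL_ERRORS.labels(model=model,
                                error_type=type(exc).__name__).inc()
            raise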

Logging for Multi-Model Debugging

When a pipeline fails across multiple models, you need to trace exactly what happened. Two practices are essential:

  • Correlation IDs: Assign a unique ID to every request and propagate it through every model call, queue message, and log entry. This lets you reconstruct the full execution path from a single ID
  • Structured logging: Use JSON-formatted logs with consistent fields (request_id, model, latency_ms, token_count, status). This makes logs searchable and aggregatable in tools like Elasticsearch or Loki

# Structured logging example with structlog
import structlog

logger = structlog.get_logger()

def log_model_call(request_id, model, input_tokens, output_tokens,
                   latency_ms, status):
    # calculate_cost is sketched in the cost-tracking section below
    logger.info("model_call",
        request_id=request_id,
        model=model,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        latency_ms=latency_ms,
        status=status,
        cost_usd=calculate_cost(model, input_tokens, output_tokens)
    )

CI/CD for AI Systems

Deploying multi-model systems requires practices beyond traditional CI/CD:

  • Model versioning: Pin every model version in your config. Never use "latest" in production. Track which model version produced each output
  • A/B testing: Route a percentage of traffic to a new model version. Compare quality metrics, latency, and cost before full rollout
  • Canary deployments: Deploy the new pipeline version to 5% of traffic first. Monitor error rates and latency. Automatically roll back if metrics degrade (a minimal traffic-splitting sketch follows this list)
  • Shadow mode: Run the new pipeline alongside the old one without serving results. Compare outputs offline to validate quality before switching
  • Evaluation gates: Before promoting a deployment, run an automated eval suite. If accuracy drops below threshold, block the deploy
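
A minimal sketch of the traffic-splitting logic behind a canary rollout; the version labels and 5% fraction are assumptions that would normally live in config:

import hashlib

CANARY_FRACTION = 0.05  # assumed: 5% of traffic, normally read from config

def select_pipeline_version(request_id: str) -> str:
    # Deterministic bucketing: a given request id always maps to the same
    # version, across processes and restarts, which keeps comparisons clean
    bucket = int(hashlib.sha256(request_id.encode()).hexdigest(), 16) % 100
    return "v2-canary" if bucket < CANARY_FRACTION * 100 else "v1-stable"

Rolling back is then a config change: set CANARY_FRACTION to zero.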

Infrastructure

Docker Compose for Local Development

# docker-compose.yml for local multi-model development
version: "3.8"
services:
  api-gateway:
    build: ./gateway
    ports: ["8000:8000"]
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on: [redis, text-model, vision-model]

  text-model:
    build: ./services/text-model
    ports: ["8001:8001"]
    environment:
      - MODEL_NAME=claude-sonnet-4-20250514

  vision-model:
    build: ./services/vision-model
    ports: ["8002:8002"]

  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]

  worker:
    build: ./worker
    command: celery -A tasks worker --loglevel=info
    depends_on: [redis]

  prometheus:
    image: prom/prometheus
    ports: ["9090:9090"]
    volumes: ["./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml"]

  grafana:
    image: grafana/grafana
    ports: ["3000:3000"]
    depends_on: [prometheus]

Kubernetes Deployment

In production, Kubernetes lets you run model-specific pods with different resource requirements. A pod that proxies hosted text APIs needs only modest CPU and memory, while a self-hosted embedding or vision model may need a GPU node. Use Horizontal Pod Autoscalers to scale each service based on queue depth or latency.

Infrastructure as Code

Use Terraform or Pulumi to provision GPU instances, load balancers, and managed services. Version your infrastructure alongside your application code so every deployment is reproducible.

Cost Tracking and Budgeting

  • Per-provider tracking: Log every API call with its cost. Aggregate daily costs per provider (Anthropic, OpenAI, Cohere, etc.); a cost-logging sketch follows this list
  • Per-pipeline tracking: Calculate the total cost of each pipeline execution. Identify which pipelines are most expensive
  • Budget alerts: Set daily and monthly spending limits. Trigger alerts at 80% threshold. Automatically throttle or queue requests if budget is exceeded
  • Cost allocation: If serving multiple teams or customers, tag requests with tenant IDs to allocate costs accurately
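
A minimal sketch of per-call cost logging with a budget alert. The per-token prices and the notify_budget_alert hook are illustrative placeholders; real rates belong in configuration:

from collections import defaultdict

# Illustrative USD prices per million (input, output) tokens; these are
# assumptions, not authoritative pricing, so keep real rates in config
PRICE_PER_MTOK = {
    "claude-sonnet-4-20250514": (3.00, 15.00),
    "text-embedding-3-small": (0.02, 0.00),
}
DAILY_BUDGET_USD = 200.00  # assumed limit
daily_spend = defaultdict(float)  # keyed by provider

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    in_price, out_price = PRICE_PER_MTOK[model]
    return (input_tokens * in_price + output_tokens * out_price) / 1_000_000

def record_call(provider: str, model: str,
                input_tokens: int, output_tokens: int) -> None:
    daily_spend[provider] += calculate_cost(model, input_tokens, output_tokens)
    if sum(daily_spend.values()) > 0.8 * DAILY_BUDGET_USD:
        notify_budget_alert(daily_spend)  # placeholder alert hook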

Disaster Recovery

Production systems will fail. Plan for it:
  • Model fallbacks: If Claude is unavailable, route to GPT-4o. If GPT-4o fails, use Llama 3.3 via a self-hosted endpoint. Define a fallback chain for every model in your pipeline
  • Circuit breakers: After N consecutive failed calls to a model, stop sending requests to it for a cooldown period. This prevents cascading failures and wasted spend (see the sketch after this list)
  • Graceful degradation: If the embedding model is down, serve results without semantic search. If the vision model is unavailable, skip image analysis and return text-only results
  • Dead letter queues: Failed tasks go to a DLQ for manual inspection rather than being silently dropped
  • Health checks: Every model service exposes a health endpoint. The API gateway checks health before routing requests
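
A minimal circuit-breaker sketch; the threshold and cooldown defaults are assumptions, and in practice you would keep one instance per model endpoint:

import time

class CircuitBreaker:
    """Open after `threshold` consecutive failures; reject calls until
    `cooldown` seconds have passed, then allow one probe request."""

    def __init__(self, threshold: int = 5, cooldown: float = 60.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at: float | None = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.cooldown:
            # Half-open: reset and let the next request probe the model
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()

Before each model call, check allow(); if it returns False, skip straight to the fallback model instead of burning the timeout.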

Frequently Asked Questions

How do I handle different latency requirements across models?

Use timeouts per model call and set SLAs for the overall pipeline. For user-facing requests, run models in parallel where possible and set aggressive timeouts (5-10 seconds). For background processing, use message queues and allow longer execution times. Always have a fast fallback path if a slow model times out.
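
A minimal sketch of that pattern, assuming primary and fallback are zero-argument coroutine factories wrapping two different model calls:

import asyncio

async def call_with_fallback(primary, fallback, timeout_s: float = 8.0):
    """Try the slow, capable model first; fall back to a fast one on timeout."""
    try:
        return await asyncio.wait_for(primary(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return await fallback()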

Should I use a workflow orchestrator or build my own pipeline?

For simple linear pipelines (3-5 steps), a custom FastAPI service with async/await is often sufficient and easier to debug. Adopt an orchestrator like Prefect or Temporal when you need complex branching, retries with state, scheduled runs, or visibility into pipeline execution across teams. Do not over-engineer early.

How do I test a multi-model pipeline end-to-end?

Use a three-layer approach: (1) Unit tests with mocked model responses for each pipeline step, (2) Integration tests with real API calls against a staging environment using a fixed test dataset, (3) Shadow mode in production where the new pipeline runs alongside the old one and outputs are compared offline. Budget for real API calls in your testing costs.
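
As a layer-1 sketch, the test below mocks the Anthropic client around classify_text from the FastAPI example earlier. It assumes that service lives in pipeline.py and that pytest-asyncio is installed:

import types
from unittest.mock import AsyncMock, patch

import pytest

import pipeline  # assumption: the FastAPI service above lives in pipeline.py

@pytest.mark.asyncio
async def test_classify_text_returns_model_output():
    # Fake the shape of an Anthropic messages.create response
    fake = types.SimpleNamespace(
        content=[types.SimpleNamespace(text="technical")])
    with patch.object(pipeline.claude_client.messages, "create",
                      AsyncMock(return_value=fake)):
        result = await pipeline.classify_text("CUDA kernels", request_id="t-1")
    assert result["category"] == "technical"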

What is the best way to handle model version upgrades?

Pin model versions explicitly in configuration files. When a new version is available, deploy it in shadow mode first. Run your evaluation suite against both versions. If the new version meets quality thresholds, do a canary rollout (5% then 25% then 100%). Keep the previous version config available for instant rollback.