Building Production Pipelines
Moving multi-model AI systems from prototype to production. Learn pipeline architecture, workflow orchestration, async processing, monitoring, CI/CD strategies, and disaster recovery for systems that coordinate multiple AI models.
From Prototype to Production
A prototype that chains two API calls in a Jupyter notebook is very different from a production system handling thousands of requests per minute across multiple models. Production pipelines must handle failures gracefully, scale under load, provide observability, and keep costs predictable. This lesson covers the engineering practices that bridge that gap.
- Prototype mindset: "Make it work" — synchronous calls, no error handling, hardcoded keys, single user
- Production mindset: "Make it reliable, observable, and scalable" — async processing, retries, monitoring, multi-tenant
- Key differences: Error handling, authentication, rate limiting, logging, deployment automation, cost tracking, and SLA guarantees
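Much of the gap between the two mindsets comes down to handling transient failures. A minimal sketch of retry with exponential backoff and jitter (the decorator name and delays are illustrative, not part of this lesson):

```python
import random
import time
from functools import wraps

def with_retries(max_attempts=3, base_delay=0.5):
    """Retry a flaky call with exponential backoff plus jitter."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the error
                    # Backoff doubles per attempt; jitter avoids thundering herds
                    time.sleep(base_delay * 2 ** (attempt - 1)
                               + random.uniform(0, 0.1))
        return wrapper
    return decorator

calls = {"n": 0}

@with_retries(max_attempts=3, base_delay=0.01)
def flaky_model_call():
    """Stand-in for an API call that fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient API failure")
    return "ok"
```

In a prototype the first `ConnectionError` would crash the notebook cell; here the caller never sees the two transient failures.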
Pipeline Architecture
A production multi-model pipeline typically follows this flow:
Client Request
```
      |
API Gateway (auth, rate limiting, validation)
      |
Router (classify request, select pipeline)
      |
Model Services (parallel or sequential)
  |-- Text Model (Claude, GPT-4o)
  |-- Vision Model (GPT-4o Vision, Claude Vision)
  |-- Embedding Model (text-embedding-3-small)
  |-- Custom Model (fine-tuned classifier)
      |
Aggregator (merge results, post-process)
      |
Response (formatted, cached, logged)
```
Each component runs as an independent service, communicating through APIs or message queues. This separation allows you to scale each model service independently, deploy updates without downtime, and swap models without changing the pipeline logic.
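The router step can be as simple as a lookup table from request type to pipeline definition. A hypothetical sketch (the pipeline names and step lists are placeholders, not from this lesson):

```python
# Hypothetical routing table: request type -> ordered pipeline steps.
PIPELINES = {
    "document": ["extract", "summarize", "store"],
    "image": ["vision_analyze", "caption", "store"],
    "query": ["embed", "retrieve", "generate"],
}

def route_request(request: dict) -> list[str]:
    """Select a pipeline from request metadata; fall back to a default."""
    kind = request.get("type", "query")
    return PIPELINES.get(kind, PIPELINES["query"])
```

Because routing is data, swapping a model or reordering steps means editing the table, not the pipeline logic.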
Workflow Orchestration Tools
For complex multi-step pipelines, a workflow orchestrator manages execution order, retries, and dependencies between tasks.
| Tool | Best For | Key Feature | Learning Curve |
|---|---|---|---|
| Apache Airflow | Batch pipelines, ETL | DAG-based scheduling, huge ecosystem | Medium |
| Prefect | Modern data pipelines | Python-native, automatic retries, cloud UI | Low |
| Dagster | Data-aware orchestration | Asset-based, type checking, dev experience | Medium |
| Temporal | Long-running workflows | Durable execution, built-in state management | High |
When to use what: Airflow and Prefect are ideal for scheduled batch pipelines (e.g., nightly document processing). Temporal excels at long-running, stateful workflows (e.g., multi-step agent tasks that may run for minutes). For real-time request-response pipelines, a custom FastAPI service with async processing is often simpler than introducing a full orchestrator.
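For the real-time case, the "custom service" option can be a handful of async functions. A toy sketch with sleeps standing in for model calls, showing a sequential dependency followed by a parallel fan-out:

```python
import asyncio

async def step(name: str, payload: dict, delay: float = 0.01) -> dict:
    """Stand-in for a model call; a real step would hit a provider API."""
    await asyncio.sleep(delay)
    payload.setdefault("steps", []).append(name)
    return payload

async def run_pipeline(payload: dict) -> dict:
    # Sequential dependency: classification must finish first...
    payload = await step("classify", payload)
    # ...then independent steps run concurrently.
    await asyncio.gather(step("embed", payload), step("analyze", payload))
    return payload

result = asyncio.run(run_pipeline({}))
```

Once the branching or retry logic stops fitting in a function like this, that is the signal to reach for an orchestrator.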
Message Queues for Async Processing
When a pipeline involves slow model calls, message queues decouple the request from the processing. The client gets an immediate acknowledgment, and results are delivered asynchronously.
| Queue System | Throughput | Persistence | Best For |
|---|---|---|---|
| Redis Queue (RQ) | Medium | Optional | Simple task queues, prototyping |
| Celery | High | Yes (with broker) | Python task processing, chains, groups |
| RabbitMQ | High | Yes | Complex routing, multi-language systems |
| Apache Kafka | Very High | Yes (log-based) | Event streaming, audit trails, replay |
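The decoupling pattern itself can be demonstrated with the standard library before committing to any of the systems above. A toy sketch using an in-process queue and a worker thread (a real deployment would use one of the brokers in the table so tasks survive restarts):

```python
import queue
import threading

tasks: queue.Queue = queue.Queue()
results: dict = {}

def worker() -> None:
    """Drain the queue; a real worker would call a model per task."""
    while True:
        task = tasks.get()
        if task is None:  # sentinel: shut down
            break
        results[task["id"]] = f"processed:{task['text']}"
        tasks.task_done()

t = threading.Thread(target=worker)
t.start()

# The client enqueues and returns immediately; processing happens async.
tasks.put({"id": "req-1", "text": "hello"})
tasks.put({"id": "req-2", "text": "world"})
tasks.put(None)
t.join()
```

The client's only synchronous cost is the enqueue; results are looked up later by request ID, which is exactly the acknowledge-now, deliver-later contract described above.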
Code Example: FastAPI Multi-Model Service
This example shows a production-grade FastAPI service that routes requests to multiple models with async processing, error handling, and structured logging.
```python
import asyncio
import time
import uuid

import anthropic
import openai
import structlog
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

logger = structlog.get_logger()
app = FastAPI(title="Multi-Model Pipeline")


class PipelineRequest(BaseModel):
    text: str
    pipeline: str = "analyze"  # analyze, summarize, translate


class PipelineResponse(BaseModel):
    request_id: str
    results: dict
    latency_ms: float
    models_used: list[str]


# Model clients
claude_client = anthropic.AsyncAnthropic()
openai_client = openai.AsyncOpenAI()


async def classify_text(text: str, request_id: str) -> dict:
    """Step 1: Classify the input text using a fast model."""
    start = time.perf_counter()
    response = await claude_client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=100,
        messages=[{"role": "user",
                   "content": f"Classify this text into one category "
                              f"(technical, business, creative, legal): {text}"}]
    )
    latency = (time.perf_counter() - start) * 1000
    logger.info("classify_complete", request_id=request_id,
                latency_ms=round(latency, 2))
    return {"category": response.content[0].text, "latency_ms": latency}


async def generate_embedding(text: str, request_id: str) -> dict:
    """Step 2: Generate embeddings in parallel with analysis."""
    start = time.perf_counter()
    response = await openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    latency = (time.perf_counter() - start) * 1000
    logger.info("embedding_complete", request_id=request_id,
                latency_ms=round(latency, 2))
    return {"dimensions": len(response.data[0].embedding),
            "latency_ms": latency}


async def deep_analysis(text: str, category: str, request_id: str) -> dict:
    """Step 3: Deep analysis using the most capable model."""
    start = time.perf_counter()
    response = await claude_client.messages.create(
        model="claude-opus-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user",
                   "content": f"Provide a detailed {category} analysis "
                              f"of: {text}"}]
    )
    latency = (time.perf_counter() - start) * 1000
    logger.info("analysis_complete", request_id=request_id,
                latency_ms=round(latency, 2))
    return {"analysis": response.content[0].text, "latency_ms": latency}


@app.post("/pipeline/analyze", response_model=PipelineResponse)
async def analyze_pipeline(req: PipelineRequest):
    request_id = str(uuid.uuid4())
    start = time.perf_counter()
    logger.info("pipeline_start", request_id=request_id,
                pipeline=req.pipeline)
    try:
        # Step 1: Classify (must complete before step 3)
        classification = await classify_text(req.text, request_id)

        # Steps 2 + 3: Run embedding and deep analysis in parallel
        embedding_task = generate_embedding(req.text, request_id)
        analysis_task = deep_analysis(
            req.text, classification["category"], request_id
        )
        embedding, analysis = await asyncio.gather(
            embedding_task, analysis_task
        )

        total_latency = (time.perf_counter() - start) * 1000
        logger.info("pipeline_complete", request_id=request_id,
                    total_latency_ms=round(total_latency, 2))
        return PipelineResponse(
            request_id=request_id,
            results={
                "classification": classification,
                "embedding": embedding,
                "analysis": analysis
            },
            latency_ms=round(total_latency, 2),
            models_used=["claude-sonnet-4-20250514",
                         "text-embedding-3-small",
                         "claude-opus-4-20250514"]
        )
    except Exception as e:
        logger.error("pipeline_failed", request_id=request_id,
                     error=str(e))
        raise HTTPException(status_code=500,
                            detail=f"Pipeline failed: {str(e)}")
```
Code Example: Celery Task Chain for Document Processing
This example demonstrates a Celery-based document processing pipeline that chains multiple model calls with automatic retries and error handling.
```python
import json

import anthropic
from celery import Celery, chain
from celery.utils.log import get_task_logger

app = Celery("doc_pipeline", broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")
logger = get_task_logger(__name__)
client = anthropic.Anthropic()


@app.task(bind=True, max_retries=3, default_retry_delay=5)
def extract_text(self, document_url: str) -> dict:
    """Step 1: Extract and clean text from document."""
    try:
        # In production, use a document parser like Unstructured;
        # fetch_and_parse_document is a placeholder to implement.
        raw_text = fetch_and_parse_document(document_url)
        return {"text": raw_text, "source": document_url,
                "char_count": len(raw_text)}
    except Exception as exc:
        logger.warning(f"Extract failed, retrying: {exc}")
        raise self.retry(exc=exc)


@app.task(bind=True, max_retries=3, default_retry_delay=10)
def summarize_document(self, extracted: dict) -> dict:
    """Step 2: Generate summary using Claude."""
    try:
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=500,
            messages=[{"role": "user",
                       "content": f"Summarize this document in 3-5 "
                                  f"sentences:\n\n{extracted['text'][:8000]}"}]
        )
        extracted["summary"] = response.content[0].text
        return extracted
    except anthropic.RateLimitError as exc:
        # Back off longer on rate limits than on ordinary failures
        raise self.retry(exc=exc, countdown=30)


@app.task(bind=True, max_retries=3)
def extract_entities(self, doc_data: dict) -> dict:
    """Step 3: Extract named entities."""
    try:
        response = client.messages.create(
            model="claude-haiku-4-20250514",
            max_tokens=300,
            messages=[{"role": "user",
                       "content": f"Extract key entities (people, orgs, "
                                  f"dates, amounts) as JSON from:\n\n"
                                  f"{doc_data['text'][:4000]}"}]
        )
        doc_data["entities"] = json.loads(response.content[0].text)
        return doc_data
    except Exception as exc:
        raise self.retry(exc=exc)


@app.task
def store_results(doc_data: dict) -> dict:
    """Step 4: Store processed results in database."""
    # In production, write to PostgreSQL, Elasticsearch, etc.;
    # save_to_database is a placeholder to implement.
    logger.info(f"Storing results for: {doc_data['source']}")
    save_to_database(doc_data)
    return {"status": "complete", "source": doc_data["source"]}


# Build the pipeline chain
def process_document(document_url: str):
    """Execute the full document processing pipeline."""
    pipeline = chain(
        extract_text.s(document_url),
        summarize_document.s(),
        extract_entities.s(),
        store_results.s()
    )
    result = pipeline.apply_async()
    return result.id
```
Monitoring and Observability
You cannot manage what you cannot measure. Production multi-model systems require comprehensive monitoring across several dimensions.
What to Monitor
| Metric | Why It Matters | Alert Threshold |
|---|---|---|
| Latency per model | Identifies slow models in the pipeline | p95 > 2x baseline |
| Error rate | Catches model failures and API issues | > 5% in 5-minute window |
| Token usage | Tracks consumption and detects anomalies | > 150% of daily average |
| Cost per request | Budget tracking and optimization | > $0.50 per request |
| Queue depth | Indicates processing backlog | > 1000 pending tasks |
| Model availability | Detects provider outages | Any model unreachable |
| Cache hit rate | Measures caching effectiveness | < 30% (investigate) |
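An alert like "p95 > 2x baseline" requires computing a percentile over a sliding window of samples. A hedged sketch using the nearest-rank method (a production service would typically export this via Prometheus histograms instead of computing it in-process):

```python
import math
from collections import deque

class LatencyTracker:
    """Keep a sliding window of latencies and report p95 for alerting."""

    def __init__(self, window: int = 1000):
        # deque with maxlen drops the oldest sample automatically
        self.samples: deque = deque(maxlen=window)

    def record(self, latency_ms: float) -> None:
        self.samples.append(latency_ms)

    def p95(self) -> float:
        ordered = sorted(self.samples)
        # Nearest-rank percentile: index ceil(0.95 * n) - 1
        idx = max(0, math.ceil(0.95 * len(ordered)) - 1)
        return ordered[idx]

tracker = LatencyTracker()
for ms in range(1, 101):  # latencies 1..100 ms
    tracker.record(float(ms))
```

One tracker per model makes it easy to spot which stage of the pipeline is regressing.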
Observability Stack
- Prometheus: Collect metrics from each service. Use histograms for latency, counters for requests and errors, gauges for queue depth
- Grafana: Build dashboards showing pipeline health, per-model performance, cost trends, and SLA compliance
- OpenTelemetry: Distributed tracing across the entire pipeline. Trace a single request from API gateway through every model call to the final response
- Custom metrics: Track AI-specific metrics like token usage per model, prompt lengths, output quality scores, and hallucination detection rates
Logging for Multi-Model Debugging
When a pipeline fails across multiple models, you need to trace exactly what happened. Two practices are essential:
- Correlation IDs: Assign a unique ID to every request and propagate it through every model call, queue message, and log entry. This lets you reconstruct the full execution path from a single ID
- Structured logging: Use JSON-formatted logs with consistent fields (request_id, model, latency_ms, token_count, status). This makes logs searchable and aggregatable in tools like Elasticsearch or Loki
```python
# Structured logging example with structlog
import structlog

logger = structlog.get_logger()

def log_model_call(request_id, model, input_tokens, output_tokens,
                   latency_ms, status):
    logger.info("model_call",
                request_id=request_id,
                model=model,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                latency_ms=latency_ms,
                status=status,
                cost_usd=calculate_cost(model, input_tokens, output_tokens))
```
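The logging snippet calls a `calculate_cost` helper that is not defined in this lesson. One possible shape, driven by a pinned pricing table; the per-million-token prices below are placeholders, so always check your provider's current price sheet:

```python
# Placeholder per-million-token prices (USD); verify against the provider.
PRICE_PER_MTOK = {
    "claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
    "text-embedding-3-small": {"input": 0.02, "output": 0.0},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost in USD for one call, from the pinned pricing table."""
    price = PRICE_PER_MTOK[model]
    return round(
        input_tokens / 1_000_000 * price["input"]
        + output_tokens / 1_000_000 * price["output"],
        6,
    )
```

Keeping prices in a versioned table rather than hardcoded in call sites means a provider price change is a one-line config update.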
CI/CD for AI Systems
Deploying multi-model systems requires practices beyond traditional CI/CD:
- Model versioning: Pin every model version in your config. Never use "latest" in production. Track which model version produced each output
- A/B testing: Route a percentage of traffic to a new model version. Compare quality metrics, latency, and cost before full rollout
- Canary deployments: Deploy the new pipeline version to 5% of traffic first. Monitor error rates and latency. Automatically roll back if metrics degrade
- Shadow mode: Run the new pipeline alongside the old one without serving results. Compare outputs offline to validate quality before switching
- Evaluation gates: Before promoting a deployment, run an automated eval suite. If accuracy drops below threshold, block the deploy
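An evaluation gate can be a few lines in a CI job. A sketch assuming a scalar accuracy metric and a fixed regression tolerance (both are illustrative choices, not prescribed by this lesson):

```python
def evaluation_gate(scores: list, baseline: float,
                    max_regression: float = 0.02) -> bool:
    """Pass the deploy only if mean eval accuracy stays within tolerance
    of the production baseline."""
    mean = sum(scores) / len(scores)
    return mean >= baseline - max_regression

# Example: candidate scores from the eval suite vs. the current baseline.
candidate_ok = evaluation_gate([0.91, 0.93, 0.90], baseline=0.90)
```

In CI, a `False` result fails the job, which blocks the promotion step exactly as the bullet above describes.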
Infrastructure
Docker Compose for Local Development
```yaml
# docker-compose.yml for local multi-model development
version: "3.8"
services:
  api-gateway:
    build: ./gateway
    ports: ["8000:8000"]
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on: [redis, text-model, vision-model]
  text-model:
    build: ./services/text-model
    ports: ["8001:8001"]
    environment:
      - MODEL_NAME=claude-sonnet-4-20250514
  vision-model:
    build: ./services/vision-model
    ports: ["8002:8002"]
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
  worker:
    build: ./worker
    command: celery -A tasks worker --loglevel=info
    depends_on: [redis]
  prometheus:
    image: prom/prometheus
    ports: ["9090:9090"]
    volumes: ["./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml"]
  grafana:
    image: grafana/grafana
    ports: ["3000:3000"]
    depends_on: [prometheus]
```
Kubernetes Deployment
In production, Kubernetes lets you run model-specific pods with different resource requirements. A text generation pod needs CPU and memory, while an embedding service might need a GPU node. Use Horizontal Pod Autoscalers to scale each service based on queue depth or latency.
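A hedged example of what such an autoscaler manifest can look like; the deployment name, replica bounds, and CPU target are placeholders, and scaling on queue depth would instead use a custom or external metric:

```yaml
# Illustrative HPA for a text-model deployment; all values are placeholders.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: text-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: text-model
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
```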
Infrastructure as Code
Use Terraform or Pulumi to provision GPU instances, load balancers, and managed services. Version your infrastructure alongside your application code so every deployment is reproducible.
Cost Tracking and Budgeting
- Per-provider tracking: Log every API call with its cost. Aggregate daily costs per provider (Anthropic, OpenAI, Cohere, etc.)
- Per-pipeline tracking: Calculate the total cost of each pipeline execution. Identify which pipelines are most expensive
- Budget alerts: Set daily and monthly spending limits. Trigger alerts at 80% threshold. Automatically throttle or queue requests if budget is exceeded
- Cost allocation: If serving multiple teams or customers, tag requests with tenant IDs to allocate costs accurately
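The budget-alert and throttling bullets can be combined into a small guard object. A sketch with illustrative thresholds:

```python
class BudgetGuard:
    """Track daily spend; alert at a threshold, throttle at the limit."""

    def __init__(self, daily_limit_usd: float, alert_at: float = 0.8):
        self.daily_limit = daily_limit_usd
        self.alert_at = alert_at  # fraction of the limit that triggers alerts
        self.spent = 0.0

    def record(self, cost_usd: float) -> str:
        self.spent += cost_usd
        if self.spent >= self.daily_limit:
            return "throttle"  # queue or reject new requests
        if self.spent >= self.daily_limit * self.alert_at:
            return "alert"     # notify on-call, keep serving
        return "ok"

guard = BudgetGuard(daily_limit_usd=10.0)
```

Calling `guard.record(...)` after every model call gives each request an immediate verdict, so throttling kicks in mid-day instead of after the bill arrives.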
Disaster Recovery
- Model fallbacks: If Claude is unavailable, route to GPT-4o. If GPT-4o fails, use Llama 3.3 via a self-hosted endpoint. Define a fallback chain for every model in your pipeline
- Circuit breakers: After N consecutive failures to a model, stop sending requests for a cooldown period. This prevents cascading failures and wasted costs
- Graceful degradation: If the embedding model is down, serve results without semantic search. If the vision model is unavailable, skip image analysis and return text-only results
- Dead letter queues: Failed tasks go to a DLQ for manual inspection rather than being silently dropped
- Health checks: Every model service exposes a health endpoint. The API gateway checks health before routing requests
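The circuit-breaker bullet can be made concrete in a few lines. A minimal sketch with a consecutive-failure counter and a cooldown (a production version would also need per-model instances and thread safety):

```python
import time

class CircuitBreaker:
    """Open after N consecutive failures; allow a probe after a cooldown."""

    def __init__(self, max_failures: int = 3, cooldown_s: float = 30.0):
        self.max_failures = max_failures
        self.cooldown_s = cooldown_s
        self.failures = 0
        self.opened_at = None

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True  # closed: normal operation
        if time.monotonic() - self.opened_at >= self.cooldown_s:
            # Half-open: allow one probe; a failure re-opens immediately
            self.opened_at = None
            self.failures = self.max_failures - 1
            return True
        return False  # open: fail fast without calling the model

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.max_failures:
            self.opened_at = time.monotonic()

breaker = CircuitBreaker(max_failures=2, cooldown_s=60.0)
```

When `allow_request()` returns `False`, the caller skips the model and jumps straight to the fallback chain, which is what prevents the cascading failures and wasted spend noted above.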
Frequently Asked Questions
How do I handle different latency requirements across models?
Use timeouts per model call and set SLAs for the overall pipeline. For user-facing requests, run models in parallel where possible and set aggressive timeouts (5-10 seconds). For background processing, use message queues and allow longer execution times. Always have a fast fallback path if a slow model times out.
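The timeout-plus-fallback pattern maps directly onto `asyncio.wait_for`. A toy sketch with sleeps standing in for a slow capable model and a fast fallback:

```python
import asyncio

async def slow_model(text: str) -> str:
    await asyncio.sleep(1.0)  # stands in for a slow provider call
    return f"slow:{text}"

async def fast_fallback(text: str) -> str:
    await asyncio.sleep(0.01)
    return f"fallback:{text}"

async def answer(text: str, timeout_s: float = 0.1) -> str:
    """Try the capable model first; fall back if it misses the SLA."""
    try:
        return await asyncio.wait_for(slow_model(text), timeout=timeout_s)
    except asyncio.TimeoutError:
        return await fast_fallback(text)

result = asyncio.run(answer("hi"))
```

The user-facing SLA is bounded by `timeout_s` plus the fallback's latency, rather than by the slowest model on a bad day.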
Should I use a workflow orchestrator or build my own pipeline?
For simple linear pipelines (3-5 steps), a custom FastAPI service with async/await is often sufficient and easier to debug. Adopt an orchestrator like Prefect or Temporal when you need complex branching, retries with state, scheduled runs, or visibility into pipeline execution across teams. Do not over-engineer early.
How do I test a multi-model pipeline end-to-end?
Use a three-layer approach: (1) Unit tests with mocked model responses for each pipeline step, (2) Integration tests with real API calls against a staging environment using a fixed test dataset, (3) Shadow mode in production where the new pipeline runs alongside the old one and outputs are compared offline. Budget for real API calls in your testing costs.
What is the best way to handle model version upgrades?
Pin model versions explicitly in configuration files. When a new version is available, deploy it in shadow mode first. Run your evaluation suite against both versions. If the new version meets quality thresholds, do a canary rollout (5% then 25% then 100%). Keep the previous version config available for instant rollback.
Lilly Tech Systems