Intermediate

Multi-Provider Routing

The routing engine is the brain of your AI gateway. It decides which provider handles each request based on cost, latency, availability, and model capabilities. A well-designed router can cut costs by as much as 40%, raise availability toward 99.99%, and send each request to the best model for the task, all transparently to the application.

Routing Strategy Overview

Production gateways combine multiple routing strategies. Each strategy serves a different goal:

Strategy | Goal | When to Use
Round Robin | Even distribution | Multiple accounts with the same provider to increase rate limits
Weighted | Proportional traffic split | A/B testing models, gradual migration between providers
Latency-Based | Fastest response | User-facing real-time applications
Cost-Based | Cheapest provider | Batch processing, internal tools, non-latency-sensitive tasks
Fallback Chain | Maximum reliability | Every production deployment (always have a fallback)
Capability Match | Right model for the task | Routing vision requests, long context, structured output
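
To make the "Weighted" row concrete, here is a minimal sketch of a proportional traffic split using the standard library's random.choices. The endpoint names and the 90/10 split are made up for illustration, and the seeded generator is only there so the simulation is repeatable.

```python
import random
from collections import Counter


def pick_weighted(weights: dict[str, float], rng: random.Random) -> str:
    """Pick one endpoint name with probability proportional to its weight."""
    names = list(weights)
    return rng.choices(names, weights=[weights[n] for n in names], k=1)[0]


# Hypothetical 90/10 split for a gradual migration
split = {"current-provider": 0.9, "new-provider": 0.1}
rng = random.Random(42)  # seeded so the simulation is repeatable

# Simulate 10,000 requests and measure the observed share
counts = Counter(pick_weighted(split, rng) for _ in range(10_000))
share_new = counts["new-provider"] / 10_000
print(f"new-provider share: {share_new:.1%}")
```

Over many requests the observed share should settle close to the configured weight, which is what makes weights a safe dial for gradual migrations.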

Production Router Implementation

Here is a multi-strategy router you can use as a starting point for production. It supports fallback chains, latency tracking, cost optimization, and model capability matching:

import asyncio
import time
import random
from dataclasses import dataclass, field
from typing import Optional
from collections import deque
import httpx

@dataclass
class ProviderEndpoint:
    """Represents one AI provider endpoint."""
    name: str                    # "openai-1", "anthropic-prod"
    provider: str                # "openai", "anthropic", "google", "local"
    base_url: str
    api_key: str
    models: list[str]            # Models available at this endpoint
    cost_per_1k_input: float     # Cost per 1K input tokens
    cost_per_1k_output: float    # Cost per 1K output tokens
    max_context: int = 128000    # Max context window
    supports_vision: bool = False
    supports_streaming: bool = True
    supports_json_mode: bool = True
    weight: float = 1.0          # For weighted routing
    priority: int = 1            # For fallback ordering (1 = highest)
    # Runtime health tracking
    is_healthy: bool = True
    consecutive_failures: int = 0
    latency_history: deque = field(default_factory=lambda: deque(maxlen=100))

    @property
    def avg_latency_ms(self) -> float:
        if not self.latency_history:
            return 999999
        return sum(self.latency_history) / len(self.latency_history)


class AIRouter:
    """Multi-strategy router for AI gateway."""

    def __init__(self, endpoints: list[ProviderEndpoint]):
        self.endpoints = endpoints
        self.round_robin_index = 0

    def select(
        self,
        model: str,
        strategy: str = "fallback",
        requirements: Optional[dict] = None
    ) -> list[ProviderEndpoint]:
        """
        Select ordered list of endpoints to try.
        Returns list for fallback - try first, fall back to rest.
        """
        requirements = requirements or {}

        # Step 1: Filter to endpoints that support the requested model
        candidates = [ep for ep in self.endpoints if self._matches(ep, model, requirements)]

        if not candidates:
            raise ValueError(f"No healthy endpoint supports model={model}")

        # Step 2: Order by strategy
        if strategy == "cost":
            return self._sort_by_cost(candidates)
        elif strategy == "latency":
            return self._sort_by_latency(candidates)
        elif strategy == "weighted":
            return self._sort_by_weight(candidates)
        elif strategy == "round-robin":
            return self._round_robin(candidates)
        else:  # "fallback" - default
            return self._sort_by_priority(candidates)

    def _matches(self, ep: ProviderEndpoint, model: str, reqs: dict) -> bool:
        """Check if endpoint supports model and requirements."""
        if not ep.is_healthy:
            return False
        # "auto" lets the gateway pick any model that satisfies the requirements
        if model != "auto" and model not in ep.models and not self._model_alias_match(ep, model):
            return False
        if reqs.get("vision") and not ep.supports_vision:
            return False
        if reqs.get("json_mode") and not ep.supports_json_mode:
            return False
        if reqs.get("min_context", 0) > ep.max_context:
            return False
        return True

    def _model_alias_match(self, ep: ProviderEndpoint, model: str) -> bool:
        """Handle model aliases: gpt-4o -> maps to any gpt-4o endpoint."""
        return any(model in m or m in model for m in ep.models)

    def _sort_by_cost(self, eps: list) -> list:
        return sorted(eps, key=lambda e: e.cost_per_1k_input + e.cost_per_1k_output)

    def _sort_by_latency(self, eps: list) -> list:
        return sorted(eps, key=lambda e: e.avg_latency_ms)

    def _sort_by_priority(self, eps: list) -> list:
        return sorted(eps, key=lambda e: e.priority)

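    def _sort_by_weight(self, eps: list) -> list:
        """Weighted random order: P(endpoint is first) is proportional to its weight."""
        # Efraimidis-Spirakis sampling: draw u in [0, 1), rank by u ** (1 / weight).
        # Sorting by weight alone would always put the heaviest endpoint first.
        def key(e: ProviderEndpoint) -> float:
            return random.random() ** (1.0 / e.weight) if e.weight > 0 else -1.0
        return sorted(eps, key=key, reverse=True)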

    def _round_robin(self, eps: list) -> list:
        self.round_robin_index = (self.round_robin_index + 1) % len(eps)
        idx = self.round_robin_index
        return eps[idx:] + eps[:idx]

    def record_success(self, endpoint: ProviderEndpoint, latency_ms: float):
        """Update endpoint health after successful request."""
        endpoint.latency_history.append(latency_ms)
        endpoint.consecutive_failures = 0
        endpoint.is_healthy = True

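    def record_failure(self, endpoint: ProviderEndpoint):
        """Update endpoint health after failed request."""
        endpoint.consecutive_failures += 1
        # Trip the breaker once; later in-flight failures must not schedule duplicates
        if endpoint.consecutive_failures >= 3 and endpoint.is_healthy:
            endpoint.is_healthy = False
            # Schedule health check to re-enable
            self._schedule_health_check(endpoint, delay=30.0)

    def _schedule_health_check(self, endpoint: ProviderEndpoint, delay: float):
        """Run _health_check after `delay` seconds (needs a running event loop)."""
        loop = asyncio.get_running_loop()

        def start_check():
            # call_later takes a plain callable, so wrap the coroutine in a task.
            # Keep a reference so the task is not garbage-collected mid-run.
            endpoint.health_task = loop.create_task(self._health_check(endpoint))

        loop.call_later(delay, start_check)

    async def _health_check(self, endpoint: ProviderEndpoint):
        """Try a lightweight request to see if endpoint recovered."""
        try:
            async with httpx.AsyncClient() as client:
                # Bearer auth on /models fits OpenAI-compatible APIs;
                # other providers need their own probe and headers
                resp = await client.get(
                    f"{endpoint.base_url}/models",
                    headers={"Authorization": f"Bearer {endpoint.api_key}"},
                    timeout=5.0
                )
            if resp.status_code == 200:
                endpoint.is_healthy = True
                endpoint.consecutive_failures = 0
                return
        except Exception:
            pass  # Network error or timeout: still unhealthy
        # Still unhealthy (error or non-200), check again later
        self._schedule_health_check(endpoint, delay=60.0)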
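
The health tracking above schedules a probe to bring an endpoint back. A common alternative is a lazy circuit breaker that simply lets one trial request through once a cooldown has passed. The sketch below is a minimal, standalone illustration of that idea; the CircuitBreaker class, its field names, and the injectable clock are assumptions made for this example, not part of the router above.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class CircuitBreaker:
    """Opens after `threshold` consecutive failures, allows a retry after `cooldown_s`."""
    threshold: int = 3
    cooldown_s: float = 30.0
    clock: Callable[[], float] = time.monotonic  # injectable so tests can fake time
    failures: int = 0
    opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True  # closed: traffic flows normally
        # open: allow a trial request only once the cooldown has elapsed
        return self.clock() - self.opened_at >= self.cooldown_s

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None  # close the breaker again

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = self.clock()  # open, or restart the cooldown


# Demonstration with a fake clock instead of real waiting
fake_now = [0.0]
breaker = CircuitBreaker(clock=lambda: fake_now[0])
for _ in range(3):
    breaker.record_failure()
blocked = not breaker.allow_request()    # breaker is open right after the third failure
fake_now[0] = 31.0
trial_allowed = breaker.allow_request()  # cooldown elapsed, one trial may go through
```

The lazy variant needs no background tasks, at the cost of spending one real request to discover that an endpoint has recovered.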

Fallback Chains

Fallback chains are the most critical routing pattern. When your primary provider has an outage, the gateway automatically routes to the next provider with zero application changes:

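def build_target(endpoint: ProviderEndpoint, model: str) -> tuple[str, dict]:
    """Return (url, headers) for the provider's native chat endpoint.

    Helper added here because path and auth headers differ per provider.
    """
    headers = {"Content-Type": "application/json"}
    if endpoint.provider == "anthropic":
        # Messages API: x-api-key auth plus a required version header
        headers.update({"x-api-key": endpoint.api_key, "anthropic-version": "2023-06-01"})
        return f"{endpoint.base_url}/messages", headers
    if endpoint.provider == "google":
        # Gemini API: the model name is part of the path
        headers["x-goog-api-key"] = endpoint.api_key
        return f"{endpoint.base_url}/models/{model}:generateContent", headers
    # OpenAI and OpenAI-compatible local servers
    headers["Authorization"] = f"Bearer {endpoint.api_key}"
    return f"{endpoint.base_url}/chat/completions", headers


async def execute_with_fallback(
    router: AIRouter,
    request_body: dict,
    strategy: str = "fallback"
) -> dict:
    """Execute request with automatic fallback across providers."""
    model = request_body["model"]
    endpoints = router.select(model, strategy=strategy)

    last_error = None
    for endpoint in endpoints:
        try:
            # Translate request format if needed (OpenAI -> Anthropic)
            translated = translate_request(request_body, endpoint.provider)
            url, headers = build_target(endpoint, model)

            start = time.perf_counter()  # monotonic, unlike time.time()
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    url, headers=headers, json=translated, timeout=120.0
                )

            if response.status_code == 200:
                latency_ms = (time.perf_counter() - start) * 1000
                router.record_success(endpoint, latency_ms)
                result = response.json()
                # Normalize response to OpenAI format
                return normalize_response(result, endpoint.provider)

            elif response.status_code == 429:
                # Rate limited - try next provider immediately
                router.record_failure(endpoint)
                last_error = f"{endpoint.name}: rate limited"
                continue

            elif response.status_code >= 500:
                # Server error - try next provider
                router.record_failure(endpoint)
                last_error = f"{endpoint.name}: {response.status_code}"
                continue

            else:
                # Client error (400, 401) - don't retry, it's our fault.
                # The provider's error body is passed back to the caller as-is.
                return response.json()

        except httpx.TimeoutException:
            router.record_failure(endpoint)
            last_error = f"{endpoint.name}: timeout"
            continue
        except Exception as e:
            router.record_failure(endpoint)
            last_error = f"{endpoint.name}: {str(e)}"
            continue

    raise RuntimeError(f"All providers failed. Last error: {last_error}")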


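def translate_request(body: dict, provider: str) -> dict:
    """Translate OpenAI format to provider-specific format (text-only messages)."""
    messages = body["messages"]
    # Anthropic and Gemini take the system prompt as a separate field, not a message
    system = "\n".join(m["content"] for m in messages if m["role"] == "system")
    chat = [m for m in messages if m["role"] != "system"]

    if provider == "anthropic":
        translated = {
            "model": body["model"],
            "max_tokens": body.get("max_tokens", 4096),
            "messages": chat,
            # Anthropic accepts temperatures from 0.0 to 1.0 only
            "temperature": min(body.get("temperature", 1.0), 1.0),
        }
        if system:
            translated["system"] = system
        return translated
    elif provider == "google":
        translated = {
            "contents": [
                # Gemini names the assistant role "model"
                {"role": "model" if msg["role"] == "assistant" else "user",
                 "parts": [{"text": msg["content"]}]}
                for msg in chat
            ],
            "generationConfig": {
                "temperature": body.get("temperature", 1.0),
                "maxOutputTokens": body.get("max_tokens", 4096),
            }
        }
        if system:
            translated["systemInstruction"] = {"parts": [{"text": system}]}
        return translated
    # "openai" and OpenAI-compatible "local" servers take the body unchanged
    return body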
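
execute_with_fallback calls normalize_response, which this lesson does not define. The sketch below is one possible version, assuming text-only replies and the response shapes of the Anthropic Messages API and the Gemini generateContent API. The finish-reason mapping covers only the two most common cases and falls back to "stop" for everything else, which is a simplification.

```python
def normalize_response(result: dict, provider: str) -> dict:
    """Map a provider response onto the OpenAI chat.completion shape (text only)."""
    if provider == "anthropic":
        # Anthropic returns a list of typed content blocks
        text = "".join(
            block.get("text", "")
            for block in result.get("content", [])
            if block.get("type") == "text"
        )
        usage = result.get("usage", {})
        prompt = usage.get("input_tokens", 0)
        completion = usage.get("output_tokens", 0)
        reasons = {"end_turn": "stop", "max_tokens": "length"}
        finish = reasons.get(result.get("stop_reason"), "stop")
    elif provider == "google":
        # Gemini returns candidates, each with a list of parts
        candidate = (result.get("candidates") or [{}])[0]
        parts = candidate.get("content", {}).get("parts", [])
        text = "".join(part.get("text", "") for part in parts)
        usage = result.get("usageMetadata", {})
        prompt = usage.get("promptTokenCount", 0)
        completion = usage.get("candidatesTokenCount", 0)
        reasons = {"STOP": "stop", "MAX_TOKENS": "length"}
        finish = reasons.get(candidate.get("finishReason"), "stop")
    else:
        return result  # OpenAI and OpenAI-compatible servers need no changes

    return {
        "object": "chat.completion",
        "model": result.get("model", ""),
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": text},
            "finish_reason": finish,
        }],
        "usage": {
            "prompt_tokens": prompt,
            "completion_tokens": completion,
            "total_tokens": prompt + completion,
        },
    }
```

Tool calls, images, and streaming chunks would each need their own mapping and are left out here.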

💡 Apply at work: Always configure at least two providers in your fallback chain. OpenAI primary with Anthropic fallback (or vice versa) covers nearly all single-provider outage scenarios. The gateway handles format translation, so your application code never changes.

Cost-Based Routing Configuration

Cost-based routing can save 30-60% on API spend by routing non-critical requests to cheaper providers or models:

# Provider pricing configuration (as of early 2026)
PROVIDER_CONFIG = [
    ProviderEndpoint(
        name="openai-gpt4o",
        provider="openai",
        base_url="https://api.openai.com/v1",
        api_key="sk-...",
        models=["gpt-4o", "gpt-4o-2024-11-20"],
        cost_per_1k_input=0.0025,   # $2.50 per 1M input tokens
        cost_per_1k_output=0.0100,  # $10.00 per 1M output tokens
        supports_vision=True,
        priority=1,
    ),
    ProviderEndpoint(
        name="anthropic-sonnet",
        provider="anthropic",
        base_url="https://api.anthropic.com/v1",
        api_key="sk-ant-...",
        models=["claude-sonnet-4-20250514"],
        cost_per_1k_input=0.0030,   # $3.00 per 1M input tokens
        cost_per_1k_output=0.0150,  # $15.00 per 1M output tokens
        supports_vision=True,
        priority=2,
    ),
    ProviderEndpoint(
        name="google-flash",
        provider="google",
        base_url="https://generativelanguage.googleapis.com/v1beta",
        api_key="AIza...",
        models=["gemini-2.0-flash"],
        cost_per_1k_input=0.0001,   # $0.10 per 1M input tokens, 25x cheaper than GPT-4o
        cost_per_1k_output=0.0004,  # $0.40 per 1M output tokens
        supports_vision=True,
        max_context=1000000,
        priority=3,
    ),
    ProviderEndpoint(
        name="local-llama",
        provider="local",
        base_url="http://gpu-server.internal:8080/v1",
        api_key="local-key",
        models=["llama-3.1-70b"],
        cost_per_1k_input=0.0,     # Free - your own GPU
        cost_per_1k_output=0.0,
        supports_vision=False,
        max_context=128000,
        priority=4,
    ),
]

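# Route by use case
router = AIRouter(PROVIDER_CONFIG)

# select() only returns endpoints that serve the requested model, so each
# list below has a single entry until more endpoints list the same model.

# Critical user-facing: fastest provider first
user_facing = router.select("gpt-4o", strategy="latency")

# Batch summarization: cheapest provider first
batch_job = router.select("gemini-2.0-flash", strategy="cost")

# Internal tools: free local model. For a cloud fallback, add a cloud endpoint
# that lists "llama-3.1-70b" with a higher priority number.
internal = router.select("llama-3.1-70b", strategy="fallback")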
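
To see what the price gap means per request, the arithmetic can be sketched as below. The per-1M-token prices are the ones quoted in the comments of the configuration above; the token counts (2,000 in, 500 out) and the function name are assumptions chosen for illustration.

```python
def request_cost_usd(
    input_tokens: int,
    output_tokens: int,
    usd_per_1m_input: float,
    usd_per_1m_output: float,
) -> float:
    """Cost of one request given per-1M-token prices."""
    total = input_tokens * usd_per_1m_input + output_tokens * usd_per_1m_output
    return total / 1_000_000


# Hypothetical summarization request: 2,000 input tokens, 500 output tokens
gpt4o = request_cost_usd(2_000, 500, 2.50, 10.00)
flash = request_cost_usd(2_000, 500, 0.10, 0.40)

print(f"GPT-4o: ${gpt4o:.4f}  Gemini Flash: ${flash:.4f}  ratio: {gpt4o / flash:.0f}x")
```

At these prices the request costs about $0.01 on GPT-4o and $0.0004 on Gemini Flash. The difference is negligible for one call but adds up quickly across a large batch job.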

Model Capability Matching

Not every model supports every feature. The router must match request requirements to model capabilities:

# Automatic capability-based routing
requirements = {
    "vision": True,         # Request includes images
    "json_mode": True,      # Needs structured JSON output
    "min_context": 200000,  # Needs 200K+ context window
}

# Router automatically filters to endpoints matching ALL requirements
# In this case: only Google Gemini supports vision + 200K+ context
endpoints = router.select(
    model="auto",  # Let gateway pick best model
    strategy="cost",
    requirements=requirements
)
# Returns: [google-flash] (only one matching all requirements)

📝 Production reality: Model capability matching prevents runtime errors. Without it, a vision request routed to a text-only model returns a confusing 400 error. With it, the gateway silently picks the right endpoint. Update capability metadata when providers add new features.
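
The requirements dictionary does not have to be written by hand. As a rough sketch, a gateway could derive it from an OpenAI-style request body as shown below. The function name, the 4-characters-per-token heuristic, and the choice to add max_tokens to the context estimate are all assumptions for illustration; a real tokenizer would give a tighter bound.

```python
def infer_requirements(body: dict, chars_per_token: float = 4.0) -> dict:
    """Derive router requirements from an OpenAI-style chat request body."""
    has_image = False
    total_chars = 0
    for msg in body.get("messages", []):
        content = msg.get("content", "")
        if isinstance(content, str):
            total_chars += len(content)
            continue
        # Multimodal messages carry a list of typed parts
        for part in content or []:
            if part.get("type") == "image_url":
                has_image = True
            elif part.get("type") == "text":
                total_chars += len(part.get("text", ""))

    response_format = body.get("response_format") or {}
    return {
        "vision": has_image,
        "json_mode": response_format.get("type") in ("json_object", "json_schema"),
        # Estimated prompt tokens plus room for the requested completion
        "min_context": int(total_chars / chars_per_token) + body.get("max_tokens", 0),
    }


example_body = {
    "model": "auto",
    "max_tokens": 100,
    "response_format": {"type": "json_object"},
    "messages": [{
        "role": "user",
        "content": [
            {"type": "text", "text": "Describe this chart"},
            {"type": "image_url", "image_url": {"url": "https://example.com/chart.png"}},
        ],
    }],
}
reqs = infer_requirements(example_body)
```

The resulting dictionary uses the same keys as the requirements example above, so it could be passed straight to the router's select call.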

Key Takeaways

  • Combine multiple routing strategies: fallback for reliability, cost-based for batch jobs, latency-based for user-facing requests.
  • Always configure at least two providers in fallback chains to handle outages transparently.
  • Track endpoint health with consecutive failure counts and automatic circuit breaking after 3 failures.
  • Routing non-critical tasks to Gemini Flash instead of GPT-4o cuts input-token cost by 25x at the prices listed above.
  • Capability matching prevents runtime errors by filtering endpoints based on vision, context length, and JSON mode support.

What Is Next

In the next lesson, we will build rate limiting and quota management — preventing any single team from exhausting your organization's API limits while ensuring fair allocation across all consumers.