Advanced

Caching & Performance

Caching is the single highest-ROI optimization in an AI gateway. Organizations routinely ask the same questions, use identical system prompts, and repeat similar queries. A gateway cache can reduce API costs by 30-60% and cut response times from seconds to milliseconds for cache hits.

Exact Match vs Semantic Caching

| Strategy | How It Works | Hit Rate | Use Case |
|---|---|---|---|
| Exact Match | SHA-256 hash of full request body, lookup in Redis | 10-30% | Templated prompts, batch jobs, repeated classification tasks |
| Semantic | Embed the prompt, vector-search for similar cached prompts | 20-50% | Customer support, search queries, FAQ-type questions |

Exact Match Cache

Start with exact match caching — it is simpler, faster, and has zero false positives:

import hashlib
import json
import time
import redis.asyncio as aioredis

class ExactMatchCache:
    """Cache LLM responses by exact request hash in Redis."""

    def __init__(self, redis_url: str, default_ttl: int = 3600):
        self.redis = aioredis.from_url(redis_url)
        self.ttl = default_ttl

    def _cache_key(self, body: dict) -> str:
        """Deterministic cache key from request body."""
        # Only include fields that affect the response
        relevant = {
            "model": body.get("model"),
            "messages": body.get("messages"),
            "temperature": body.get("temperature", 1.0),
            "max_tokens": body.get("max_tokens"),
            "tools": body.get("tools"),
        }
        serialized = json.dumps(
            {k: v for k, v in relevant.items() if v is not None},
            sort_keys=True
        )
        return f"llm:exact:{hashlib.sha256(serialized.encode()).hexdigest()}"

    async def get(self, body: dict) -> dict | None:
        """Check cache. Returns None on miss."""
        # Only cache deterministic requests
        if body.get("temperature", 1.0) > 0.5:
            return None
        if body.get("stream"):
            return None

        key = self._cache_key(body)
        cached = await self.redis.get(key)
        if cached:
            result = json.loads(cached)
            result["_cached"] = True
            result["_cache_type"] = "exact"
            return result
        return None

    async def set(self, body: dict, response: dict, ttl: int | None = None):
        """Cache a successful response."""
        if body.get("temperature", 1.0) > 0.5 or body.get("stream"):
            return
        if "error" in response:
            return

        key = self._cache_key(body)
        cache_data = {
            "choices": response.get("choices"),
            "usage": response.get("usage"),
            "model": response.get("model"),
            "_cached_at": time.time(),
        }
        await self.redis.setex(key, ttl or self.ttl, json.dumps(cache_data))
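The key derivation above can be sanity-checked standalone. The sketch below mirrors `_cache_key` as a pure function (`make_key` is a hypothetical name for illustration): because the relevant fields are serialized with `sort_keys=True`, the same request hashes to the same key regardless of dict field order, while any change to the messages produces a different key.

```python
import hashlib
import json

def make_key(body: dict) -> str:
    """Standalone mirror of ExactMatchCache._cache_key, for illustration."""
    relevant = {
        "model": body.get("model"),
        "messages": body.get("messages"),
        "temperature": body.get("temperature", 1.0),
        "max_tokens": body.get("max_tokens"),
        "tools": body.get("tools"),
    }
    serialized = json.dumps(
        {k: v for k, v in relevant.items() if v is not None},
        sort_keys=True,
    )
    return f"llm:exact:{hashlib.sha256(serialized.encode()).hexdigest()}"

a = {"model": "gpt-4o", "temperature": 0, "messages": [{"role": "user", "content": "hi"}]}
b = {"messages": [{"role": "user", "content": "hi"}], "model": "gpt-4o", "temperature": 0}
c = {"model": "gpt-4o", "temperature": 0, "messages": [{"role": "user", "content": "hello"}]}

assert make_key(a) == make_key(b)   # field order does not matter
assert make_key(a) != make_key(c)   # different prompt, different key
```

This determinism is what makes exact match caching safe: two keys collide only when the requests are byte-for-byte equivalent after normalization.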

Semantic Cache

Semantic caching catches paraphrased queries. "What is the capital of France?" and "Tell me the capital city of France" return the same cached answer:

import hashlib
import json
import time
import uuid

from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

class SemanticCache:
    """Cache LLM responses using embedding similarity."""

    def __init__(
        self,
        qdrant_url: str = "http://localhost:6333",
        threshold: float = 0.95,  # Similarity threshold (0.95 = very similar)
        ttl: int = 3600,
    ):
        self.qdrant = QdrantClient(url=qdrant_url)
        self.embedder = OpenAI()
        self.threshold = threshold
        self.ttl = ttl
        self.collection = "gateway_semantic_cache"
        self._init_collection()

    def _init_collection(self):
        collections = [c.name for c in self.qdrant.get_collections().collections]
        if self.collection not in collections:
            self.qdrant.create_collection(
                collection_name=self.collection,
                vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
            )

    def _embed(self, text: str) -> list[float]:
        return self.embedder.embeddings.create(
            input=text[:2000],  # Limit embedding input
            model="text-embedding-3-small"
        ).data[0].embedding

    def _prompt_text(self, body: dict) -> str:
        """Extract cacheable text from request."""
        parts = []
        for msg in body.get("messages", []):
            if msg["role"] in ("system", "user"):
                content = msg.get("content", "")
                if isinstance(content, str):
                    parts.append(content)
        return " ".join(parts)[-2000:]

    async def get(self, body: dict) -> dict | None:
        """Find semantically similar cached response."""
        if body.get("temperature", 1.0) > 0.3:
            return None  # Only for very deterministic requests

        text = self._prompt_text(body)
        embedding = self._embed(text)

        results = self.qdrant.search(
            collection_name=self.collection,
            query_vector=embedding,
            limit=1,
            score_threshold=self.threshold
        )

        if results:
            cached = results[0].payload
            # Check TTL
            if time.time() - cached["cached_at"] > self.ttl:
                return None  # Expired

            response = json.loads(cached["response_json"])
            response["_cached"] = True
            response["_cache_type"] = "semantic"
            response["_similarity"] = round(results[0].score, 4)
            return response
        return None

    async def set(self, body: dict, response: dict):
        if body.get("temperature", 1.0) > 0.3 or "error" in response:
            return

        text = self._prompt_text(body)
        embedding = self._embed(text)
        # Qdrant point IDs must be UUIDs or unsigned integers; derive a
        # deterministic UUID from the prompt so repeated prompts overwrite in place
        point_id = str(uuid.UUID(hex=hashlib.md5(text.encode()).hexdigest()))

        self.qdrant.upsert(
            collection_name=self.collection,
            points=[PointStruct(
                id=point_id,
                vector=embedding,
                payload={
                    "model": body.get("model"),
                    "prompt_preview": text[:200],
                    "response_json": json.dumps({
                        "choices": response.get("choices"),
                        "usage": response.get("usage"),
                        "model": response.get("model"),
                    }),
                    "cached_at": time.time(),
                }
            )]
        )
💡
Apply at work: Deploy exact match caching first (takes 30 minutes, Redis only). Add semantic caching only after you measure that paraphrased queries are common in your workload. Keep the similarity threshold at 0.95+ to avoid incorrect cache hits. Monitor precision weekly by reviewing a sample of semantic cache matches.

Response Streaming Through Gateway

Streaming is critical for user-facing applications. The gateway must forward tokens as they arrive while tracking usage for cost and rate limiting:

import json
import uuid

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import httpx

app = FastAPI()

@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    body = await request.json()
    gw_key = request.headers.get("Authorization", "").replace("Bearer ", "")

    # Auth, rate limit, PII filter, cache check (same as non-streaming)
    caller = key_manager.validate(gw_key)
    endpoint = router.select(body["model"])[0]

    if not body.get("stream"):
        # Non-streaming: standard proxy
        response = await proxy_request(endpoint, body)
        return response

    # Streaming: forward tokens as they arrive
    async def stream_generator():
        full_content = []
        usage_data = {}

        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                f"{endpoint.base_url}/chat/completions",
                headers={
                    "Authorization": f"Bearer {endpoint.api_key}",
                    "Content-Type": "application/json",
                },
                json=body,
                timeout=120.0
            ) as resp:
                async for line in resp.aiter_lines():
                    if line.startswith("data: "):
                        yield line + "\n\n"  # Forward SSE event immediately

                        # Parse for tracking
                        data = line[6:]
                        if data != "[DONE]":
                            chunk = json.loads(data)
                            delta = chunk.get("choices", [{}])[0].get("delta", {})
                            if delta.get("content"):
                                full_content.append(delta["content"])
                            if chunk.get("usage"):
                                usage_data = chunk["usage"]

        # After stream completes: record cost. If the provider omitted usage
        # from the final chunk, estimate output tokens at ~4 characters/token.
        output_tokens = usage_data.get("completion_tokens", len("".join(full_content)) // 4)
        input_tokens = usage_data.get("prompt_tokens", 0)
        cost = calculate_request_cost(body["model"], input_tokens, output_tokens)
        await budget_enforcer.record_spend(caller.team_id, cost["total_cost"])

    return StreamingResponse(
        stream_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Gateway-Request-Id": str(uuid.uuid4()),
        }
    )

Latency Optimization

Target: gateway overhead under 5ms for cache hits, under 15ms for routed requests:

# 1. Connection pooling (biggest single win)
# Reuse HTTP connections to providers - saves 50-100ms per request
provider_pool = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
    http2=True,  # HTTP/2 multiplexing
    timeout=httpx.Timeout(60.0, connect=5.0)
)

# 2. Parallel pre-checks (auth + rate limit + cache in parallel)
import asyncio

async def pre_flight(body, api_key):
    auth, rate, cached = await asyncio.gather(
        key_manager.validate_async(api_key),
        rate_limiter.check_async(api_key),
        cache.get(body),
    )
    return auth, rate, cached
    # Takes max(auth_time, rate_time, cache_time) instead of sum

# 3. Fast serialization with orjson (10x faster than stdlib json)
import orjson
data = orjson.dumps(response)  # 50us vs 500us

# 4. In-memory caching for hot data
from functools import lru_cache

@lru_cache(maxsize=500)
def get_pricing(model: str):
    return PRICING.get(model)

# Gateway latency budget:
# Key validation:  < 0.5ms (Redis hash lookup)
# Rate limit:      < 0.5ms (Redis Lua script)
# PII regex scan:  < 2ms   (compiled patterns)
# Cache lookup:    < 1ms   (Redis GET) or < 3ms (vector search)
# Routing logic:   < 0.1ms (in-memory)
# ─────────────────────────────
# Total overhead:  < 5ms   (cache hit path)
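The claim behind the `asyncio.gather` pre-flight — elapsed time is the max of the checks, not the sum — can be verified with simulated check latencies, where `asyncio.sleep` stands in for Redis round-trips (a self-contained sketch, not the gateway's actual checks):

```python
import asyncio
import time

async def fake_check(delay: float, result: str) -> str:
    await asyncio.sleep(delay)  # stands in for a Redis round-trip
    return result

async def timed_pre_flight() -> float:
    start = time.perf_counter()
    auth, rate, cached = await asyncio.gather(
        fake_check(0.05, "auth-ok"),
        fake_check(0.05, "rate-ok"),
        fake_check(0.05, "cache-miss"),
    )
    return time.perf_counter() - start

elapsed = asyncio.run(timed_pre_flight())
# Roughly max(50ms, 50ms, 50ms) ≈ 50ms, not the 150ms a sequential chain would take
assert elapsed < 0.12
```

The same pattern applies only to independent checks; anything that depends on the auth result (e.g. a per-team rate limit keyed by team ID) must stay sequential.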

Cache Hit Rate Monitoring

Track cache performance to validate your caching strategy is working and detect regressions:

from datetime import datetime, timezone

import redis.asyncio as aioredis

class CacheMetrics:
    """Track and report cache performance."""

    def __init__(self, redis_url: str):
        # decode_responses=True so hash fields come back as str, not bytes
        self.redis = aioredis.from_url(redis_url, decode_responses=True)

    async def record(self, hit: bool, cache_type: str, model: str, team: str):
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        key = f"cache:metrics:{today}"
        field = f"{'hit' if hit else 'miss'}:{cache_type}:{model}:{team}"
        await self.redis.hincrby(key, field, 1)
        await self.redis.hincrby(key, "hit" if hit else "miss", 1)
        await self.redis.expire(key, 604800)  # 7 days

    async def get_daily_report(self, date: str = None) -> dict:
        date = date or datetime.now(timezone.utc).strftime("%Y-%m-%d")
        data = await self.redis.hgetall(f"cache:metrics:{date}")
        hits = int(data.get("hit", 0))
        misses = int(data.get("miss", 0))
        total = hits + misses
        return {
            "date": date,
            "total": total,
            "hits": hits,
            "misses": misses,
            "hit_rate": round(100 * hits / total, 1) if total else 0,
            "estimated_savings_pct": round(95 * hits / total, 1) if total else 0,
        }

# Alerting thresholds:
# - Hit rate < 10%: Cache may be misconfigured or TTL too short
# - Hit rate > 60%: Great! Consider longer TTLs
# - Hit rate drops 10%+ from yesterday: Investigate (new prompts? config change?)
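The report math is worth sanity-checking standalone. The sketch below (`cache_report` is a hypothetical pure-function mirror of `get_daily_report`) reproduces the same formulas, including the assumption that each hit saves roughly 95% of its request cost after Redis and embedding overhead:

```python
def cache_report(hits: int, misses: int) -> dict:
    """Same hit-rate and savings formulas as CacheMetrics.get_daily_report."""
    total = hits + misses
    return {
        "hit_rate": round(100 * hits / total, 1) if total else 0,
        # Assumes each hit saves ~95% of its cost (infra overhead eats the rest)
        "estimated_savings_pct": round(95 * hits / total, 1) if total else 0,
    }

report = cache_report(hits=250, misses=750)
assert report["hit_rate"] == 25.0
assert report["estimated_savings_pct"] == 23.8
```

This is the arithmetic behind the production-reality note below: a 25% hit rate translates to roughly 24% off the total API bill.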
📝
Production reality: A 25% cache hit rate at an organization spending $10,000/month saves approximately $2,375/month (95% of cache-hit cost saved, minus ~$50/month for Redis and embedding costs). The ROI on gateway caching is typically 50-100x within the first month. Track hit rate daily and investigate any drops greater than 10%.

Key Takeaways

  • Implement exact match caching first (Redis, 30 minutes to deploy). Add semantic caching only if paraphrased queries are common.
  • Only cache low-temperature requests (temperature ≤ 0.5 for exact, ≤ 0.3 for semantic). High temperature requests want variety.
  • Stream responses through the gateway by forwarding SSE events immediately while accumulating tokens for cost tracking after the stream completes.
  • Target under 5ms gateway overhead with connection pooling, parallel pre-checks, orjson, and in-memory caching of hot data.
  • Monitor cache hit rate daily. Target 20%+ for most workloads. A 25% hit rate saves approximately 24% on total API costs.

What's Next

In the final lesson, we cover best practices and a deployment checklist: how to migrate teams from direct API calls, how to deploy across multiple regions, and a comprehensive FAQ for AI gateway platform engineers.