Streaming APIs for AI Products

Implement real-time streaming responses for LLMs and AI models using Server-Sent Events, WebSockets, and gRPC streaming.

Why Streaming Matters for AI

LLMs can take 10-60 seconds to generate a complete response. Without streaming, users stare at a loading spinner. With streaming, they see tokens appear in real-time, dramatically improving perceived performance and user experience.

💡 Time to first token (TTFT) is the most important latency metric for AI APIs. Streaming reduces TTFT from seconds (waiting for the full response) to milliseconds (the first token), making AI applications feel responsive even with large outputs.
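To make the TTFT-versus-total-latency distinction concrete, here is a small sketch that measures both against a streaming generator. The generator, delays, and names are illustrative stand-ins, not a real model API:

```python
import asyncio
import time

# Hypothetical stand-in for a model's streaming generator: an initial
# "prefill" delay, then one token at a time.
async def fake_stream(prefill_s=0.05, per_token_s=0.01, n_tokens=20):
    await asyncio.sleep(prefill_s)
    for i in range(n_tokens):
        await asyncio.sleep(per_token_s)
        yield f"tok{i} "

async def measure():
    start = time.monotonic()
    ttft = None
    async for _token in fake_stream():
        if ttft is None:
            ttft = time.monotonic() - start  # first token arrived
    total = time.monotonic() - start
    return ttft, total

ttft, total = asyncio.run(measure())
print(f"TTFT: {ttft:.3f}s, total: {total:.3f}s")
```

With streaming, the user starts reading at `ttft`; without it, nothing is visible until `total`.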

Server-Sent Events (SSE)

SSE is the standard for LLM streaming, used by OpenAI, Anthropic, and most AI providers:

import json
import uuid

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/v1/chat/completions")
async def chat_completions(request: ChatRequest):
    if request.stream:
        return StreamingResponse(
            stream_response(request),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",    # streams must not be cached
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",      # disable nginx proxy buffering
            }
        )
    return await complete_response(request)

async def stream_response(request):
    request_id = f"chatcmpl-{uuid.uuid4().hex}"
    async for token in model.generate_stream(request):
        chunk = {
            "id": request_id,
            "object": "chat.completion.chunk",
            "choices": [{
                "index": 0,
                "delta": {"content": token.text},
                "finish_reason": None
            }]
        }
        yield f"data: {json.dumps(chunk)}\n\n"

    # Final chunk: empty delta, finish_reason set, then the [DONE] sentinel
    final_chunk = {
        "id": request_id,
        "object": "chat.completion.chunk",
        "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
    }
    yield f"data: {json.dumps(final_chunk)}\n\n"
    yield "data: [DONE]\n\n"

Client-Side SSE Consumption

// JavaScript client for SSE streaming. fetch + ReadableStream is used
// instead of EventSource because EventSource cannot send a POST body.
async function streamChat(prompt) {
  const response = await fetch("/v1/chat/completions", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      model: "gpt-4",
      messages: [{ role: "user", content: prompt }],
      stream: true
    })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop();  // Keep incomplete line

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = line.slice(6);
        if (data === "[DONE]") return;
        const chunk = JSON.parse(data);
        const content = chunk.choices[0]?.delta?.content;
        if (content) appendToOutput(content);
      }
    }
  }
}
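The buffer-and-split logic above is the part most hand-rolled clients get wrong: a network chunk can end mid-line, so the trailing incomplete line must be carried over to the next read. The same incremental parsing can be sketched in Python (the class and method names are illustrative, not a standard API):

```python
# Minimal incremental parser for SSE "data:" lines, mirroring the
# buffering trick in the JavaScript client above.
class SSEParser:
    def __init__(self):
        self.buffer = ""

    def feed(self, chunk: str):
        """Yield the payload of each complete `data:` line in `chunk`."""
        self.buffer += chunk
        lines = self.buffer.split("\n")
        self.buffer = lines.pop()  # incomplete tail waits for the next chunk
        for line in lines:
            if line.startswith("data: "):
                yield line[len("data: "):]

parser = SSEParser()
out = []
# Feed a stream deliberately broken at an awkward boundary, mid-line:
for chunk in ['data: {"delta": "Hel', 'lo"}\n\ndata: [DONE]\n\n']:
    out.extend(parser.feed(chunk))
print(out)  # ['{"delta": "Hello"}', '[DONE]']
```

Note that the payload split across two chunks is reassembled correctly; a parser that processed each chunk independently would emit garbage here.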

WebSocket Streaming

WebSockets are ideal for bidirectional AI communication like voice assistants and interactive agents:

from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/v1/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()

    try:
        while True:
            data = await websocket.receive_json()

            token_count = 0
            async for token in model.generate_stream(data):
                token_count += 1
                await websocket.send_json({
                    "type": "token",
                    "content": token.text
                })

            await websocket.send_json({
                "type": "done",
                "usage": {"total_tokens": token_count}
            })
    except WebSocketDisconnect:
        pass  # client went away; stop generating for this connection

Choosing a Streaming Protocol

Protocol       | Direction        | Best For                     | Limitations
SSE            | Server to client | LLM token streaming          | Unidirectional, text only
WebSocket      | Bidirectional    | Chat, voice, interactive AI  | Connection management complexity
gRPC Streaming | Bidirectional    | Internal microservices       | No browser support without a proxy
HTTP/2 Push    | Server to client | Multiple concurrent streams  | Deprecated; removed from major browsers

Default to SSE: For most AI APIs, SSE over HTTP/1.1 or HTTP/2 is the best choice. It works through CDNs, load balancers, and proxies with minimal configuration. Reserve WebSockets for true bidirectional needs like real-time voice or collaborative AI.

Handling Streaming Errors

Error handling in streaming contexts requires special attention:

  • Mid-stream errors: Send an error event in the stream, then close the connection gracefully.
  • Timeouts: Implement heartbeat events to prevent proxy timeouts during long generations.
  • Client disconnects: Detect disconnections early and cancel inference to save GPU resources.
  • Backpressure: Monitor client consumption rate and slow down if the client cannot keep up.
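The first point above, surfacing a mid-stream failure, can be sketched as a wrapper around any token generator. The event shape and names here are assumptions for illustration, not a standard:

```python
import asyncio
import json

# Wrap a token stream so that a mid-stream failure becomes a structured
# SSE "error" event instead of an abruptly dropped connection.
async def sse_with_errors(tokens):
    try:
        async for text in tokens:
            yield f"data: {json.dumps({'delta': text})}\n\n"
    except Exception as exc:
        # Mid-stream error: report it in-band, then end the stream cleanly.
        yield f"event: error\ndata: {json.dumps({'message': str(exc)})}\n\n"
    finally:
        yield "data: [DONE]\n\n"

# Simulated generator that fails after the first token.
async def failing_tokens():
    yield "Hello"
    raise RuntimeError("GPU worker died")

async def collect():
    return [chunk async for chunk in sse_with_errors(failing_tokens())]

events = asyncio.run(collect())
for e in events:
    print(repr(e))
```

The client then receives one content chunk, one error event it can render to the user, and the usual terminating sentinel, rather than a connection reset it cannot distinguish from a network failure.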