Intermediate

Implement Rate Limiters

Rate limiting is one of the most frequently asked system implementation problems. Companies like Stripe, Cloudflare, and AWS ask candidates to build rate limiters from scratch. This lesson covers four complete implementations: fixed window, sliding window log, token bucket, and leaky bucket — with full analysis of trade-offs.

Problem 1: Fixed Window Rate Limiter

The simplest rate limiter. Divide time into fixed windows (e.g., 1-minute intervals) and count requests per window. Allow up to N requests per window.

import time
from collections import defaultdict
from typing import Dict


class FixedWindowRateLimiter:
    """Fixed window rate limiter.

    Divides time into fixed intervals. Counts requests per interval.
    Allows up to max_requests per window.

    Trade-offs:
    - Pro: Simple, O(1) time and space per client
    - Con: Burst at window boundary (2x requests in 1 second
           if requests come at end of one window + start of next)
    """

    def __init__(self, max_requests: int, window_seconds: int):
        self._max_requests = max_requests
        self._window_seconds = window_seconds
        self._counters: Dict[str, int] = defaultdict(int)
        self._window_start: Dict[str, float] = {}

    def allow_request(self, client_id: str) -> bool:
        """Check if a request from client_id should be allowed."""
        now = time.time()
        current_window = now // self._window_seconds

        # Check if we are in a new window
        if self._window_start.get(client_id) != current_window:
            self._window_start[client_id] = current_window
            self._counters[client_id] = 0

        # Check if under limit
        if self._counters[client_id] < self._max_requests:
            self._counters[client_id] += 1
            return True

        return False

    def get_remaining(self, client_id: str) -> int:
        """Return remaining requests in current window."""
        now = time.time()
        current_window = now // self._window_seconds

        if self._window_start.get(client_id) != current_window:
            return self._max_requests

        return max(0, self._max_requests - self._counters[client_id])

    def get_retry_after(self, client_id: str) -> float:
        """Return seconds until the current window resets."""
        now = time.time()
        window_end = ((now // self._window_seconds) + 1) * self._window_seconds
        return window_end - now


# ---- Usage ----
limiter = FixedWindowRateLimiter(max_requests=5, window_seconds=60)

for i in range(7):
    allowed = limiter.allow_request("user_1")
    remaining = limiter.get_remaining("user_1")
    print(f"Request {i+1}: {'ALLOWED' if allowed else 'DENIED'} "
          f"(remaining: {remaining})")
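To make the boundary-burst con concrete, here is a minimal sketch. `MiniFixedWindow` is a stripped-down, hypothetical variant of the class above that takes the timestamp as an explicit argument purely so the demo is deterministic; the counting logic is the same.

```python
from collections import defaultdict


class MiniFixedWindow:
    """Stripped-down fixed-window counter with an injectable clock."""

    def __init__(self, max_requests: int, window_seconds: int):
        self._max_requests = max_requests
        self._window_seconds = window_seconds
        self._counters = defaultdict(int)
        self._window_start = {}

    def allow_request(self, client_id: str, now: float) -> bool:
        current_window = now // self._window_seconds
        if self._window_start.get(client_id) != current_window:
            self._window_start[client_id] = current_window
            self._counters[client_id] = 0
        if self._counters[client_id] < self._max_requests:
            self._counters[client_id] += 1
            return True
        return False


limiter = MiniFixedWindow(max_requests=5, window_seconds=60)

# 5 requests at t=59.5 (end of window 0) plus 5 at t=60.5 (start of
# window 1): all 10 are allowed, even though only ~1 second elapsed.
end_of_window = sum(limiter.allow_request("user_1", 59.5) for _ in range(5))
start_of_next = sum(limiter.allow_request("user_1", 60.5) for _ in range(5))
print(end_of_window + start_of_next)  # 10 allowed in ~1 second
```

Ten requests slip through in about one second against a nominal limit of five per minute, which is exactly the 2x boundary burst described in the docstring.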

Problem 2: Sliding Window Log Rate Limiter

Fixes the boundary burst problem by tracking individual request timestamps. More accurate but uses more memory.

from collections import deque


class SlidingWindowLogRateLimiter:
    """Sliding window log rate limiter.

    Stores timestamp of every request. Counts requests in the
    last window_seconds. Most accurate but highest memory usage.

    Trade-offs:
    - Pro: No boundary burst problem, perfectly accurate
    - Con: O(n) memory per client (stores every timestamp)
    - Con: O(n) cleanup time (removing old timestamps)
    """

    def __init__(self, max_requests: int, window_seconds: int):
        self._max_requests = max_requests
        self._window_seconds = window_seconds
        self._request_logs: Dict[str, deque] = defaultdict(deque)

    def allow_request(self, client_id: str) -> bool:
        """Check if a request should be allowed."""
        now = time.time()
        window_start = now - self._window_seconds

        log = self._request_logs[client_id]

        # Remove timestamps outside the current window
        while log and log[0] <= window_start:
            log.popleft()

        # Check if under limit
        if len(log) < self._max_requests:
            log.append(now)
            return True

        return False

    def get_remaining(self, client_id: str) -> int:
        """Return remaining requests in current sliding window."""
        now = time.time()
        window_start = now - self._window_seconds
        log = self._request_logs[client_id]

        # Count requests in window
        count = sum(1 for ts in log if ts > window_start)
        return max(0, self._max_requests - count)

    def get_retry_after(self, client_id: str) -> float:
        """Return seconds until the oldest request expires from the window."""
        if not self._request_logs[client_id]:
            return 0.0

        oldest = self._request_logs[client_id][0]
        retry_after = oldest + self._window_seconds - time.time()
        return max(0.0, retry_after)


# ---- Usage ----
limiter = SlidingWindowLogRateLimiter(max_requests=3, window_seconds=10)

for i in range(5):
    allowed = limiter.allow_request("user_1")
    print(f"Request {i+1}: {'ALLOWED' if allowed else 'DENIED'} "
          f"(remaining: {limiter.get_remaining('user_1')})")

print(f"Retry after: {limiter.get_retry_after('user_1'):.1f}s")
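The same boundary scenario, replayed against the sliding log, shows the fix. `MiniSlidingLog` is a stripped-down, hypothetical variant of the class above with an explicit `now` argument so the demo is deterministic:

```python
from collections import defaultdict, deque


class MiniSlidingLog:
    """Stripped-down sliding window log with an injectable clock."""

    def __init__(self, max_requests: int, window_seconds: int):
        self._max_requests = max_requests
        self._window_seconds = window_seconds
        self._logs = defaultdict(deque)

    def allow_request(self, client_id: str, now: float) -> bool:
        log = self._logs[client_id]
        window_start = now - self._window_seconds
        # Prune timestamps that have fallen out of the sliding window
        while log and log[0] <= window_start:
            log.popleft()
        if len(log) < self._max_requests:
            log.append(now)
            return True
        return False


limiter = MiniSlidingLog(max_requests=5, window_seconds=60)

# 5 requests at t=59.5 are allowed; 5 more at t=60.5 are all denied,
# because the log still holds 5 timestamps inside the last 60 seconds.
allowed_late = sum(limiter.allow_request("user_1", 59.5) for _ in range(5))
allowed_early = sum(limiter.allow_request("user_1", 60.5) for _ in range(5))
print(allowed_late, allowed_early)  # 5 0 -- no boundary burst
```

Only once the old timestamps age past the 60-second horizon (here, after t=119.5) does the limiter admit traffic again.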

Problem 3: Token Bucket Rate Limiter

The industry standard. A bucket holds tokens that refill at a constant rate. Each request consumes one token. This naturally allows bursts while maintaining an average rate.

class TokenBucketRateLimiter:
    """Token bucket rate limiter.

    A bucket holds up to max_tokens tokens. Tokens are added at
    refill_rate per second. Each request consumes one token.

    Trade-offs:
    - Pro: Allows controlled bursts (up to bucket capacity)
    - Pro: Smooth rate limiting over time
    - Pro: O(1) time and space per client
    - Con: More complex to implement correctly
    - Con: Burst capacity and sustained rate must be tuned together

    Used by: AWS API Gateway, Stripe, most production systems
    """

    def __init__(self, max_tokens: int, refill_rate: float):
        """
        Args:
            max_tokens: Maximum tokens in the bucket (burst capacity)
            refill_rate: Tokens added per second (sustained rate)
        """
        self._max_tokens = max_tokens
        self._refill_rate = refill_rate
        self._tokens: Dict[str, float] = {}
        self._last_refill: Dict[str, float] = {}

    def _refill(self, client_id: str):
        """Add tokens based on elapsed time since last refill."""
        now = time.time()

        if client_id not in self._tokens:
            self._tokens[client_id] = self._max_tokens
            self._last_refill[client_id] = now
            return

        elapsed = now - self._last_refill[client_id]
        new_tokens = elapsed * self._refill_rate

        self._tokens[client_id] = min(
            self._max_tokens,
            self._tokens[client_id] + new_tokens
        )
        self._last_refill[client_id] = now

    def allow_request(self, client_id: str, tokens: int = 1) -> bool:
        """Check if a request should be allowed.

        Args:
            client_id: Unique client identifier
            tokens: Number of tokens to consume (default 1)
        """
        self._refill(client_id)

        if self._tokens[client_id] >= tokens:
            self._tokens[client_id] -= tokens
            return True

        return False

    def get_remaining(self, client_id: str) -> int:
        """Return available tokens."""
        self._refill(client_id)
        return int(self._tokens.get(client_id, self._max_tokens))

    def get_retry_after(self, client_id: str, tokens: int = 1) -> float:
        """Return seconds until enough tokens are available."""
        self._refill(client_id)
        current = self._tokens.get(client_id, self._max_tokens)
        if current >= tokens:
            return 0.0
        needed = tokens - current
        return needed / self._refill_rate


# ---- Usage ----
# 10 requests/second sustained, burst of 20
limiter = TokenBucketRateLimiter(max_tokens=20, refill_rate=10.0)

# Burst: first 20 requests go through instantly
for i in range(25):
    allowed = limiter.allow_request("user_1")
    if not allowed:
        retry = limiter.get_retry_after("user_1")
        print(f"Request {i+1}: DENIED (retry in {retry:.2f}s)")
        break
    print(f"Request {i+1}: ALLOWED (remaining: {limiter.get_remaining('user_1')})")
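The refill arithmetic is easier to verify with a fixed clock. `MiniTokenBucket` is a stripped-down, hypothetical single-client variant of the class above that takes `now` as an argument, so the numbers below are exact rather than timing-dependent:

```python
class MiniTokenBucket:
    """Stripped-down single-client token bucket with an injectable clock."""

    def __init__(self, max_tokens: int, refill_rate: float):
        self._max_tokens = max_tokens
        self._refill_rate = refill_rate
        self._tokens = float(max_tokens)
        self._last_refill = 0.0

    def allow_request(self, now: float) -> bool:
        # Refill based on elapsed time, capped at bucket capacity
        elapsed = now - self._last_refill
        self._tokens = min(self._max_tokens,
                           self._tokens + elapsed * self._refill_rate)
        self._last_refill = now
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False


bucket = MiniTokenBucket(max_tokens=20, refill_rate=10.0)

burst = sum(bucket.allow_request(0.0) for _ in range(20))  # drain the bucket
denied = bucket.allow_request(0.0)       # bucket empty -> denied
# After 0.5s, 0.5 * 10 = 5 tokens have refilled, so 5 of 6 succeed.
refilled = sum(bucket.allow_request(0.5) for _ in range(6))
print(burst, denied, refilled)  # 20 False 5
```

This shows both halves of the contract: a burst of up to `max_tokens` at once, then a sustained rate of `refill_rate` requests per second.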

Problem 4: Leaky Bucket Rate Limiter

Requests enter a FIFO queue (the bucket) and are processed at a fixed rate. Unlike token bucket, this enforces a perfectly smooth output rate with no bursts.

from collections import deque
from dataclasses import dataclass
from typing import Any


@dataclass
class QueuedRequest:
    client_id: str
    timestamp: float
    data: Any = None


class LeakyBucketRateLimiter:
    """Leaky bucket rate limiter.

    Requests are queued in a FIFO bucket. The bucket "leaks"
    (processes) requests at a fixed rate. If the bucket is full,
    new requests are dropped.

    Trade-offs:
    - Pro: Perfectly smooth output rate (no bursts)
    - Pro: Fair queuing across clients
    - Con: Added latency (requests wait in queue)
    - Con: No burst allowance (unlike token bucket)

    Used by: Network traffic shaping, API gateways that need
    smooth output rates
    """

    def __init__(self, bucket_size: int, leak_rate: float):
        """
        Args:
            bucket_size: Maximum number of queued requests
            leak_rate: Requests processed per second
        """
        self._bucket_size = bucket_size
        self._leak_rate = leak_rate
        self._queue: deque = deque()
        self._last_leak_time: float = time.time()

    def _leak(self):
        """Process (remove) requests from the queue based on elapsed time."""
        now = time.time()
        elapsed = now - self._last_leak_time

        # Number of requests that should have leaked out
        leaked = int(elapsed * self._leak_rate)

        if leaked > 0:
            # Remove leaked requests from front of queue
            for _ in range(min(leaked, len(self._queue))):
                self._queue.popleft()
            # Advance the clock by whole leak intervals only. Setting it
            # to `now` would discard the fractional remainder of the
            # elapsed time and make the limiter slowly under-leak.
            self._last_leak_time += leaked / self._leak_rate

    def allow_request(self, client_id: str, data: Any = None) -> bool:
        """Try to add a request to the bucket.

        Returns True if the request was queued (accepted).
        Returns False if the bucket is full (rejected).
        """
        self._leak()

        if len(self._queue) < self._bucket_size:
            self._queue.append(QueuedRequest(
                client_id=client_id,
                timestamp=time.time(),
                data=data,
            ))
            return True

        return False

    def get_queue_size(self) -> int:
        """Return current number of queued requests."""
        self._leak()
        return len(self._queue)

    def get_wait_time(self) -> float:
        """Return estimated wait time for a new request."""
        self._leak()
        if len(self._queue) == 0:
            return 0.0
        return len(self._queue) / self._leak_rate


# ---- Usage ----
limiter = LeakyBucketRateLimiter(bucket_size=5, leak_rate=2.0)

for i in range(8):
    allowed = limiter.allow_request(f"user_{i % 3}")
    print(f"Request {i+1}: {'QUEUED' if allowed else 'DROPPED'} "
          f"(queue: {limiter.get_queue_size()}, "
          f"wait: {limiter.get_wait_time():.1f}s)")

Comparison Table

| Algorithm          | Memory | Burst          | Accuracy | Best For                           |
|--------------------|--------|----------------|----------|------------------------------------|
| Fixed Window       | O(1)   | 2x at boundary | Low      | Simple use cases, low traffic      |
| Sliding Window Log | O(n)   | None           | High     | Accuracy-critical, low-volume APIs |
| Token Bucket       | O(1)   | Controlled     | High     | Production APIs (AWS, Stripe)      |
| Leaky Bucket       | O(n)   | None           | High     | Traffic shaping, smooth output     |
💡
Interview tip: When asked "implement a rate limiter," start by asking which algorithm. If the interviewer says "your choice," go with token bucket — it is the industry standard, has O(1) complexity, and allows controlled bursts. Mention the trade-offs to show breadth of knowledge.

Key Takeaways

💡
  • Fixed window is simplest but has the 2x boundary burst problem
  • Sliding window log is most accurate but uses O(n) memory per client
  • Token bucket is the production standard: O(1) operations, controlled bursts
  • Leaky bucket ensures smooth output rate but adds latency
  • Always implement get_remaining() and get_retry_after() for proper 429 responses
  • In distributed systems, use Redis + Lua scripts for atomic rate limiting
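For the distributed case, the standard trick is to push the whole check-and-increment into Redis as a Lua script, which Redis executes atomically. Below is a hedged sketch of a fixed-window version, assuming the redis-py client's `eval(script, numkeys, *keys_and_args)` API; the key naming scheme is illustrative, not a specific production recipe.

```python
# Lua runs atomically inside Redis, so concurrent app servers cannot
# race between reading the counter and incrementing it.
FIXED_WINDOW_LUA = """
local current = redis.call('INCR', KEYS[1])
if current == 1 then
    redis.call('EXPIRE', KEYS[1], ARGV[2])
end
return current <= tonumber(ARGV[1]) and 1 or 0
"""


def allow_request(redis_client, client_id: str,
                  max_requests: int = 5, window_seconds: int = 60) -> bool:
    """Atomic fixed-window check against a shared Redis instance.

    The key expires when the window ends, resetting the counter.
    """
    key = f"ratelimit:{client_id}"  # illustrative key scheme
    result = redis_client.eval(FIXED_WINDOW_LUA, 1, key,
                               max_requests, window_seconds)
    return result == 1
```

A token bucket can be made atomic the same way by storing the token count and last-refill timestamp in a Redis hash and doing the refill arithmetic inside the script.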