Implement Rate Limiters
Rate limiting is one of the most frequently asked system implementation problems. Companies like Stripe, Cloudflare, and AWS ask candidates to build rate limiters from scratch. This lesson covers four complete implementations: fixed window, sliding window log, token bucket, and leaky bucket — with full analysis of trade-offs.
Problem 1: Fixed Window Rate Limiter
The simplest rate limiter. Divide time into fixed windows (e.g., 1-minute intervals) and count requests per window. Allow up to N requests per window.
import time
from collections import defaultdict
from typing import Any, Dict
class FixedWindowRateLimiter:
    """Fixed window rate limiter.

    Time is split into fixed-size intervals; each client carries a
    request counter that resets whenever a new interval begins. At
    most max_requests are allowed per window.

    Trade-offs:
    - Pro: Simple, O(1) time and space per client
    - Con: Burst at window boundary (2x requests in 1 second
      if requests come at end of one window + start of next)
    """

    def __init__(self, max_requests: int, window_seconds: int):
        self._max_requests = max_requests
        self._window_seconds = window_seconds
        # client_id -> requests seen in that client's current window
        self._counters: dict = defaultdict(int)
        # client_id -> window index the counter belongs to
        self._window_start: dict = {}

    def allow_request(self, client_id: str) -> bool:
        """Check if a request from client_id should be allowed."""
        window_id = time.time() // self._window_seconds
        # Entering a new window discards the old count entirely.
        if self._window_start.get(client_id) != window_id:
            self._window_start[client_id] = window_id
            self._counters[client_id] = 0
        if self._counters[client_id] >= self._max_requests:
            return False
        self._counters[client_id] += 1
        return True

    def get_remaining(self, client_id: str) -> int:
        """Return remaining requests in current window."""
        window_id = time.time() // self._window_seconds
        # A stale (or absent) window means the full quota is available.
        if self._window_start.get(client_id) != window_id:
            return self._max_requests
        return max(0, self._max_requests - self._counters[client_id])

    def get_retry_after(self, client_id: str) -> float:
        """Return seconds until the current window resets."""
        now = time.time()
        next_boundary = (now // self._window_seconds + 1) * self._window_seconds
        return next_boundary - now
# ---- Usage ----
limiter = FixedWindowRateLimiter(max_requests=5, window_seconds=60)
for attempt in range(1, 8):
    was_allowed = limiter.allow_request("user_1")
    left = limiter.get_remaining("user_1")
    verdict = "ALLOWED" if was_allowed else "DENIED"
    print(f"Request {attempt}: {verdict} (remaining: {left})")
Problem 2: Sliding Window Log Rate Limiter
Fixes the boundary burst problem by tracking individual request timestamps. More accurate but uses more memory.
from collections import deque
class SlidingWindowLogRateLimiter:
    """Sliding window log rate limiter.

    Stores the timestamp of every request and counts those in the
    last window_seconds. Most accurate but highest memory usage.

    Trade-offs:
    - Pro: No boundary burst problem, perfectly accurate
    - Con: O(n) memory per client (stores every timestamp)
    - Con: O(n) cleanup time (removing old timestamps)
    """

    def __init__(self, max_requests: int, window_seconds: int):
        self._max_requests = max_requests
        self._window_seconds = window_seconds
        # client_id -> deque of request timestamps, oldest first
        self._request_logs: dict = defaultdict(deque)

    def _prune(self, log: deque, now: float) -> None:
        """Drop timestamps that have fallen out of the sliding window."""
        window_start = now - self._window_seconds
        while log and log[0] <= window_start:
            log.popleft()

    def allow_request(self, client_id: str) -> bool:
        """Check if a request should be allowed."""
        now = time.time()
        log = self._request_logs[client_id]
        self._prune(log, now)
        if len(log) < self._max_requests:
            log.append(now)
            return True
        return False

    def get_remaining(self, client_id: str) -> int:
        """Return remaining requests in current sliding window."""
        now = time.time()
        log = self._request_logs[client_id]
        # Pruning first makes len(log) the in-window count directly,
        # instead of rescanning every stored timestamp.
        self._prune(log, now)
        return max(0, self._max_requests - len(log))

    def get_retry_after(self, client_id: str) -> float:
        """Return seconds until the next request would be allowed.

        Returns 0.0 whenever the client is currently under the limit.
        Bug fix: the previous version reported a positive delay as soon
        as the log was non-empty — even with capacity to spare — and
        never pruned expired timestamps first, producing wrong
        Retry-After values.
        """
        now = time.time()
        log = self._request_logs[client_id]
        self._prune(log, now)
        if len(log) < self._max_requests:
            return 0.0
        # At capacity: the oldest logged request is the next to expire.
        return max(0.0, log[0] + self._window_seconds - now)
# ---- Usage ----
limiter = SlidingWindowLogRateLimiter(max_requests=3, window_seconds=10)
for attempt in range(1, 6):
    ok = limiter.allow_request("user_1")
    verdict = "ALLOWED" if ok else "DENIED"
    print(f"Request {attempt}: {verdict} "
          f"(remaining: {limiter.get_remaining('user_1')})")
print(f"Retry after: {limiter.get_retry_after('user_1'):.1f}s")
Problem 3: Token Bucket Rate Limiter
The industry standard. A bucket holds tokens that refill at a constant rate. Each request consumes one token. This naturally allows bursts while maintaining an average rate.
class TokenBucketRateLimiter:
    """Token bucket rate limiter.

    Each client owns a bucket holding up to max_tokens tokens,
    refilled continuously at refill_rate tokens per second. Every
    request consumes tokens; it is denied when the bucket cannot
    cover the cost.

    Trade-offs:
    - Pro: Allows controlled bursts (up to bucket capacity)
    - Pro: Smooth rate limiting over time
    - Pro: O(1) time and space per client
    - Con: More complex to implement correctly
    - Con: Burst size is configurable but can be confusing

    Used by: AWS API Gateway, Stripe, most production systems
    """

    def __init__(self, max_tokens: int, refill_rate: float):
        """
        Args:
            max_tokens: Maximum tokens in the bucket (burst capacity)
            refill_rate: Tokens added per second (sustained rate)
        """
        self._max_tokens = max_tokens
        self._refill_rate = refill_rate
        # client_id -> fractional token balance
        self._tokens: dict = {}
        # client_id -> timestamp of the last refill for that client
        self._last_refill: dict = {}

    def _refill(self, client_id: str) -> None:
        """Credit tokens for the time elapsed since the last refill."""
        now = time.time()
        if client_id not in self._tokens:
            # First sighting: start with a full bucket.
            self._tokens[client_id] = self._max_tokens
            self._last_refill[client_id] = now
            return
        earned = (now - self._last_refill[client_id]) * self._refill_rate
        balance = self._tokens[client_id] + earned
        # Never exceed burst capacity.
        if balance > self._max_tokens:
            balance = self._max_tokens
        self._tokens[client_id] = balance
        self._last_refill[client_id] = now

    def allow_request(self, client_id: str, tokens: int = 1) -> bool:
        """Check if a request should be allowed.

        Args:
            client_id: Unique client identifier
            tokens: Number of tokens to consume (default 1)
        """
        self._refill(client_id)
        available = self._tokens[client_id]
        if available < tokens:
            return False
        self._tokens[client_id] = available - tokens
        return True

    def get_remaining(self, client_id: str) -> int:
        """Return available tokens."""
        self._refill(client_id)
        return int(self._tokens.get(client_id, self._max_tokens))

    def get_retry_after(self, client_id: str, tokens: int = 1) -> float:
        """Return seconds until enough tokens are available."""
        self._refill(client_id)
        available = self._tokens.get(client_id, self._max_tokens)
        if available >= tokens:
            return 0.0
        shortfall = tokens - available
        return shortfall / self._refill_rate
# ---- Usage ----
# 10 requests/second sustained, burst of 20
limiter = TokenBucketRateLimiter(max_tokens=20, refill_rate=10.0)
# Burst: first 20 requests go through instantly
for attempt in range(1, 26):
    if limiter.allow_request("user_1"):
        print(f"Request {attempt}: ALLOWED (remaining: {limiter.get_remaining('user_1')})")
    else:
        wait = limiter.get_retry_after("user_1")
        print(f"Request {attempt}: DENIED (retry in {wait:.2f}s)")
        break
Problem 4: Leaky Bucket Rate Limiter
Requests enter a FIFO queue (the bucket) and are processed at a fixed rate. Unlike token bucket, this enforces a perfectly smooth output rate with no bursts.
from collections import deque
from dataclasses import dataclass
@dataclass
class QueuedRequest:
    """A single request waiting in the leaky bucket's FIFO queue."""
    client_id: str  # identifier of the client that submitted the request
    timestamp: float  # time.time() when the request was enqueued
    data: Any = None  # optional opaque payload carried with the request
class LeakyBucketRateLimiter:
    """Leaky bucket rate limiter.

    Requests are queued in a single FIFO bucket shared by all
    clients. The bucket "leaks" (processes) requests at a fixed
    rate. If the bucket is full, new requests are dropped.

    Trade-offs:
    - Pro: Perfectly smooth output rate (no bursts)
    - Pro: Fair queuing across clients
    - Con: Added latency (requests wait in queue)
    - Con: No burst allowance (unlike token bucket)

    Used by: Network traffic shaping, API gateways that need
    smooth output rates
    """

    def __init__(self, bucket_size: int, leak_rate: float):
        """
        Args:
            bucket_size: Maximum number of queued requests
            leak_rate: Requests processed per second
        """
        self._bucket_size = bucket_size
        self._leak_rate = leak_rate
        self._queue: deque = deque()
        self._last_leak_time: float = time.time()

    def _leak(self):
        """Process (remove) requests from the queue based on elapsed time.

        Bug fix: the leak clock is advanced by exactly the time that the
        leaked requests account for (leaked / leak_rate) rather than being
        reset to `now`. Resetting to `now` throws away the fractional
        remainder of elapsed * leak_rate, so a limiter polled more often
        than once per leak interval would repeatedly compute leaked == 0
        and never drain the bucket.
        """
        now = time.time()
        elapsed = now - self._last_leak_time
        # Whole requests that should have leaked out by now.
        leaked = int(elapsed * self._leak_rate)
        if leaked <= 0:
            return
        # Remove leaked requests from the front of the queue (the queue
        # may hold fewer than `leaked` entries).
        for _ in range(min(leaked, len(self._queue))):
            self._queue.popleft()
        # Credit only the whole leaks; the sub-interval remainder stays
        # banked in _last_leak_time (always < 1/leak_rate behind now).
        self._last_leak_time += leaked / self._leak_rate

    def allow_request(self, client_id: str, data: Any = None) -> bool:
        """Try to add a request to the bucket.

        Returns True if the request was queued (accepted).
        Returns False if the bucket is full (rejected).
        """
        self._leak()
        if len(self._queue) < self._bucket_size:
            self._queue.append(QueuedRequest(
                client_id=client_id,
                timestamp=time.time(),
                data=data,
            ))
            return True
        return False

    def get_queue_size(self) -> int:
        """Return current number of queued requests."""
        self._leak()
        return len(self._queue)

    def get_wait_time(self) -> float:
        """Return estimated wait time for a new request."""
        self._leak()
        if len(self._queue) == 0:
            return 0.0
        return len(self._queue) / self._leak_rate
# ---- Usage ----
limiter = LeakyBucketRateLimiter(bucket_size=5, leak_rate=2.0)
for attempt in range(8):
    accepted = limiter.allow_request(f"user_{attempt % 3}")
    label = "QUEUED" if accepted else "DROPPED"
    print(f"Request {attempt + 1}: {label} "
          f"(queue: {limiter.get_queue_size()}, "
          f"wait: {limiter.get_wait_time():.1f}s)")
Comparison Table
| Algorithm | Memory | Burst | Accuracy | Best For |
|---|---|---|---|---|
| Fixed Window | O(1) | 2x at boundary | Low | Simple use cases, low traffic |
| Sliding Window | O(n) | None | High | Accuracy-critical, low-volume APIs |
| Token Bucket | O(1) | Controlled | High | Production APIs (AWS, Stripe) |
| Leaky Bucket | O(n) | None | High | Traffic shaping, smooth output |
Key Takeaways
- Fixed window is simplest but has the 2x boundary burst problem
- Sliding window log is most accurate but uses O(n) memory per client
- Token bucket is the production standard: O(1) operations, controlled bursts
- Leaky bucket ensures smooth output rate but adds latency
- Always implement get_remaining() and get_retry_after() so 429 responses can carry accurate X-RateLimit-Remaining and Retry-After headers
- In distributed systems, use Redis + Lua scripts for atomic rate limiting
Lilly Tech Systems