Mock Exam 6: ML-Focused
ML engineering interview problems that combine algorithm implementation with system design thinking. These problems test whether you can build ML components from scratch and design production-grade systems. Target time: 75 minutes total.
Problem 1: 25 min | Problem 2: 25 min | Problem 3: 25 min
Start your timer before reading the problems. Do not look at solutions until time is up.
Problem 1: Implement KNN from Scratch
Implement a K-Nearest Neighbors classifier from scratch. Your class should support fit(X, y) to store training data and predict(X) to classify new points. Use Euclidean distance and majority voting.
Requirements
- fit(X, y): Store training data. X is a list of feature vectors, y is a list of labels.
- predict(X): For each point in X, find the k nearest training points and return the majority label.
- Handle ties by choosing the label of the closest neighbor among tied classes.
- Do not use scikit-learn or any ML libraries.
Example
X_train = [[1, 2], [2, 3], [3, 1], [6, 5], [7, 7], [8, 6]]
y_train = [0, 0, 0, 1, 1, 1]
knn = KNN(k=3)
knn.fit(X_train, y_train)
print(knn.predict([[2, 2]])) # [0] - closer to class 0 points
print(knn.predict([[7, 6]])) # [1] - closer to class 1 points
print(knn.predict([[5, 4]])) # depends on distances
Solution
import math
from collections import Counter
class KNN:
    """K-Nearest Neighbors classifier from scratch.

    Uses Euclidean distance and majority voting; ties are broken
    by the label of the closest neighbor among the tied classes.

    Time complexity:
        fit:     O(1) - just stores references to the data
        predict: O(n * m * d) for distances, plus O(m log m)
                 sorting per test point, where n = test points,
                 m = training points, d = dimensions
    Space: O(m * d) for the stored training data.
    """

    def __init__(self, k=3):
        """k: number of neighbors that vote; must be >= 1.

        Raises ValueError for k < 1 (the original code would only
        fail later with a confusing `max() of empty sequence`).
        """
        if k < 1:
            raise ValueError("k must be >= 1")
        self.k = k
        self.X_train = None
        self.y_train = None

    def fit(self, X, y):
        """Store training data.

        No actual training happens - KNN is a lazy learner.
        X: list of feature vectors; y: list of labels.
        Raises ValueError if X and y differ in length.
        """
        if len(X) != len(y):
            raise ValueError("X and y must have the same length")
        self.X_train = X
        self.y_train = y
        return self

    def _euclidean_distance(self, a, b):
        """Euclidean distance: sqrt(sum((a_i - b_i)^2))."""
        return math.sqrt(sum((ai - bi) ** 2 for ai, bi in zip(a, b)))

    def _predict_single(self, x):
        """Predict the label for a single point.

        1. Compute the distance to every training point.
        2. Sort by distance and keep the k nearest.
        3. Majority vote; break ties by the closest tied label.
        """
        if self.X_train is None:
            raise ValueError("fit must be called before predict")
        distances = [
            (self._euclidean_distance(x, x_train), self.y_train[i])
            for i, x_train in enumerate(self.X_train)
        ]
        # Sort on distance only (stable), so labels never need to
        # be comparable with each other.
        distances.sort(key=lambda pair: pair[0])
        k_nearest = distances[: self.k]
        vote_counts = Counter(label for _, label in k_nearest)
        max_count = max(vote_counts.values())
        candidates = {
            label for label, count in vote_counts.items() if count == max_count
        }
        if len(candidates) == 1:
            return next(iter(candidates))
        # Tie-break: first (= closest) neighbor whose label is tied.
        for _, label in k_nearest:
            if label in candidates:
                return label

    def predict(self, X):
        """Predict labels for multiple points; returns a list."""
        return [self._predict_single(x) for x in X]

    def score(self, X, y):
        """Accuracy of predict(X) against the true labels y."""
        predictions = self.predict(X)
        correct = sum(p == actual for p, actual in zip(predictions, y))
        return correct / len(y)
# Test the classifier on a small two-cluster dataset.
train_points = [[1, 2], [2, 3], [3, 1], [6, 5], [7, 7], [8, 6]]
train_labels = [0, 0, 0, 1, 1, 1]
model = KNN(k=3)
model.fit(train_points, train_labels)
print(model.predict([[2, 2]]))  # [0]
print(model.predict([[7, 6]]))  # [1]
print(model.score(train_points, train_labels))  # 1.0 (training accuracy)
Problem 2: Build Feature Pipeline
Implement a feature transformation pipeline that supports chaining multiple transformations. Each transformer implements fit(data) and transform(data). The pipeline should apply transformations in sequence.
Requirements
- Implement StandardScaler: normalize features to zero mean and unit variance
- Implement MinMaxScaler: scale features to the [0, 1] range
- Implement Pipeline: chain multiple transformers, support fit_transform(data)
- Handle edge cases: constant features (zero variance), empty data
Solution
class StandardScaler:
    """Standardize features to zero mean and unit variance.

    z = (x - mean) / std, computed per feature (column) using the
    population standard deviation. Constant (zero-variance)
    features substitute std = 1.0 so they map to 0 instead of
    dividing by zero.

    Time: O(n * d) for fit and for transform.
    Space: O(d) for the mean and std arrays.
    """

    def __init__(self):
        self.mean = None  # per-feature means, set by fit()
        self.std = None   # per-feature stds (floored at 1.0 for constants)

    def fit(self, data):
        """Compute the per-feature mean and population std.

        Raises ValueError on empty data - the requirement calls
        for explicit empty-data handling; previously this crashed
        with an opaque IndexError on data[0].
        """
        if not data:
            raise ValueError("cannot fit StandardScaler on empty data")
        n = len(data)
        columns = list(zip(*data))  # transpose: one tuple per feature
        self.mean = [sum(col) / n for col in columns]
        self.std = [
            (sum((v - m) ** 2 for v in col) / n) ** 0.5
            for col, m in zip(columns, self.mean)
        ]
        # Zero-variance features would divide by zero; 1.0 keeps them at 0.
        self.std = [s if s > 0 else 1.0 for s in self.std]
        return self

    def transform(self, data):
        """Apply standardization using the fitted mean and std."""
        return [
            [(value - m) / s for value, m, s in zip(row, self.mean, self.std)]
            for row in data
        ]

    def fit_transform(self, data):
        """Fit on data, then transform the same data."""
        return self.fit(data).transform(data)
class MinMaxScaler:
    """Scale features to the [0, 1] range.

    x_scaled = (x - min) / (max - min), per feature (column).
    Constant features (max == min) map to 0.0.

    Time: O(n * d) for fit and for transform.
    Space: O(d) for the min and max arrays.
    """

    def __init__(self):
        self.min_vals = None  # per-feature minimum, set by fit()
        self.max_vals = None  # per-feature maximum, set by fit()

    def fit(self, data):
        """Compute the per-feature min and max.

        Raises ValueError on empty data - the requirement calls
        for explicit empty-data handling; previously this crashed
        with an opaque IndexError on data[0].
        """
        if not data:
            raise ValueError("cannot fit MinMaxScaler on empty data")
        columns = list(zip(*data))  # transpose: one tuple per feature
        self.min_vals = [min(col) for col in columns]
        self.max_vals = [max(col) for col in columns]
        return self

    def transform(self, data):
        """Apply min-max scaling using the fitted range."""
        return [
            [
                # Constant features have no range; map them to 0.0.
                (value - lo) / (hi - lo) if hi != lo else 0.0
                for value, lo, hi in zip(row, self.min_vals, self.max_vals)
            ]
            for row in data
        ]

    def fit_transform(self, data):
        """Fit on data, then transform the same data."""
        return self.fit(data).transform(data)
class Pipeline:
    """Chain transformers so each one feeds the next.

    The output of transformer i becomes the input of
    transformer i+1, mirroring scikit-learn's pipeline idea.

    Time: the sum of the individual transformer costs.
    Space: O(n * d) for the intermediate results.
    """

    def __init__(self, steps):
        """steps: list of (name, transformer) tuples."""
        self.steps = steps

    def fit(self, data):
        """Fit every transformer on its predecessor's output."""
        intermediate = data
        for _name, step in self.steps:
            step.fit(intermediate)
            intermediate = step.transform(intermediate)
        return self

    def transform(self, data):
        """Run data through every fitted transformer in order."""
        intermediate = data
        for _name, step in self.steps:
            intermediate = step.transform(intermediate)
        return intermediate

    def fit_transform(self, data):
        """Fit and transform in a single pass over the steps."""
        intermediate = data
        for _name, step in self.steps:
            intermediate = step.fit_transform(intermediate)
        return intermediate
# Test the transformers on a small dataset.
sample = [[1, 200], [2, 300], [3, 400], [4, 500], [5, 600]]
# Individual transformer
std_scaler = StandardScaler()
scaled = std_scaler.fit_transform(sample)
print("StandardScaler:", [[round(v, 2) for v in row] for row in scaled])
# Chained transformers
pipeline = Pipeline([
    ("minmax", MinMaxScaler()),
    ("standard", StandardScaler()),
])
scaled = pipeline.fit_transform(sample)
print("Pipeline:", [[round(v, 2) for v in row] for row in scaled])
Problem 3: Design Rate Limiter
Design and implement a rate limiter that allows at most max_requests requests per window_seconds for each user. Implement both a sliding window and a token bucket approach.
Requirements
- is_allowed(user_id): Return True if the request is allowed, False if rate-limited
- Support per-user rate limiting
- Thread-safe is a bonus but not required
- Clean up expired data to prevent memory leaks
Solution: Sliding Window Log
import time
from collections import defaultdict, deque
class SlidingWindowRateLimiter:
    """Rate limiter using a sliding window log.

    Stores the timestamp of each granted request per user in a
    deque. On each check, expired timestamps are evicted before
    comparing the count against the limit. Users whose windows
    have fully expired are deleted from the map, and read-only
    lookups never create entries, so the per-user map does not
    leak memory (the original kept empty deques forever and
    get_remaining created entries for unknown users).

    Time: O(n) worst case for eviction, amortized O(1).
    Space: O(max_requests) per active user.
    """

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.user_requests = defaultdict(deque)  # user -> deque of timestamps

    def _evict_expired(self, user_id, now):
        """Drop timestamps outside the window; return the deque.

        Returns None (and removes the user's entry) when nothing
        recent remains, so inactive users cost no memory.
        """
        requests = self.user_requests.get(user_id)
        if requests is None:
            return None
        window_start = now - self.window_seconds
        while requests and requests[0] <= window_start:
            requests.popleft()
        if not requests:
            del self.user_requests[user_id]
            return None
        return requests

    def is_allowed(self, user_id):
        """Return True and record the request if under the limit."""
        now = time.time()
        requests = self._evict_expired(user_id, now)
        count = len(requests) if requests is not None else 0
        if count < self.max_requests:
            self.user_requests[user_id].append(now)
            return True
        return False

    def get_remaining(self, user_id):
        """Remaining requests in the current window (read-only)."""
        requests = self._evict_expired(user_id, time.time())
        used = len(requests) if requests is not None else 0
        return max(0, self.max_requests - used)
class TokenBucketRateLimiter:
    """Rate limiter based on the token bucket algorithm.

    Every user owns a bucket that refills continuously at
    refill_rate tokens per second, capped at max_tokens. A
    request spends one token; an empty bucket means denial.

    Advantages over the sliding window log:
      - short bursts up to the bucket capacity are allowed
      - O(1) time and space per check
      - no individual timestamps to store

    Time: O(1) per request. Space: O(1) per user.
    """

    def __init__(self, max_tokens, refill_rate):
        """
        max_tokens: bucket capacity
        refill_rate: tokens added per second
        """
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.buckets = {}  # user -> (tokens, last_refill_time)

    def is_allowed(self, user_id):
        """Consume one token if available; return whether it was."""
        now = time.time()
        bucket = self.buckets.get(user_id)
        if bucket is None:
            # First request from this user: full bucket, minus this one.
            self.buckets[user_id] = (self.max_tokens - 1, now)
            return True
        tokens, last_refill = bucket
        # Credit tokens for the time elapsed since the last update.
        elapsed = now - last_refill
        tokens = min(self.max_tokens, tokens + elapsed * self.refill_rate)
        allowed = tokens >= 1
        if allowed:
            tokens -= 1
        self.buckets[user_id] = (tokens, now)
        return allowed

    def get_remaining(self, user_id):
        """Whole tokens currently available for this user."""
        bucket = self.buckets.get(user_id)
        if bucket is None:
            return self.max_tokens
        tokens, last_refill = bucket
        elapsed = time.time() - last_refill
        return int(min(self.max_tokens, tokens + elapsed * self.refill_rate))
# Exercise the sliding window limiter.
print("--- Sliding Window ---")
window_limiter = SlidingWindowRateLimiter(max_requests=3, window_seconds=10)
for attempt in range(1, 6):
    allowed = window_limiter.is_allowed("user1")
    print(f"Request {attempt}: {'Allowed' if allowed else 'Denied'}")
# Expected: Allowed, Allowed, Allowed, Denied, Denied
# Exercise the token bucket limiter.
print("\n--- Token Bucket ---")
token_limiter = TokenBucketRateLimiter(max_tokens=3, refill_rate=0.5)
for attempt in range(1, 6):
    allowed = token_limiter.is_allowed("user1")
    print(f"Request {attempt}: {'Allowed' if allowed else 'Denied'}")
Comparison: Sliding Window vs Token Bucket
| Aspect | Sliding Window | Token Bucket |
|---|---|---|
| Memory | O(max_requests) per user | O(1) per user |
| Burst handling | Strict limit | Allows bursts up to capacity |
| Accuracy | Exact count | Approximate (smooth refill) |
| Use case | API rate limiting | Network traffic shaping |
| Complexity | Amortized O(1) | O(1) |
Score Your Performance
| Result | Score | Next Step |
|---|---|---|
| All 3 implemented correctly in < 75 min | ⭐ Exceptional | You are ready for ML engineering interviews |
| 2 implemented + partial on third in < 75 min | 👍 Strong | Review the missed problem, move to Review lesson |
| 1 fully implemented | 🔄 Developing | Practice more OOP and system design coding |
| Partial implementations only | 📚 Keep Practicing | Focus on clean class design, retry in 5 days |
Lilly Tech Systems