Advanced

Mock Exam 6: ML-Focused

ML engineering interview problems that combine algorithm implementation with system design thinking. These problems test whether you can build ML components from scratch and design production-grade systems. Target time: 75 minutes total.

Time Target: 75 Minutes
Problem 1: 25 min | Problem 2: 25 min | Problem 3: 25 min
Start your timer before reading the problems. Do not look at solutions until time is up.

Problem 1: Implement KNN from Scratch

Implement a K-Nearest Neighbors classifier from scratch. Your class should support fit(X, y) to store training data and predict(X) to classify new points. Use Euclidean distance and majority voting.

Requirements

  • fit(X, y): Store training data. X is a list of feature vectors, y is a list of labels.
  • predict(X): For each point in X, find the k nearest training points and return the majority label.
  • Handle ties by choosing the label of the closest neighbor among tied classes.
  • Do not use scikit-learn or any ML libraries.

Example

X_train = [[1, 2], [2, 3], [3, 1], [6, 5], [7, 7], [8, 6]]
y_train = [0, 0, 0, 1, 1, 1]

knn = KNN(k=3)
knn.fit(X_train, y_train)

print(knn.predict([[2, 2]]))  # [0] - closer to class 0 points
print(knn.predict([[7, 6]]))  # [1] - closer to class 1 points
print(knn.predict([[5, 4]]))  # borderline point - three neighbors tie at distance sqrt(13)

Solution

import math
from collections import Counter

class KNN:
    """K-Nearest Neighbors classifier from scratch.

    Time complexity:
      - fit: O(1) - just stores data
      - predict: O(n * m * d) where n = test points,
        m = training points, d = dimensions
        Plus O(m log m) for sorting per test point

    Space: O(m * d) for storing training data
    """
    def __init__(self, k=3):
        self.k = k
        self.X_train = None
        self.y_train = None

    def fit(self, X, y):
        """Store training data.

        No actual training happens - KNN is a lazy learner.
        """
        self.X_train = X
        self.y_train = y

    def _euclidean_distance(self, a, b):
        """Compute Euclidean distance between two points.

        distance = sqrt(sum((a_i - b_i)^2))
        """
        return math.sqrt(sum((ai - bi) ** 2 for ai, bi in zip(a, b)))

    def _predict_single(self, x):
        """Predict label for a single point.

        1. Compute distance to all training points
        2. Sort by distance
        3. Take k nearest neighbors
        4. Return majority vote (break ties by closest)
        """
        # Step 1: Compute all distances
        distances = []
        for i, x_train in enumerate(self.X_train):
            dist = self._euclidean_distance(x, x_train)
            distances.append((dist, self.y_train[i]))

        # Step 2: Sort by distance
        distances.sort(key=lambda d: d[0])

        # Step 3: Get k nearest labels
        k_nearest = [label for _, label in distances[:self.k]]

        # Step 4: Majority vote
        vote_counts = Counter(k_nearest)
        max_count = max(vote_counts.values())
        candidates = [label for label, count in vote_counts.items()
                      if count == max_count]

        # Break ties: return label of closest neighbor among tied
        if len(candidates) == 1:
            return candidates[0]

        for _, label in distances[:self.k]:
            if label in candidates:
                return label

    def predict(self, X):
        """Predict labels for multiple points."""
        return [self._predict_single(x) for x in X]

    def score(self, X, y):
        """Compute accuracy on test data."""
        predictions = self.predict(X)
        correct = sum(p == actual for p, actual in zip(predictions, y))
        return correct / len(y)

# Test
X_train = [[1, 2], [2, 3], [3, 1], [6, 5], [7, 7], [8, 6]]
y_train = [0, 0, 0, 1, 1, 1]

knn = KNN(k=3)
knn.fit(X_train, y_train)
print(knn.predict([[2, 2]]))   # [0]
print(knn.predict([[7, 6]]))   # [1]
print(knn.score(X_train, y_train))  # 1.0 (training accuracy)
💡 Interview Tip: Interviewers expect you to discuss KNN's limitations: O(n) prediction time (no precomputation), curse of dimensionality, and sensitivity to feature scaling. Mention that production KNN uses KD-trees or ball trees for O(log n) nearest-neighbor search.
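The scaling point is easy to demonstrate. A minimal sketch (values are illustrative, not from the exam data) showing an unscaled feature dominating the Euclidean distance:

```python
import math

def euclidean(a, b):
    """Plain Euclidean distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

# Feature vectors: [age, income] - income is on a much larger scale
p = [25, 50_000]
q = [60, 50_100]   # very different age, similar income
r = [26, 58_000]   # similar age, different income

# Without scaling, the income axis dominates the distance entirely,
# so q looks "closer" to p than r does despite the 35-year age gap
print(euclidean(p, q) < euclidean(p, r))  # True
```

This is why KNN is almost always paired with a scaler like the ones in Problem 2.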

Problem 2: Build Feature Pipeline

Implement a feature transformation pipeline that supports chaining multiple transformations. Each transformer implements fit(data) and transform(data). The pipeline should apply transformations in sequence.

Requirements

  • Implement StandardScaler: normalize features to zero mean and unit variance
  • Implement MinMaxScaler: scale features to [0, 1] range
  • Implement Pipeline: chain multiple transformers, support fit_transform(data)
  • Handle edge cases: constant features (zero variance), empty data

Solution

class StandardScaler:
    """Standardize features: zero mean, unit variance.

    z = (x - mean) / std

    Time: O(n * d) for fit, O(n * d) for transform
    Space: O(d) for mean and std arrays
    """
    def __init__(self):
        self.mean = None
        self.std = None

    def fit(self, data):
        """Compute mean and std for each feature."""
        if not data:
            raise ValueError("cannot fit on empty data")
        n = len(data)
        d = len(data[0])

        # Compute mean
        self.mean = [0.0] * d
        for row in data:
            for j in range(d):
                self.mean[j] += row[j]
        self.mean = [m / n for m in self.mean]

        # Compute std
        self.std = [0.0] * d
        for row in data:
            for j in range(d):
                self.std[j] += (row[j] - self.mean[j]) ** 2
        self.std = [(s / n) ** 0.5 for s in self.std]

        # Handle zero variance (constant features)
        self.std = [s if s > 0 else 1.0 for s in self.std]
        return self

    def transform(self, data):
        """Apply standardization."""
        return [
            [(row[j] - self.mean[j]) / self.std[j]
             for j in range(len(row))]
            for row in data
        ]

    def fit_transform(self, data):
        return self.fit(data).transform(data)
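The z-score formula can be checked by hand on a single column (a quick sanity check, separate from the class above):

```python
import math

col = [1, 2, 3, 4, 5]
mean = sum(col) / len(col)  # 3.0
# Population standard deviation (divide by n, matching the class above)
std = math.sqrt(sum((x - mean) ** 2 for x in col) / len(col))  # sqrt(2)

z = [round((x - mean) / std, 3) for x in col]
print(z)  # [-1.414, -0.707, 0.0, 0.707, 1.414]
```

Note the result has zero mean and the values are symmetric around it, as expected.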


class MinMaxScaler:
    """Scale features to [0, 1] range.

    x_scaled = (x - min) / (max - min)

    Time: O(n * d) for fit, O(n * d) for transform
    Space: O(d) for min and max arrays
    """
    def __init__(self):
        self.min_vals = None
        self.max_vals = None

    def fit(self, data):
        """Compute min and max for each feature."""
        if not data:
            raise ValueError("cannot fit on empty data")
        d = len(data[0])
        self.min_vals = [float('inf')] * d
        self.max_vals = [float('-inf')] * d

        for row in data:
            for j in range(d):
                self.min_vals[j] = min(self.min_vals[j], row[j])
                self.max_vals[j] = max(self.max_vals[j], row[j])

        return self

    def transform(self, data):
        """Apply min-max scaling."""
        return [
            [
                (row[j] - self.min_vals[j]) /
                (self.max_vals[j] - self.min_vals[j])
                if self.max_vals[j] != self.min_vals[j] else 0.0
                for j in range(len(row))
            ]
            for row in data
        ]

    def fit_transform(self, data):
        return self.fit(data).transform(data)
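The min-max formula can likewise be verified by hand on one column:

```python
col = [1, 2, 3, 4, 5]
lo, hi = min(col), max(col)

# (x - min) / (max - min) maps min -> 0.0 and max -> 1.0
scaled = [(x - lo) / (hi - lo) for x in col]
print(scaled)  # [0.0, 0.25, 0.5, 0.75, 1.0]
```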


class Pipeline:
    """Chain multiple transformers.

    Applies transformers in sequence: output of
    transformer i becomes input of transformer i+1.

    Time: O(sum of individual transformer times)
    Space: O(n * d) for intermediate results
    """
    def __init__(self, steps):
        """steps: list of (name, transformer) tuples."""
        self.steps = steps

    def fit(self, data):
        """Fit each transformer on the output of the previous."""
        current_data = data
        for name, transformer in self.steps:
            transformer.fit(current_data)
            current_data = transformer.transform(current_data)
        return self

    def transform(self, data):
        """Apply all transformers in sequence."""
        current_data = data
        for name, transformer in self.steps:
            current_data = transformer.transform(current_data)
        return current_data

    def fit_transform(self, data):
        """Fit and transform in one pass."""
        current_data = data
        for name, transformer in self.steps:
            current_data = transformer.fit_transform(current_data)
        return current_data

# Test
data = [[1, 200], [2, 300], [3, 400], [4, 500], [5, 600]]

# Individual transformers
scaler = StandardScaler()
result = scaler.fit_transform(data)
print("StandardScaler:", [[round(v, 2) for v in row] for row in result])

# Pipeline
pipe = Pipeline([
    ("minmax", MinMaxScaler()),
    ("standard", StandardScaler())
])
result = pipe.fit_transform(data)
print("Pipeline:", [[round(v, 2) for v in row] for row in result])
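Because the pipeline relies only on the fit/transform/fit_transform protocol, any object implementing those three methods can be chained. A sketch of a hypothetical stateless transformer (LogTransformer is not part of the solution above):

```python
import math

class LogTransformer:
    """Hypothetical transformer: applies log1p element-wise.

    Stateless, so fit() has nothing to learn and just returns self.
    """
    def fit(self, data):
        return self

    def transform(self, data):
        return [[math.log1p(v) for v in row] for row in data]

    def fit_transform(self, data):
        return self.fit(data).transform(data)

# log1p(0) = 0 and log1p(e - 1) = 1
result = LogTransformer().fit_transform([[0, math.e - 1]])
print([[round(v, 6) for v in row] for row in result])  # [[0.0, 1.0]]
```

Duck typing keeps the Pipeline class itself free of any registry of known transformers.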

Problem 3: Design Rate Limiter

Design and implement a rate limiter that allows at most max_requests requests per window_seconds for each user. Implement both a sliding window and a token bucket approach.

Requirements

  • is_allowed(user_id): Return True if the request is allowed, False if rate-limited
  • Support per-user rate limiting
  • Thread-safe is a bonus but not required
  • Clean up expired data to prevent memory leaks

Solution: Sliding Window Log

import time
from collections import defaultdict, deque

class SlidingWindowRateLimiter:
    """Rate limiter using sliding window log.

    Stores timestamp of each request in a deque.
    On each request, remove expired timestamps,
    then check if count exceeds limit.

    Time: O(n) worst case for cleanup, amortized O(1)
    Space: O(max_requests) per user
    """
    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.user_requests = defaultdict(deque)  # user -> deque of timestamps

    def is_allowed(self, user_id):
        """Check if request is allowed for this user."""
        now = time.time()
        window_start = now - self.window_seconds

        requests = self.user_requests[user_id]

        # Remove expired timestamps
        while requests and requests[0] <= window_start:
            requests.popleft()

        # Check if under limit
        if len(requests) < self.max_requests:
            requests.append(now)
            return True

        return False

    def get_remaining(self, user_id):
        """Get remaining requests in current window."""
        now = time.time()
        window_start = now - self.window_seconds
        requests = self.user_requests[user_id]

        while requests and requests[0] <= window_start:
            requests.popleft()

        return max(0, self.max_requests - len(requests))
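The prune-then-count logic above can be traced on plain timestamps (numbers are illustrative):

```python
from collections import deque

max_requests = 3
window_seconds = 10
now = 100.0

# Timestamps of earlier requests, oldest first
requests = deque([88.0, 93.0, 95.0, 99.0])

# Drop everything at or before the window start (100 - 10 = 90)
while requests and requests[0] <= now - window_seconds:
    requests.popleft()

print(list(requests))                 # [93.0, 95.0, 99.0]
print(len(requests) < max_requests)   # False -> this request is denied
```

Each timestamp is appended once and popped at most once, which is why the cleanup is amortized O(1) despite the while loop.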


class TokenBucketRateLimiter:
    """Rate limiter using token bucket algorithm.

    Each user has a bucket that fills with tokens
    at a steady rate. Each request consumes one token.
    If the bucket is empty, the request is denied.

    Advantages over sliding window:
    - Allows short bursts (up to bucket capacity)
    - O(1) time and space per check
    - No need to store individual timestamps

    Time: O(1) per request
    Space: O(1) per user
    """
    def __init__(self, max_tokens, refill_rate):
        """
        max_tokens: bucket capacity
        refill_rate: tokens added per second
        """
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.buckets = {}  # user -> (tokens, last_refill_time)

    def is_allowed(self, user_id):
        """Check if request is allowed, consume a token."""
        now = time.time()

        if user_id not in self.buckets:
            # New user: start with full bucket
            self.buckets[user_id] = (self.max_tokens - 1, now)
            return True

        tokens, last_refill = self.buckets[user_id]

        # Add tokens based on elapsed time
        elapsed = now - last_refill
        tokens = min(self.max_tokens, tokens + elapsed * self.refill_rate)

        if tokens >= 1:
            self.buckets[user_id] = (tokens - 1, now)
            return True

        self.buckets[user_id] = (tokens, now)
        return False

    def get_remaining(self, user_id):
        """Get remaining tokens for this user."""
        if user_id not in self.buckets:
            return self.max_tokens

        tokens, last_refill = self.buckets[user_id]
        elapsed = time.time() - last_refill
        tokens = min(self.max_tokens, tokens + elapsed * self.refill_rate)
        return int(tokens)
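The refill arithmetic above can be checked by hand (illustrative numbers, not the parameters used in the test below):

```python
max_tokens = 3
refill_rate = 0.5   # tokens per second

tokens = 0.0        # bucket just drained
elapsed = 4.0       # seconds since the last request

# 4 s * 0.5 tokens/s = 2 new tokens, capped at bucket capacity
tokens = min(max_tokens, tokens + elapsed * refill_rate)
print(tokens)  # 2.0 -> two requests can be served before the bucket empties again
```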


# Test sliding window
print("--- Sliding Window ---")
limiter = SlidingWindowRateLimiter(max_requests=3, window_seconds=10)
for i in range(5):
    result = limiter.is_allowed("user1")
    print(f"Request {i+1}: {'Allowed' if result else 'Denied'}")
    # Expected: Allowed, Allowed, Allowed, Denied, Denied

# Test token bucket
print("\n--- Token Bucket ---")
bucket = TokenBucketRateLimiter(max_tokens=3, refill_rate=0.5)
for i in range(5):
    result = bucket.is_allowed("user1")
    print(f"Request {i+1}: {'Allowed' if result else 'Denied'}")

Exam Debrief: This ML-focused exam tests algorithm implementation (KNN), software engineering patterns (Pipeline), and system design coding (Rate Limiter). In ML interviews, you are expected to implement algorithms from scratch, design clean abstractions, and discuss trade-offs between different approaches.

Comparison: Sliding Window vs Token Bucket

Aspect | Sliding Window | Token Bucket
Memory | O(max_requests) per user | O(1) per user
Burst handling | Strict limit | Allows bursts up to capacity
Accuracy | Exact count | Approximate (smooth refill)
Use case | API rate limiting | Network traffic shaping
Complexity | Amortized O(1) | O(1)

Score Your Performance

Result | Score | Next Step
All 3 implemented correctly in < 75 min | ⭐ Exceptional | You are ready for ML engineering interviews
2 implemented + partial on third in < 75 min | 👍 Strong | Review the missed problem, move to Review lesson
1 fully implemented | 🔄 Developing | Practice more OOP and system design coding
Partial implementations only | 📚 Keep Practicing | Focus on clean class design, retry in 5 days