Advanced

Bayesian Methods in Code

Five problems that implement Bayesian inference from scratch. You will build a general-purpose Bayesian inference engine, conjugate models, a Naive Bayes classifier, Bayesian A/B testing, and Thompson sampling for multi-armed bandits. These are asked at quant firms, tech companies, and anywhere decisions are made under uncertainty.

Problem 1: Bayesian Inference Engine

Problem: Implement a discrete Bayesian inference engine that updates a prior distribution with observed evidence using Bayes' theorem. This is the core computation that all Bayesian methods build upon.

import math

# ---- CHALLENGE ----
def bayesian_update(prior, likelihood, evidence):
    """Update prior with evidence using Bayes' theorem.
    prior: dict mapping hypothesis -> P(hypothesis)
    likelihood: dict mapping hypothesis -> P(evidence | hypothesis)
    Return: posterior dict mapping hypothesis -> P(hypothesis | evidence)
    """
    pass

# ---- SOLUTION ----
def bayesian_update(prior, likelihood):
    """Discrete Bayesian inference.

    Bayes' Theorem: P(H|E) = P(E|H) * P(H) / P(E)

    where P(E) = sum over all hypotheses H_i of P(E|H_i) * P(H_i)

    Steps:
    1. Compute unnormalized posterior: P(E|H) * P(H) for each H
    2. Compute evidence (normalizing constant): sum of unnormalized posteriors
    3. Normalize: P(H|E) = unnormalized / evidence
    """
    # Step 1: Unnormalized posterior
    unnormalized = {}
    for hypothesis in prior:
        unnormalized[hypothesis] = prior[hypothesis] * likelihood[hypothesis]

    # Step 2: Evidence (normalizing constant)
    evidence = sum(unnormalized.values())

    if evidence == 0:
        raise ValueError("Evidence is zero - impossible observation given the model")

    # Step 3: Normalize
    posterior = {h: p / evidence for h, p in unnormalized.items()}
    return posterior

def sequential_bayesian_update(prior, observations, likelihood_func):
    """Apply Bayes' theorem sequentially for multiple observations.

    The posterior after each observation becomes the prior for the next.
    This is the power of Bayesian updating: O(n) computation, no need to
    reprocess all data.
    """
    current_prior = dict(prior)
    for obs in observations:
        likelihood = {h: likelihood_func(h, obs) for h in current_prior}
        current_prior = bayesian_update(current_prior, likelihood)
    return current_prior

# ---- VERIFICATION ----
# Classic example: disease testing
# P(disease) = 0.01, P(no disease) = 0.99
# Test sensitivity: P(positive | disease) = 0.95
# Test specificity: P(negative | no disease) = 0.90 -> P(positive | no disease) = 0.10

prior = {"disease": 0.01, "healthy": 0.99}
likelihood_positive = {"disease": 0.95, "healthy": 0.10}

posterior = bayesian_update(prior, likelihood_positive)

# P(disease | positive test) should be ~8.8% (NOT 95%!)
assert abs(posterior["disease"] - 0.0876) < 0.001
assert abs(posterior["healthy"] - 0.9124) < 0.001
assert abs(sum(posterior.values()) - 1.0) < 1e-10

# Sequential: two positive tests
def test_likelihood(hypothesis, result):
    if result == "positive":
        return 0.95 if hypothesis == "disease" else 0.10
    else:
        return 0.05 if hypothesis == "disease" else 0.90

posterior_2 = sequential_bayesian_update(prior, ["positive", "positive"], test_likelihood)
# After two positive tests, P(disease) should be much higher
assert posterior_2["disease"] > 0.45, f"Should be high: {posterior_2['disease']}"
print("All tests passed!")
💡
The base rate fallacy: Even with a 95% accurate test, if only 1% of people have the disease, a positive test only means ~8.8% chance of disease. This is the most famous Bayesian result, and interviewers love to ask it because most candidates get it wrong.

Problem 2: Beta-Binomial Conjugate Model

Problem: Implement the Beta-Binomial conjugate model from scratch. The Beta distribution is the conjugate prior for the Binomial likelihood, meaning the posterior is also a Beta distribution with simple update rules. Implement the Beta PDF, posterior update, and credible intervals.

import math
import random

# ---- CHALLENGE ----
class BetaBinomial:
    """Beta-Binomial conjugate model for learning probabilities from data."""
    def __init__(self, alpha=1, beta=1):
        pass

    def update(self, successes, failures):
        pass

    def mean(self):
        pass

    def pdf(self, x):
        pass

# ---- SOLUTION ----
def _log_beta_function(a, b):
    """Log of Beta function: B(a,b) = Gamma(a)*Gamma(b)/Gamma(a+b).
    Using Stirling's approximation for log-gamma:
    log(Gamma(x)) ~ (x-0.5)*log(x) - x + 0.5*log(2*pi) + 1/(12*x)
    """
    def _log_gamma(x):
        if x <= 0:
            raise ValueError("x must be positive")
        if x < 7:
            # Shift to larger value for better approximation
            shift = 7 - int(x)
            log_correction = sum(math.log(x + i) for i in range(shift))
            x = x + shift
            result = (x - 0.5) * math.log(x) - x + 0.5 * math.log(2 * math.pi) + 1.0 / (12 * x)
            return result - log_correction
        return (x - 0.5) * math.log(x) - x + 0.5 * math.log(2 * math.pi) + 1.0 / (12 * x)

    return _log_gamma(a) + _log_gamma(b) - _log_gamma(a + b)

class BetaBinomial:
    """Beta-Binomial conjugate model.

    Prior: p ~ Beta(alpha, beta)
    Likelihood: X | p ~ Binomial(n, p)
    Posterior: p | X ~ Beta(alpha + successes, beta + failures)

    The conjugate property means:
    - Prior is Beta(a, b)
    - After observing k successes in n trials
    - Posterior is Beta(a + k, b + n - k)

    This is the simplest and most important Bayesian model.
    """
    def __init__(self, alpha=1.0, beta=1.0):
        """Initialize with prior Beta(alpha, beta).
        Default alpha=beta=1 is the uniform (non-informative) prior.
        """
        self.alpha = alpha
        self.beta = beta

    def update(self, successes, failures):
        """Update posterior with observed data.
        Posterior: Beta(alpha + successes, beta + failures)
        """
        self.alpha += successes
        self.beta += failures

    def mean(self):
        """Posterior mean: E[p] = alpha / (alpha + beta)"""
        return self.alpha / (self.alpha + self.beta)

    def mode(self):
        """Posterior mode (MAP estimate): (alpha-1) / (alpha+beta-2)
        Only defined when alpha > 1 and beta > 1.
        """
        if self.alpha > 1 and self.beta > 1:
            return (self.alpha - 1) / (self.alpha + self.beta - 2)
        return self.mean()  # fallback

    def variance(self):
        """Posterior variance: alpha*beta / ((alpha+beta)^2 * (alpha+beta+1))"""
        ab = self.alpha + self.beta
        return (self.alpha * self.beta) / (ab ** 2 * (ab + 1))

    def pdf(self, x):
        """Beta PDF: f(x) = x^(a-1) * (1-x)^(b-1) / B(a,b)
        Computed in log-space for numerical stability.
        """
        if x <= 0 or x >= 1:
            return 0.0
        log_pdf = ((self.alpha - 1) * math.log(x) +
                   (self.beta - 1) * math.log(1 - x) -
                   _log_beta_function(self.alpha, self.beta))
        return math.exp(log_pdf)

    def credible_interval(self, confidence=0.95, num_points=1000):
        """Compute credible interval using grid approximation.
        Find the narrowest interval containing `confidence` probability mass.
        """
        alpha_level = 1 - confidence
        # Grid approximation of the CDF
        step = 1.0 / num_points
        grid = [(i + 0.5) * step for i in range(num_points)]
        densities = [self.pdf(x) * step for x in grid]

        # Find percentile-based interval
        cumsum = 0
        lower = 0
        for i, d in enumerate(densities):
            cumsum += d
            if cumsum >= alpha_level / 2:
                lower = grid[i]
                break

        cumsum = 0
        upper = 1
        for i in range(len(densities) - 1, -1, -1):
            cumsum += densities[i]
            if cumsum >= alpha_level / 2:
                upper = grid[i]
                break

        return lower, upper

    def sample(self, n=1):
        """Sample from Beta distribution using the ratio-of-uniforms method."""
        samples = []
        for _ in range(n):
            # Use gamma sampling: if X~Gamma(a,1) and Y~Gamma(b,1),
            # then X/(X+Y) ~ Beta(a,b)
            x = _sample_gamma(self.alpha)
            y = _sample_gamma(self.beta)
            samples.append(x / (x + y))
        return samples if n > 1 else samples[0]

def _sample_gamma(shape):
    """Sample from Gamma(shape, 1) using Marsaglia and Tsang's method."""
    if shape < 1:
        # Gamma(shape) = Gamma(shape+1) * U^(1/shape)
        return _sample_gamma(shape + 1) * (random.random() ** (1.0 / shape))

    d = shape - 1.0 / 3.0
    c = 1.0 / math.sqrt(9.0 * d)
    while True:
        while True:
            # Generate normal sample
            u1 = random.random()
            u2 = random.random()
            while u1 == 0:
                u1 = random.random()
            x = math.sqrt(-2.0 * math.log(u1)) * math.cos(2.0 * math.pi * u2)
            v = (1 + c * x) ** 3
            if v > 0:
                break
        u = random.random()
        if u < 1 - 0.0331 * x ** 4 or math.log(u) < 0.5 * x ** 2 + d * (1 - v + math.log(v)):
            return d * v

# ---- VERIFICATION ----
random.seed(42)

# Start with uniform prior, observe 7 heads out of 10 coin flips
model = BetaBinomial(alpha=1, beta=1)
model.update(successes=7, failures=3)

assert abs(model.mean() - 8/12) < 1e-10  # (1+7)/(1+7+1+3) = 8/12
assert model.alpha == 8 and model.beta == 4

# More data: 70 heads out of 100 total
model2 = BetaBinomial(alpha=1, beta=1)
model2.update(successes=70, failures=30)
# With lots of data, posterior mean should be close to MLE (0.70)
assert abs(model2.mean() - 0.70) < 0.01

# Credible interval should contain the true value
lower, upper = model2.credible_interval(0.95)
assert lower < 0.70 < upper

# Sampling
samples = model2.sample(n=5000)
sample_mean = sum(samples) / len(samples)
assert abs(sample_mean - model2.mean()) < 0.02
print("All tests passed!")

Problem 3: Naive Bayes Classifier from Scratch

Problem: Implement a Gaussian Naive Bayes classifier from scratch. Train it on continuous features, predict class labels, and compute class probabilities. This is asked at every data science interview.

import math

# ---- CHALLENGE ----
class NaiveBayes:
    """Gaussian Naive Bayes classifier from scratch."""
    def fit(self, X, y):
        pass
    def predict(self, X):
        pass
    def predict_proba(self, X):
        pass

# ---- SOLUTION ----
class NaiveBayes:
    """Gaussian Naive Bayes classifier.

    Assumption: features are conditionally independent given the class.
    P(class | x1, x2, ..., xd) proportional to P(class) * prod(P(xi | class))

    For continuous features, P(xi | class) is modeled as Gaussian:
    P(xi | class=c) = N(xi; mu_c_i, sigma_c_i^2)

    Training: compute mean and variance of each feature for each class.
    Prediction: compute posterior for each class, return argmax.
    """
    def __init__(self):
        self.classes = []
        self.class_prior = {}   # P(class)
        self.means = {}         # mean[class][feature]
        self.variances = {}     # var[class][feature]

    def fit(self, X, y):
        """Train the model. X: list of feature vectors, y: list of labels."""
        n_samples = len(X)
        n_features = len(X[0])

        # Find unique classes
        self.classes = sorted(set(y))

        for c in self.classes:
            # Get samples belonging to class c
            class_samples = [X[i] for i in range(n_samples) if y[i] == c]
            n_c = len(class_samples)

            # Prior probability
            self.class_prior[c] = n_c / n_samples

            # Mean and variance per feature
            self.means[c] = []
            self.variances[c] = []

            for j in range(n_features):
                values = [sample[j] for sample in class_samples]
                mu = sum(values) / n_c
                # Variance with smoothing to avoid zero variance
                var = sum((v - mu) ** 2 for v in values) / n_c
                var = max(var, 1e-9)  # smoothing
                self.means[c].append(mu)
                self.variances[c].append(var)

    def _log_gaussian_pdf(self, x, mu, var):
        """Log of Gaussian PDF for numerical stability."""
        return -0.5 * math.log(2 * math.pi * var) - 0.5 * ((x - mu) ** 2 / var)

    def _log_posterior(self, x):
        """Compute log P(class | x) for all classes (unnormalized)."""
        log_posteriors = {}
        for c in self.classes:
            # Start with log prior
            log_p = math.log(self.class_prior[c])
            # Add log likelihood for each feature (independence assumption)
            for j in range(len(x)):
                log_p += self._log_gaussian_pdf(x[j], self.means[c][j], self.variances[c][j])
            log_posteriors[c] = log_p
        return log_posteriors

    def predict(self, X):
        """Predict class labels for a list of feature vectors."""
        predictions = []
        for x in X:
            log_post = self._log_posterior(x)
            best_class = max(log_post, key=log_post.get)
            predictions.append(best_class)
        return predictions

    def predict_proba(self, X):
        """Predict class probabilities using log-sum-exp for normalization."""
        all_probs = []
        for x in X:
            log_post = self._log_posterior(x)

            # Log-sum-exp normalization
            max_log = max(log_post.values())
            log_sum = max_log + math.log(sum(
                math.exp(lp - max_log) for lp in log_post.values()
            ))

            probs = {c: math.exp(lp - log_sum) for c, lp in log_post.items()}
            all_probs.append(probs)
        return all_probs

# ---- VERIFICATION ----
# Simple 2D dataset: two clusters
X_train = [
    [1.0, 2.0], [1.5, 1.8], [1.2, 2.2], [0.8, 1.9],  # class 0
    [5.0, 8.0], [5.5, 7.5], [5.2, 8.2], [4.8, 7.9]    # class 1
]
y_train = [0, 0, 0, 0, 1, 1, 1, 1]

model = NaiveBayes()
model.fit(X_train, y_train)

# Test prediction
X_test = [[1.0, 2.0], [5.0, 8.0], [3.0, 5.0]]
predictions = model.predict(X_test)
assert predictions[0] == 0, "Should predict class 0"
assert predictions[1] == 1, "Should predict class 1"

# Test probabilities sum to 1
probs = model.predict_proba(X_test)
for p in probs:
    total = sum(p.values())
    assert abs(total - 1.0) < 1e-6, f"Probabilities should sum to 1, got {total}"

# Class 0 should have high probability for [1.0, 2.0]
assert probs[0][0] > 0.99
# Class 1 should have high probability for [5.0, 8.0]
assert probs[1][1] > 0.99
print("All tests passed!")

Problem 4: Bayesian A/B Testing

Problem: Implement Bayesian A/B testing from scratch. Instead of computing a p-value, compute the posterior probability that variant B is better than variant A. Use Beta-Binomial conjugate analysis and Monte Carlo estimation.

import math
import random

# ---- CHALLENGE ----
def bayesian_ab_test(a_successes, a_total, b_successes, b_total, num_samples=100000):
    """Bayesian A/B test: P(B > A), expected lift, risk analysis."""
    pass

# ---- SOLUTION ----
def _sample_beta(alpha, beta_param):
    """Sample from Beta(alpha, beta) using Gamma ratio method."""
    def _gamma_sample(shape):
        if shape < 1:
            return _gamma_sample(shape + 1) * (random.random() ** (1.0 / shape))
        d = shape - 1.0 / 3.0
        c = 1.0 / math.sqrt(9.0 * d)
        while True:
            while True:
                u1 = random.random()
                u2 = random.random()
                while u1 == 0:
                    u1 = random.random()
                x = math.sqrt(-2.0 * math.log(u1)) * math.cos(2.0 * math.pi * u2)
                v = (1 + c * x) ** 3
                if v > 0:
                    break
            u = random.random()
            if u < 1 - 0.0331 * x ** 4 or math.log(u) < 0.5 * x ** 2 + d * (1 - v + math.log(v)):
                return d * v

    x = _gamma_sample(alpha)
    y = _gamma_sample(beta_param)
    return x / (x + y)

def bayesian_ab_test(a_successes, a_total, b_successes, b_total, num_samples=100000):
    """Bayesian A/B test using Beta-Binomial conjugate model.

    Model:
    - p_A ~ Beta(1 + a_successes, 1 + a_failures)  (uniform prior)
    - p_B ~ Beta(1 + b_successes, 1 + b_failures)

    Key outputs:
    1. P(B > A): probability that B is truly better
    2. Expected lift: E[(p_B - p_A) / p_A]
    3. Expected loss: E[max(p_A - p_B, 0)] (risk of choosing B)

    Advantages over frequentist A/B testing:
    - Direct probability statement ("B is better with 95% probability")
    - No need for fixed sample sizes or stopping rules
    - Natural handling of multiple comparisons
    - Risk/loss quantification
    """
    a_failures = a_total - a_successes
    b_failures = b_total - b_successes

    # Posterior parameters (uniform prior: Beta(1,1))
    alpha_a = 1 + a_successes
    beta_a = 1 + a_failures
    alpha_b = 1 + b_successes
    beta_b = 1 + b_failures

    # Monte Carlo estimation
    b_wins = 0
    lift_sum = 0.0
    loss_if_b_sum = 0.0  # loss from choosing B when A is better
    loss_if_a_sum = 0.0  # loss from choosing A when B is better

    for _ in range(num_samples):
        p_a = _sample_beta(alpha_a, beta_a)
        p_b = _sample_beta(alpha_b, beta_b)

        if p_b > p_a:
            b_wins += 1

        # Relative lift
        if p_a > 0:
            lift_sum += (p_b - p_a) / p_a

        # Expected losses
        loss_if_b_sum += max(p_a - p_b, 0)
        loss_if_a_sum += max(p_b - p_a, 0)

    return {
        "p_b_better": b_wins / num_samples,
        "p_a_better": 1 - b_wins / num_samples,
        "expected_lift": lift_sum / num_samples,
        "expected_loss_choosing_b": loss_if_b_sum / num_samples,
        "expected_loss_choosing_a": loss_if_a_sum / num_samples,
        "rate_a": a_successes / a_total,
        "rate_b": b_successes / b_total,
        "posterior_mean_a": alpha_a / (alpha_a + beta_a),
        "posterior_mean_b": alpha_b / (alpha_b + beta_b)
    }

# ---- VERIFICATION ----
random.seed(42)

# Clear winner: B has higher conversion
result = bayesian_ab_test(
    a_successes=100, a_total=1000,   # 10%
    b_successes=130, b_total=1000    # 13%
)
assert result["p_b_better"] > 0.95, f"B should be clearly better: {result['p_b_better']}"
assert result["expected_lift"] > 0.2, "Lift should be ~30%"

# Very close results: inconclusive
result2 = bayesian_ab_test(
    a_successes=100, a_total=1000,
    b_successes=103, b_total=1000
)
assert 0.3 < result2["p_b_better"] < 0.7, "Should be inconclusive"

# B clearly worse
result3 = bayesian_ab_test(
    a_successes=150, a_total=1000,
    b_successes=100, b_total=1000
)
assert result3["p_b_better"] < 0.01, "A should be clearly better"
print("All tests passed!")

Problem 5: Thompson Sampling for Multi-Armed Bandits

Problem: Implement Thompson sampling from scratch. Thompson sampling is a Bayesian approach to the exploration-exploitation tradeoff in multi-armed bandit problems. It is used in production at Google, Netflix, and Spotify for content recommendation and A/B testing.

import math
import random

# ---- CHALLENGE ----
class ThompsonSampling:
    """Thompson sampling for Bernoulli multi-armed bandits."""
    def __init__(self, num_arms):
        pass
    def select_arm(self):
        pass
    def update(self, arm, reward):
        pass

# ---- SOLUTION ----
def _sample_beta_ts(alpha, beta_param):
    """Sample from Beta distribution for Thompson Sampling."""
    def _gamma_sample(shape):
        if shape < 1:
            return _gamma_sample(shape + 1) * (random.random() ** (1.0 / shape))
        d = shape - 1.0 / 3.0
        c = 1.0 / math.sqrt(9.0 * d)
        while True:
            while True:
                u1 = random.random()
                u2 = random.random()
                while u1 == 0:
                    u1 = random.random()
                x = math.sqrt(-2.0 * math.log(u1)) * math.cos(2.0 * math.pi * u2)
                v = (1 + c * x) ** 3
                if v > 0:
                    break
            u = random.random()
            if u < 1 - 0.0331 * x ** 4 or math.log(u) < 0.5 * x ** 2 + d * (1 - v + math.log(v)):
                return d * v

    x = _gamma_sample(alpha)
    y = _gamma_sample(beta_param)
    return x / (x + y)

class ThompsonSampling:
    """Thompson Sampling for Bernoulli multi-armed bandits.

    Algorithm:
    1. Maintain Beta(alpha_i, beta_i) posterior for each arm i
    2. At each step:
       a. Sample theta_i ~ Beta(alpha_i, beta_i) for each arm
       b. Select the arm with the highest sampled theta
    3. After observing reward:
       a. If reward=1: alpha_i += 1 (success)
       b. If reward=0: beta_i += 1 (failure)

    Why Thompson Sampling works:
    - Naturally balances exploration (uncertain arms get wide samples)
      and exploitation (good arms have high posterior means)
    - Arms with high uncertainty get explored because their samples
      occasionally come out high
    - As evidence accumulates, the posterior concentrates and the
      algorithm exploits the best arm

    Thompson Sampling is provably optimal (in a Bayesian sense) and
    often outperforms UCB and epsilon-greedy in practice.
    """
    def __init__(self, num_arms):
        self.num_arms = num_arms
        # Uniform prior Beta(1,1) for each arm
        self.alpha = [1.0] * num_arms
        self.beta = [1.0] * num_arms
        self.counts = [0] * num_arms
        self.rewards = [0] * num_arms

    def select_arm(self):
        """Sample from each arm's posterior and select the highest."""
        samples = [_sample_beta_ts(self.alpha[i], self.beta[i]) for i in range(self.num_arms)]
        return samples.index(max(samples))

    def update(self, arm, reward):
        """Update posterior after observing reward (0 or 1)."""
        self.counts[arm] += 1
        self.rewards[arm] += reward
        if reward == 1:
            self.alpha[arm] += 1
        else:
            self.beta[arm] += 1

    def get_stats(self):
        """Return current statistics for all arms."""
        stats = []
        for i in range(self.num_arms):
            stats.append({
                "arm": i,
                "pulls": self.counts[i],
                "wins": self.rewards[i],
                "posterior_mean": self.alpha[i] / (self.alpha[i] + self.beta[i]),
                "alpha": self.alpha[i],
                "beta": self.beta[i]
            })
        return stats

# ---- VERIFICATION ----
random.seed(42)

# 3-armed bandit with true probabilities [0.1, 0.5, 0.3]
true_probs = [0.1, 0.5, 0.3]
ts = ThompsonSampling(num_arms=3)

total_reward = 0
for _ in range(2000):
    arm = ts.select_arm()
    reward = 1 if random.random() < true_probs[arm] else 0
    ts.update(arm, reward)
    total_reward += reward

stats = ts.get_stats()

# Best arm (index 1, prob=0.5) should be pulled most
assert stats[1]["pulls"] > stats[0]["pulls"], "Arm 1 should be pulled more than arm 0"
assert stats[1]["pulls"] > stats[2]["pulls"], "Arm 1 should be pulled more than arm 2"

# Posterior mean of best arm should be close to true probability
assert abs(stats[1]["posterior_mean"] - 0.5) < 0.1

# Total reward should be close to optimal (0.5 * 2000 = 1000)
# Thompson sampling should achieve at least 70% of optimal
assert total_reward > 700, f"Reward {total_reward} too low"
print(f"Total reward: {total_reward} / 2000 (optimal ~1000)")
print("All tests passed!")

Key Takeaways

💡
  • Bayes' theorem is the foundation: posterior is proportional to prior times likelihood
  • Beta-Binomial is the most important conjugate model — know the update rules by heart
  • Naive Bayes works surprisingly well in practice despite the independence assumption
  • Bayesian A/B testing gives direct probability statements ("B is better with 95% probability")
  • Thompson sampling is provably optimal and used in production at Google, Netflix, and Spotify
  • Always use log-probabilities when multiplying many small numbers (Naive Bayes, sequential updates)