Intermediate

Two Heaps Pattern

The two-heap pattern uses a max-heap for the lower half and a min-heap for the upper half to maintain a running median or partition data into two balanced halves. This enables O(1) median lookups and O(log n) insertions — critical for streaming analytics, online learning systems, and real-time anomaly detection in ML pipelines.

Problem 1: Find Median from Data Stream

📝
Problem: Design a data structure that supports: addNum(num) adds an integer to the stream, and findMedian() returns the median of all elements so far.

Example: addNum(1), addNum(2), findMedian() → 1.5, addNum(3), findMedian() → 2.0
import heapq

class MedianFinder:
    """Find median from a data stream using two heaps.

    Strategy:
    - max_heap (negated): stores the smaller half of numbers
    - min_heap: stores the larger half of numbers
    - Invariant: len(max_heap) == len(min_heap) or len(max_heap) == len(min_heap) + 1

    The median is either max_heap[0] (odd count) or
    average of max_heap[0] and min_heap[0] (even count).

    Time: O(log n) per addNum, O(1) per findMedian
    Space: O(n)

    ML Application: Online median computation for streaming
    sensor data normalization in IoT/edge ML systems.
    """
    def __init__(self):
        self.max_heap = []  # lower half (negated values, so heapq acts as a max-heap)
        self.min_heap = []  # upper half

    def addNum(self, num: int) -> None:
        """Insert num into the stream and restore both heap invariants."""
        # Always add to max_heap first
        heapq.heappush(self.max_heap, -num)

        # Ordering invariant: max_heap's max <= min_heap's min.
        # Only the freshly pushed value can violate it, so one move suffices.
        if self.min_heap and (-self.max_heap[0]) > self.min_heap[0]:
            val = -heapq.heappop(self.max_heap)
            heapq.heappush(self.min_heap, val)

        # Size invariant: max_heap holds the same number of elements as
        # min_heap, or exactly one more.
        if len(self.max_heap) > len(self.min_heap) + 1:
            val = -heapq.heappop(self.max_heap)
            heapq.heappush(self.min_heap, val)
        elif len(self.min_heap) > len(self.max_heap):
            val = heapq.heappop(self.min_heap)
            heapq.heappush(self.max_heap, -val)

    def findMedian(self) -> float:
        """Return the median of all numbers added so far, always as a float.

        Raises:
            IndexError: if no number has been added yet.
        """
        if not self.max_heap:
            # max_heap is never smaller than min_heap, so empty means no data.
            raise IndexError("findMedian() called before any number was added")
        if len(self.max_heap) > len(self.min_heap):
            # Odd count: the extra element sits on top of the lower half.
            # Convert explicitly so the result matches the float annotation.
            return float(-self.max_heap[0])
        return (-self.max_heap[0] + self.min_heap[0]) / 2.0

# Test: replays the addNum/findMedian sequence from the example above
mf = MedianFinder()
mf.addNum(1)
mf.addNum(2)
print(mf.findMedian())  # median of [1, 2] -> 1.5
mf.addNum(3)
print(mf.findMedian())  # median of [1, 2, 3] -> 2 (the middle element)

Problem 2: Sliding Window Median

📝
Problem: Given an array nums and an integer k, return the median of each window of size k as the window slides from left to right.

Example: nums = [1,3,-1,-3,5,3,6,7], k = 3 → Output: [1.0,-1.0,-1.0,3.0,5.0,6.0]
import heapq
from collections import defaultdict

def medianSlidingWindow(nums: list[int], k: int) -> list[float]:
    """Find median of each sliding window of size k.

    Strategy: Two heaps with lazy deletion.
    - max_heap: lower half of the window (negated)
    - min_heap: upper half of the window
    - to_remove: value -> number of copies that have left the window but
      are still physically stored inside one of the heaps

    Stale copies stay buried in the heaps, so len(heap) overstates how many
    live elements each half holds. The live counts are tracked separately
    and are the only sizes used for balancing. Both heap tops are kept free
    of stale values at all times, so the median can be read from them.

    Args:
        nums: input sequence.
        k: window size.

    Returns:
        One float per window, left to right (len(nums) - k + 1 values).
        An empty list when k <= 0 or k > len(nums), because no full
        window exists.

    Time: O(n log n) worst case (stale entries can accumulate in the heaps)
    Space: O(n)

    ML Application: Rolling median filters for denoising
    time-series data in signal processing pipelines.
    """
    if k <= 0 or k > len(nums):
        return []

    max_heap = []  # lower half (negated)
    min_heap = []  # upper half
    to_remove = defaultdict(int)
    lower_count = 0  # live elements in max_heap
    upper_count = 0  # live elements in min_heap

    # Helper: pop stale values off the top of a heap until the top is live
    def prune(heap, sign=1):
        while heap:
            value = sign * heap[0]
            if to_remove.get(value, 0) == 0:
                return
            if to_remove[value] == 1:
                del to_remove[value]  # keep the bookkeeping dict small
            else:
                to_remove[value] -= 1
            heapq.heappop(heap)

    # Helper: restore lower_count == upper_count or upper_count + 1.
    # Each insert/erase shifts the balance by one, so one move is enough.
    def rebalance():
        nonlocal lower_count, upper_count
        if lower_count > upper_count + 1:
            heapq.heappush(min_heap, -heapq.heappop(max_heap))
            lower_count -= 1
            upper_count += 1
            prune(max_heap, -1)  # the newly exposed top may be stale
        elif upper_count > lower_count:
            heapq.heappush(max_heap, -heapq.heappop(min_heap))
            upper_count -= 1
            lower_count += 1
            prune(min_heap, 1)

    # Helper: add a value to the half it belongs to
    def insert(value):
        nonlocal lower_count, upper_count
        if not max_heap or value <= -max_heap[0]:
            heapq.heappush(max_heap, -value)
            lower_count += 1
        else:
            heapq.heappush(min_heap, value)
            upper_count += 1
        rebalance()

    # Helper: logically delete one copy of a value from the window
    def erase(value):
        nonlocal lower_count, upper_count
        to_remove[value] += 1
        # The top of max_heap is live, so this comparison tells which half
        # holds the value (equal values are interchangeable).
        if value <= -max_heap[0]:
            lower_count -= 1
            if value == -max_heap[0]:
                prune(max_heap, -1)
        else:
            upper_count -= 1
            if value == min_heap[0]:
                prune(min_heap, 1)
        rebalance()

    def get_median():
        if k % 2 == 1:
            return float(-max_heap[0])
        return (-max_heap[0] + min_heap[0]) / 2.0

    # Initialize window
    for value in nums[:k]:
        insert(value)
    result = [get_median()]

    for i in range(k, len(nums)):
        # Insert first so the window is never empty while erasing.
        insert(nums[i])
        erase(nums[i - k])
        result.append(get_median())

    return result

# Simpler alternative using SortedList for clarity:
from sortedcontainers import SortedList

def medianSlidingWindowClean(nums: list[int], k: int) -> list[float]:
    """Sliding window median backed by a SortedList.

    The window is kept fully ordered, so each median is read by index:
    the middle element for odd k, the mean of the two middle elements
    for even k.

    Time: O(n log k), taking SortedList add/remove as O(log k)
    Space: O(k)
    """
    ordered = SortedList()
    medians = []
    mid = k // 2
    odd_window = k % 2 == 1

    for idx, value in enumerate(nums):
        ordered.add(value)
        if len(ordered) > k:
            # Evict the element that just slid out on the left.
            ordered.remove(nums[idx - k])
        if len(ordered) != k:
            continue  # window not full yet
        if odd_window:
            medians.append(float(ordered[mid]))
        else:
            medians.append((ordered[mid - 1] + ordered[mid]) / 2.0)

    return medians

# Test: the example from the problem statement (window size k = 3)
print(medianSlidingWindowClean([1,3,-1,-3,5,3,6,7], 3))
# Expected output: [1.0, -1.0, -1.0, 3.0, 5.0, 6.0]

Problem 3: IPO (Initial Public Offering)

📝
Problem: You are given n projects and may complete at most k distinct projects to maximize your capital. Each project i has a profit profits[i] and a minimum capital capital[i] required to start it. You start with capital w; finishing a project adds its profit to your capital. Return the final maximized capital.

Example: k=2, w=0, profits=[1,2,3], capital=[0,1,1] → Output: 4
import heapq

def findMaximizedCapital(k: int, w: int, profits: list[int],
                         capital: list[int]) -> int:
    """Return the largest capital reachable by completing at most k projects.

    Strategy: greedy with two heaps.
    1. A min-heap keyed by required capital holds the still-locked projects
    2. Every project whose requirement is <= current capital is unlocked
       into a max-heap keyed by profit (profits stored negated)
    3. Each round takes the single most profitable unlocked project

    Time: O(n log n) to drain the capital heap + O(k log n) for selections
    Space: O(n)

    ML Application: Resource allocation in ML training -
    deciding which experiments to run given limited GPU budget,
    maximizing the knowledge gained (profit) per compute dollar.
    """
    locked = list(zip(capital, profits))  # (capital_required, profit)
    heapq.heapify(locked)
    unlocked = []  # negated profits of affordable projects

    rounds_left = k
    while rounds_left > 0:
        # Unlock everything the current capital can pay for.
        while locked and locked[0][0] <= w:
            _, gain = heapq.heappop(locked)
            heapq.heappush(unlocked, -gain)

        if not unlocked:
            # Nothing affordable now, and capital cannot grow any further.
            return w

        # Popped value is the negated best profit, so subtracting adds it.
        w -= heapq.heappop(unlocked)
        rounds_left -= 1

    return w

# Test: first call is the example from the problem statement
print(findMaximizedCapital(2, 0, [1,2,3], [0,1,1]))  # 4  (take profit 1, then profit 3)
print(findMaximizedCapital(3, 0, [1,2,3], [0,1,2]))  # 6  (each project unlocks the next: 1 + 2 + 3)

Problem 4: Most Profit Assigning Work

📝
Problem: Given n tasks with difficulty[i] and profit[i], and a list of workers with ability worker[j], assign each worker at most one task where difficulty[i] ≤ worker[j]. A task may be completed by any number of workers, and a worker who cannot do any task earns 0. Maximize total profit.

Example: difficulty=[2,4,6,8], profit=[10,20,30,40], worker=[4,5,6,7] → Output: 100
import heapq

def maxProfitAssignment(difficulty: list[int], profit: list[int],
                        worker: list[int]) -> int:
    """Return the best total profit when every worker takes one feasible task.

    Strategy:
    1. Order tasks by difficulty and workers by ability
    2. Sweep workers from weakest to strongest, pushing each newly
       feasible task's profit into a max-heap (stored negated)
    3. Each worker earns the profit on top of the heap; the task is only
       peeked, not popped, so later workers can take the same task

    Time: O(n log n + m log m) for sorting
    Space: O(n)

    ML Application: Assigning ML inference requests to GPU
    workers based on model complexity and GPU capability,
    maximizing throughput.
    """
    jobs = sorted(zip(difficulty, profit))
    job_count = len(jobs)

    feasible = []  # negated profits of every task unlocked so far
    earned = 0
    cursor = 0  # index of the first task not yet unlocked

    for skill in sorted(worker):
        # Tasks unlocked for a weaker worker remain feasible for this one.
        while cursor < job_count and jobs[cursor][0] <= skill:
            heapq.heappush(feasible, -jobs[cursor][1])
            cursor += 1

        if feasible:
            # Top of the heap is the negated best profit.
            earned -= feasible[0]

    return earned

# Test: tasks are reusable, so several workers may take the same task
print(maxProfitAssignment([2,4,6,8,10], [10,20,30,40,50], [4,5,6,7]))  # 100  (20 + 20 + 30 + 30)
print(maxProfitAssignment([68,35,52,47,86], [67,17,1,81,3], [92,10,85,84,82]))  # 324  (ability 10 earns 0; the other four each earn 81)
Two Heaps Pattern Summary: Use a max-heap for the lower half and a min-heap for the upper half whenever you need to track a median or partition data into two balanced halves. The IPO variant uses a min-heap (sorted by cost) feeding into a max-heap (sorted by profit) — a "two-phase" heap that unlocks items as resources grow.