Intermediate
Two Heaps Pattern
The two-heap pattern uses a max-heap for the lower half and a min-heap for the upper half to maintain a running median or partition data into two balanced halves. This enables O(1) median lookups and O(log n) insertions — critical for streaming analytics, online learning systems, and real-time anomaly detection in ML pipelines.
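Python's heapq module only provides a min-heap, so the max-heap for the lower half is usually simulated by storing negated values. The snippet below is a minimal sketch of that trick; the sample values are made up for illustration.

```python
import heapq

values = [5, 1, 8, 3]  # illustrative data, not from the text

# heapq only provides a min-heap: index 0 is always the smallest item.
min_heap = list(values)
heapq.heapify(min_heap)

# Simulate a max-heap by storing negated values and negating again on the way out.
max_heap = [-v for v in values]
heapq.heapify(max_heap)

smallest = min_heap[0]    # peek at the minimum in O(1)
largest = -max_heap[0]    # peek at the maximum in O(1)
print(smallest, largest)  # 1 8
```

Every push to and pop from the negated heap has to apply the same sign flip, which is why the solutions in this section write -num when pushing and negate the popped value.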
Problem 1: Find Median from Data Stream
Problem: Design a data structure that supports addNum(num), which adds an integer to the stream, and findMedian(), which returns the median of all elements so far.
Example: addNum(1), addNum(2), findMedian() → 1.5, addNum(3), findMedian() → 2.0
import heapq

class MedianFinder:
    """Find median from a data stream using two heaps.

    Strategy:
    - max_heap (negated): stores the smaller half of numbers
    - min_heap: stores the larger half of numbers
    - Invariant: len(max_heap) == len(min_heap) or len(max_heap) == len(min_heap) + 1

    The median is either max_heap[0] (odd count) or
    average of max_heap[0] and min_heap[0] (even count).

    Time: O(log n) per addNum, O(1) per findMedian
    Space: O(n)

    ML Application: Online median computation for streaming
    sensor data normalization in IoT/edge ML systems.
    """

    def __init__(self):
        self.max_heap = []  # lower half (negated values)
        self.min_heap = []  # upper half

    def addNum(self, num: int) -> None:
        # Always add to max_heap first
        heapq.heappush(self.max_heap, -num)

        # Ensure max_heap's max <= min_heap's min
        if self.min_heap and (-self.max_heap[0]) > self.min_heap[0]:
            val = -heapq.heappop(self.max_heap)
            heapq.heappush(self.min_heap, val)

        # Balance sizes: max_heap can have at most 1 more element
        if len(self.max_heap) > len(self.min_heap) + 1:
            val = -heapq.heappop(self.max_heap)
            heapq.heappush(self.min_heap, val)
        elif len(self.min_heap) > len(self.max_heap):
            val = heapq.heappop(self.min_heap)
            heapq.heappush(self.max_heap, -val)

    def findMedian(self) -> float:
        if len(self.max_heap) > len(self.min_heap):
            return -self.max_heap[0]
        return (-self.max_heap[0] + self.min_heap[0]) / 2.0

# Test
mf = MedianFinder()
mf.addNum(1)
mf.addNum(2)
print(mf.findMedian())  # 1.5
mf.addNum(3)
print(mf.findMedian())  # 2.0
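One way to gain confidence in a two-heap median is to compare it against a brute-force median of every prefix. The sketch below does that with a self-contained generator; running_medians and the sample data are hypothetical names and values chosen for illustration, and the insertion step uses a slightly different (but equivalent) push-then-transfer order than the class above.

```python
import heapq
import statistics
from typing import Iterable, Iterator


def running_medians(stream: Iterable[int]) -> Iterator[float]:
    """Yield the median after each element (hypothetical helper, not from the text)."""
    lower: list[int] = []  # max-heap of the smaller half (negated values)
    upper: list[int] = []  # min-heap of the larger half
    for num in stream:
        # Route the value through the lower half, then hand that half's
        # largest value to the upper half so lower <= upper always holds.
        heapq.heappush(lower, -num)
        heapq.heappush(upper, -heapq.heappop(lower))
        # Keep the lower half the same size as, or one larger than, the upper half.
        if len(upper) > len(lower):
            heapq.heappush(lower, -heapq.heappop(upper))
        if len(lower) > len(upper):
            yield float(-lower[0])
        else:
            yield (-lower[0] + upper[0]) / 2.0


data = [5, 15, 1, 3, 8, 7, 9, 10]  # illustrative stream
medians = list(running_medians(data))

# Brute-force reference: sort-based median of each prefix.
expected = [float(statistics.median(data[: i + 1])) for i in range(len(data))]
assert medians == expected
print(medians)
```

The brute-force reference costs O(n log n) per prefix, so it is only suitable as a test oracle, not as a replacement for the heap-based version.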
Problem 2: Sliding Window Median
Problem: Given an array nums and an integer k, return the median of each window of size k as the window slides from left to right.
Example: nums = [1,3,-1,-3,5,3,6,7], k = 3 → Output: [1.0,-1.0,-1.0,3.0,5.0,6.0]

import heapq
from collections import defaultdict

def medianSlidingWindow(nums: list[int], k: int) -> list[float]:
    """Find median of each sliding window of size k.

    Strategy: Two heaps with lazy deletion.
    - max_heap: lower half (negated)
    - min_heap: upper half
    - to_remove dict: counts of values awaiting lazy deletion
    - balance: per-slide change in valid sizes (lower half minus upper half)

    Time: O(n log n) worst case, since stale entries can keep the heaps larger than k
    Space: O(n)

    ML Application: Rolling median filters for denoising
    time-series data in signal processing pipelines.
    """
    to_remove = defaultdict(int)  # value -> copies awaiting lazy deletion
    result = []

    # Helper: pop tops that have been marked for deletion
    def prune(heap, sign=1):
        while heap and to_remove.get(sign * heap[0], 0) > 0:
            to_remove[sign * heap[0]] -= 1
            heapq.heappop(heap)

    # Initialize window: the lower half gets the extra element when k is odd
    window = sorted(nums[:k])
    half = k // 2 + k % 2
    max_heap = [-x for x in window[:half]]  # lower half (negated)
    min_heap = list(window[half:])          # upper half
    heapq.heapify(max_heap)
    heapq.heapify(min_heap)

    def get_median():
        if k % 2 == 1:
            return float(-max_heap[0])
        return (-max_heap[0] + min_heap[0]) / 2.0

    result.append(get_median())

    for i in range(k, len(nums)):
        outgoing = nums[i - k]
        incoming = nums[i]

        # Mark outgoing for lazy deletion and record which half loses it.
        # Raw heap lengths include stale entries, so sizes are tracked via balance.
        to_remove[outgoing] += 1
        balance = -1 if outgoing <= -max_heap[0] else 1

        # Add incoming to the appropriate heap
        if incoming <= -max_heap[0]:
            heapq.heappush(max_heap, -incoming)
            balance += 1
        else:
            heapq.heappush(min_heap, incoming)
            balance -= 1

        # balance is -2, 0, or 2: move one element to restore the size invariant
        if balance < 0:
            heapq.heappush(max_heap, -heapq.heappop(min_heap))
        elif balance > 0:
            heapq.heappush(min_heap, -heapq.heappop(max_heap))

        # Drop stale tops so both heaps expose valid elements
        prune(max_heap, -1)
        prune(min_heap, 1)
        result.append(get_median())

    return result

# Simpler alternative using SortedList for clarity:
from sortedcontainers import SortedList

def medianSlidingWindowClean(nums: list[int], k: int) -> list[float]:
    """Cleaner sliding window median using SortedList.

    Time: O(n log k) - SortedList operations are O(log k)
    Space: O(k)
    """
    window = SortedList()
    result = []
    for i, num in enumerate(nums):
        window.add(num)
        if len(window) > k:
            window.remove(nums[i - k])
        if len(window) == k:
            if k % 2 == 1:
                result.append(float(window[k // 2]))
            else:
                result.append((window[k // 2 - 1] + window[k // 2]) / 2.0)
    return result

# Test
print(medianSlidingWindowClean([1,3,-1,-3,5,3,6,7], 3))
# [1.0, -1.0, -1.0, 3.0, 5.0, 6.0]
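If the third-party sortedcontainers package is not available, a plain sorted list maintained with the standard library's bisect module gives the same results. This is a different technique from the two-heap approach, not a drop-in equivalent: each slide costs O(k) because of list insertion and deletion, so the total is O(n·k). The function name below is hypothetical.

```python
import bisect


def median_sliding_window_bisect(nums: list[int], k: int) -> list[float]:
    """Sliding window median with a plain sorted list (hypothetical stdlib-only variant)."""
    if k <= 0 or k > len(nums):
        return []
    window = sorted(nums[:k])
    result = []
    for i in range(k, len(nums) + 1):
        # Invariant: window holds nums[i - k:i] in sorted order.
        if k % 2 == 1:
            result.append(float(window[k // 2]))
        else:
            result.append((window[k // 2 - 1] + window[k // 2]) / 2.0)
        if i == len(nums):
            break
        # Slide: drop one copy of the outgoing value, insert the incoming one.
        del window[bisect.bisect_left(window, nums[i - k])]
        bisect.insort(window, nums[i])
    return result


print(median_sliding_window_bisect([1, 3, -1, -3, 5, 3, 6, 7], 3))
# [1.0, -1.0, -1.0, 3.0, 5.0, 6.0]
```

Because it is short and easy to reason about, a version like this also works as a test oracle for the heap-based implementation.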
Problem 3: IPO (Initial Public Offering)
Problem: You may complete at most k projects to maximize your capital. Each project i has a profit profits[i] and a minimum capital capital[i] required to start it. Starting with capital w, each completed project adds its profit to your capital. Return the final maximized capital.
Example: k=2, w=0, profits=[1,2,3], capital=[0,1,1] → Output: 4

import heapq
def findMaximizedCapital(k: int, w: int, profits: list[int],
                         capital: list[int]) -> int:
    """Maximize capital by selecting at most k projects.

    Strategy: Two-phase heap approach:
    1. Min-heap of projects sorted by capital requirement
    2. As capital grows, move affordable projects to a max-heap (by profit)
    3. Always pick the most profitable affordable project

    Time: O(n log n) for sorting + O(k log n) for selections
    Space: O(n)

    ML Application: Resource allocation in ML training -
    deciding which experiments to run given limited GPU budget,
    maximizing the knowledge gained (profit) per compute dollar.
    """
    # Min-heap: (capital_required, profit)
    min_capital_heap = list(zip(capital, profits))
    heapq.heapify(min_capital_heap)

    # Max-heap: available projects by profit (negated)
    max_profit_heap = []

    for _ in range(k):
        # Move all affordable projects to the profit heap
        while min_capital_heap and min_capital_heap[0][0] <= w:
            cap, prof = heapq.heappop(min_capital_heap)
            heapq.heappush(max_profit_heap, -prof)

        # Pick the most profitable project
        if not max_profit_heap:
            break
        w += -heapq.heappop(max_profit_heap)

    return w

# Test
print(findMaximizedCapital(2, 0, [1,2,3], [0,1,1]))  # 4
print(findMaximizedCapital(3, 0, [1,2,3], [0,1,2]))  # 6
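To see how affordable projects are unlocked as capital grows, it can help to record which project is picked in each round. The sketch below follows the same two-phase idea but stores project indices in both heaps; maximized_capital_with_trace is a hypothetical helper written for illustration.

```python
import heapq


def maximized_capital_with_trace(
    k: int, w: int, profits: list[int], capital: list[int]
) -> tuple[int, list[int]]:
    """Return (final capital, indices of chosen projects in order). Hypothetical helper."""
    # Min-heap of (capital_required, project_index): locked projects.
    by_capital = [(c, i) for i, c in enumerate(capital)]
    heapq.heapify(by_capital)
    # Max-heap of (-profit, project_index): projects affordable right now.
    by_profit: list[tuple[int, int]] = []
    chosen: list[int] = []

    for _ in range(k):
        # Unlock every project whose requirement is covered by current capital.
        while by_capital and by_capital[0][0] <= w:
            _, i = heapq.heappop(by_capital)
            heapq.heappush(by_profit, (-profits[i], i))
        if not by_profit:
            break  # nothing affordable, so capital cannot grow further
        neg_profit, i = heapq.heappop(by_profit)
        w -= neg_profit  # subtracting the negated profit adds the profit
        chosen.append(i)

    return w, chosen


final, picks = maximized_capital_with_trace(2, 0, [1, 2, 3], [0, 1, 1])
print(final, picks)  # 4 [0, 2]
```

In the example, only project 0 is affordable at w=0; completing it raises capital to 1, which unlocks projects 1 and 2, and the greedy step then takes project 2 for its higher profit.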
Problem 4: Most Profit Assigning Work
Problem: Given n tasks with difficulty[i] and profit[i], and a list of workers with ability worker[j], assign each worker at most one task where difficulty[i] ≤ worker[j]. The same task may be assigned to more than one worker. Maximize total profit.
Example: difficulty=[2,4,6,8], profit=[10,20,30,40], worker=[4,5,6,7] → Output: 100

import heapq
def maxProfitAssignment(difficulty: list[int], profit: list[int],
                        worker: list[int]) -> int:
    """Assign workers to tasks to maximize total profit.

    Strategy:
    1. Sort tasks by difficulty
    2. Sort workers by ability
    3. For each worker, push all tasks they can do into a max-heap
    4. Assign them the most profitable available task

    Time: O(n log n + m log m) for sorting
    Space: O(n)

    ML Application: Assigning ML inference requests to GPU
    workers based on model complexity and GPU capability,
    maximizing throughput.
    """
    # Pair tasks and sort by difficulty
    tasks = sorted(zip(difficulty, profit))
    workers_sorted = sorted(worker)

    max_profit_heap = []  # max-heap (negated profits)
    total = 0
    task_idx = 0

    for ability in workers_sorted:
        # Add all tasks this worker can handle
        while task_idx < len(tasks) and tasks[task_idx][0] <= ability:
            heapq.heappush(max_profit_heap, -tasks[task_idx][1])
            task_idx += 1

        # Pick the best task (if any available)
        if max_profit_heap:
            total += -max_profit_heap[0]  # peek, don't pop (reusable tasks)

    return total

# Test
print(maxProfitAssignment([2,4,6,8,10], [10,20,30,40,50], [4,5,6,7]))  # 100
print(maxProfitAssignment([68,35,52,47,86], [67,17,1,81,3], [92,10,85,84,82]))  # 324
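Because tasks are reusable, the heap in this solution is only ever peeked and never popped, so a single running maximum can stand in for it. The sketch below shows that simplification; it is a different technique from the heap version, the function name is hypothetical, and it assumes profits are non-negative so that a worker with no feasible task contributes 0.

```python
def max_profit_assignment_running_max(
    difficulty: list[int], profit: list[int], worker: list[int]
) -> int:
    """Heap-free variant using a running maximum (hypothetical helper).

    Assumes profits are non-negative.
    """
    tasks = sorted(zip(difficulty, profit))  # ascending by difficulty
    total = 0
    best = 0       # best profit among tasks unlocked so far
    task_idx = 0

    for ability in sorted(worker):
        # Unlock every task this worker (and all stronger workers) can handle.
        while task_idx < len(tasks) and tasks[task_idx][0] <= ability:
            best = max(best, tasks[task_idx][1])
            task_idx += 1
        total += best  # tasks are reusable, so the best one is never consumed

    return total


print(max_profit_assignment_running_max([2, 4, 6, 8, 10], [10, 20, 30, 40, 50], [4, 5, 6, 7]))  # 100
print(max_profit_assignment_running_max([68, 35, 52, 47, 86], [67, 17, 1, 81, 3], [92, 10, 85, 84, 82]))  # 324
```

The heap becomes necessary again if each task can be assigned only once, because the best task then has to be removed after use.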
Two Heaps Pattern Summary: Use a max-heap for the lower half and a min-heap for the upper half whenever you need to track a median or partition data into two balanced halves. The IPO variant uses a min-heap (sorted by cost) feeding into a max-heap (sorted by profit) — a "two-phase" heap that unlocks items as resources grow.
Lilly Tech Systems