Advanced Scheduling Problems
Scheduling problems combine heaps with greedy strategies to allocate resources optimally. The heap tracks "what finishes earliest" or "what has highest priority", enabling O(n log n) solutions for problems that would be NP-hard without the greedy insight. These patterns directly map to ML training job scheduling, GPU allocation, and distributed task queues.
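One mechanical detail before diving in: Python's heapq is a min-heap only, so every solution below that needs a max-heap negates values on the way in and out. A minimal sketch of that trick:

```python
import heapq

# heapq always pops the smallest element; negating counts makes the
# numerically largest count come out first (a simulated max-heap).
counts = [3, 1, 2]
max_heap = [-c for c in counts]
heapq.heapify(max_heap)
largest = -heapq.heappop(max_heap)  # un-negate on the way out
print(largest)  # 3
```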
Problem 1: Task Scheduler
Problem: Given a list of CPU tasks represented by characters and a cooldown period n, find the minimum number of intervals needed to execute all tasks. The same task must have at least n intervals between consecutive executions.

Example: tasks = ["A","A","A","B","B","B"], n = 2 → Output: 8 (A B idle A B idle A B)

```python
import heapq
from collections import Counter, deque

def leastInterval(tasks: list[str], n: int) -> int:
    """Find minimum intervals to complete all tasks with cooldown.

    Strategy:
    1. Count task frequencies, build a max-heap
    2. At each time step, pop the most frequent task
    3. After executing, place it in a cooldown queue
    4. When its cooldown expires, push it back onto the heap

    Time: O(T * n) where T = total tasks in the worst case,
          but effectively O(T log 26) = O(T)
    Space: O(1) - at most 26 distinct tasks

    ML Application: GPU task scheduling - ensuring the same
    model doesn't run on the same GPU back-to-back, allowing
    memory cleanup between runs.
    """
    freq = Counter(tasks)
    # Max-heap of remaining counts (negated for Python's min-heap)
    max_heap = [-count for count in freq.values()]
    heapq.heapify(max_heap)
    # Cooldown queue: (available_time, remaining_count)
    cooldown = deque()
    time = 0
    while max_heap or cooldown:
        time += 1
        if max_heap:
            remaining = 1 + heapq.heappop(max_heap)  # +1 because negated
            if remaining < 0:  # still has executions left
                cooldown.append((time + n, remaining))
        # Check whether the front task's cooldown has expired
        if cooldown and cooldown[0][0] == time:
            _, count = cooldown.popleft()
            heapq.heappush(max_heap, count)
    return time

# Test
print(leastInterval(["A","A","A","B","B","B"], 2))  # 8
print(leastInterval(["A","A","A","B","B","B"], 0))  # 6
print(leastInterval(["A","A","A","A","A","A","B","C","D","E","F","G"], 2))  # 16
```
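As a cross-check on the simulation, this problem also has a well-known closed-form answer: the most frequent task forms (f - 1) full groups of size (n + 1), plus one final slot for each task tied at the maximum frequency, and the result can never be less than the task count. A sketch (leastIntervalFormula is an illustrative name):

```python
from collections import Counter

def leastIntervalFormula(tasks: list[str], n: int) -> int:
    # (f - 1) cooldown-spaced groups of the most frequent task, plus one
    # trailing slot per task tied at the max frequency; if there are enough
    # other tasks to fill all idle slots, the answer is just len(tasks).
    freq = Counter(tasks)
    f = max(freq.values())
    m = sum(1 for c in freq.values() if c == f)  # tasks tied at max freq
    return max(len(tasks), (f - 1) * (n + 1) + m)

print(leastIntervalFormula(["A","A","A","B","B","B"], 2))  # 8
print(leastIntervalFormula(["A","A","A","B","B","B"], 0))  # 6
```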
Problem 2: Meeting Rooms II
Problem: Given an array of meeting time intervals [[start, end], ...], find the minimum number of conference rooms required.

Example: intervals = [[0,30],[5,10],[15,20]] → Output: 2

```python
import heapq

def minMeetingRooms(intervals: list[list[int]]) -> int:
    """Find minimum conference rooms needed for all meetings.

    Strategy:
    1. Sort meetings by start time
    2. Use a min-heap to track end times of active meetings
    3. For each meeting: if it starts after the earliest end,
       reuse that room (pop); otherwise, add a new room
    4. Heap size = number of rooms in use

    Time: O(n log n) for sorting and heap operations
    Space: O(n) for the heap

    ML Application: Determining the minimum GPU count needed to
    run all scheduled training jobs without overlap.
    """
    if not intervals:
        return 0
    intervals.sort(key=lambda x: x[0])
    rooms = []  # min-heap of end times
    for start, end in intervals:
        # If the earliest-ending meeting finishes before this one starts
        if rooms and rooms[0] <= start:
            heapq.heappop(rooms)  # reuse this room
        heapq.heappush(rooms, end)
    return len(rooms)

# Test
print(minMeetingRooms([[0,30],[5,10],[15,20]]))  # 2
print(minMeetingRooms([[7,10],[2,4]]))  # 1
print(minMeetingRooms([[1,5],[2,6],[3,7],[4,8]]))  # 4
```
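The same answer can be reached without a heap via a sweep line over sorted start and end times: a meeting that starts before the earliest unmatched end needs a new room. A sketch of that alternative (minMeetingRoomsSweep is an illustrative name):

```python
def minMeetingRoomsSweep(intervals: list[list[int]]) -> int:
    # Walk sorted starts and ends with two pointers; the peak number of
    # simultaneously open meetings is the room count.
    starts = sorted(iv[0] for iv in intervals)
    ends = sorted(iv[1] for iv in intervals)
    s = e = used = best = 0
    while s < len(starts):
        if starts[s] < ends[e]:
            used += 1   # a meeting opens before the earliest one closes
            s += 1
            best = max(best, used)
        else:
            used -= 1   # the earliest open meeting has ended
            e += 1
    return best

print(minMeetingRoomsSweep([[0,30],[5,10],[15,20]]))  # 2
```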
Problem 3: CPU Intervals (Single-Threaded CPU)
Problem: Given n tasks where tasks[i] = [enqueueTime, processingTime], determine the order in which a single-threaded CPU will process them. The CPU picks the available task with the shortest processing time; ties are broken by original index.

Example: tasks = [[1,2],[2,4],[3,2],[4,1]] → Output: [0,2,3,1]

```python
import heapq

def getOrder(tasks: list[list[int]]) -> list[int]:
    """Determine CPU task execution order (shortest job first).

    Strategy:
    1. Sort tasks by enqueue time (keeping original indices)
    2. Simulate the CPU: at each step, push all available tasks
       into a min-heap keyed by (processing_time, index)
    3. Pop the shortest task, advance time by its duration
    4. If no task is available, jump to the next enqueue time

    Time: O(n log n)
    Space: O(n)

    ML Application: Scheduling inference requests on a single
    GPU - shortest-job-first minimizes average latency.
    """
    # Attach original indices and sort by enqueue time
    indexed_tasks = sorted(enumerate(tasks), key=lambda x: x[1][0])
    result = []
    heap = []  # (processing_time, original_index)
    time = 0
    i = 0
    n = len(tasks)
    while i < n or heap:
        # If no task is available, jump to the next enqueue time
        if not heap and i < n and indexed_tasks[i][1][0] > time:
            time = indexed_tasks[i][1][0]
        # Add all tasks that have arrived by the current time
        while i < n and indexed_tasks[i][1][0] <= time:
            orig_idx, (enqueue, process) = indexed_tasks[i]
            heapq.heappush(heap, (process, orig_idx))
            i += 1
        # Process the shortest available task
        process_time, orig_idx = heapq.heappop(heap)
        result.append(orig_idx)
        time += process_time
    return result

# Test
print(getOrder([[1,2],[2,4],[3,2],[4,1]]))  # [0, 2, 3, 1]
print(getOrder([[7,10],[7,12],[7,5],[7,4],[7,2]]))  # [4, 3, 2, 0, 1]
```
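The "jump to the next enqueue time" step is worth isolating: when the heap is empty, the simulation fast-forwards straight to the next arrival rather than ticking one unit at a time, which is what keeps the loop O(n log n). A toy sketch of just that pattern (the arrival data and the fixed 2-unit duration are illustrative):

```python
import heapq

arrivals = [(1, "a"), (10, "b")]  # (enqueue_time, task) - toy data
time, i, order, heap = 0, 0, [], []
while i < len(arrivals) or heap:
    if not heap and arrivals[i][0] > time:
        time = arrivals[i][0]  # fast-forward over the idle gap
    while i < len(arrivals) and arrivals[i][0] <= time:
        heapq.heappush(heap, arrivals[i])
        i += 1
    _, name = heapq.heappop(heap)
    order.append(name)
    time += 2  # pretend every task takes 2 units
print(order, time)  # ['a', 'b'] 12
```

Without the fast-forward, the loop would spin through times 3..9 doing nothing before "b" arrives.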
Problem 4: Job Scheduling with Deadlines
Problem: Given n jobs where each job has a deadline d[i] and profit p[i], and each job takes 1 unit of time, schedule jobs to maximize total profit. A job must be completed before its deadline.

Example: deadlines = [4,1,1,1], profits = [20,10,40,30] → Output: 60 (the jobs with profits 20 and 40)

```python
import heapq

def jobScheduling(deadlines: list[int], profits: list[int]) -> int:
    """Schedule unit-time jobs to maximize profit before deadlines.

    Strategy:
    1. Sort jobs by deadline
    2. Use a min-heap to track the profits of selected jobs
    3. For each job: if there is a free slot (heap size < deadline),
       take it; otherwise, replace the least profitable selected
       job if the current job is more profitable

    Time: O(n log n)
    Space: O(n)

    ML Application: Scheduling hyperparameter tuning experiments
    before a conference deadline, maximizing expected paper quality.
    """
    # Pair and sort by deadline
    jobs = sorted(zip(deadlines, profits), key=lambda x: x[0])
    min_heap = []  # min-heap of profits of selected jobs
    for deadline, profit in jobs:
        if len(min_heap) < deadline:
            # We have a free time slot
            heapq.heappush(min_heap, profit)
        elif min_heap and min_heap[0] < profit:
            # Replace the least profitable job
            heapq.heapreplace(min_heap, profit)
    return sum(min_heap)

# Test
print(jobScheduling([4,1,1,1], [20,10,40,30]))  # 60
print(jobScheduling([2,1,2,1,3], [100,19,27,25,15]))  # 142
```
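A common alternative greedy sorts by profit descending and places each job in the latest free slot before its deadline. It is O(n * max_deadline) instead of O(n log n), but it also recovers which slot each job occupies. A sketch under the assumption that deadlines are small integers (jobSchedulingByProfit is an illustrative name):

```python
def jobSchedulingByProfit(deadlines: list[int], profits: list[int]) -> int:
    # Take jobs in decreasing profit order; slot_free[t] marks whether the
    # unit-time slot ending at time t is still open.
    jobs = sorted(zip(profits, deadlines), reverse=True)
    slot_free = [True] * (max(deadlines) + 1)
    total = 0
    for profit, deadline in jobs:
        # Latest free slot at or before the deadline keeps earlier
        # slots open for tighter jobs.
        for t in range(deadline, 0, -1):
            if slot_free[t]:
                slot_free[t] = False
                total += profit
                break
    return total

print(jobSchedulingByProfit([4,1,1,1], [20,10,40,30]))  # 60
```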
Problem 5: Process Scheduling with Priorities
Problem: Simulate a priority-based process scheduler. Given processes with [arrival_time, burst_time, priority] (lower priority number = higher priority), determine the order of execution and the average waiting time using preemptive priority scheduling.

Example: processes = [[0,7,2],[2,4,1],[4,1,3],[5,4,2]]

```python
import heapq

def priorityScheduling(processes: list[list[int]]) -> dict:
    """Preemptive priority scheduling simulation.

    Strategy:
    1. Sort processes by arrival time
    2. At each event, push newly arrived processes into a min-heap
       keyed by (priority, arrival_time, remaining_burst, pid)
    3. Run the highest-priority (lowest number) process until the
       next arrival or its completion, whichever comes first
    4. If a higher-priority process has arrived, it wins the next pop
       (preemption)

    Time: O(n log n) - the simulation jumps from event to event
          (arrivals and completions) rather than ticking unit by unit
    Space: O(n)

    ML Application: GPU cluster schedulers use priority queues
    to preempt low-priority training jobs when urgent inference
    serving requests arrive.
    """
    n = len(processes)
    # Attach process IDs
    procs = [(arr, burst, pri, pid)
             for pid, (arr, burst, pri) in enumerate(processes)]
    procs.sort(key=lambda x: x[0])  # sort by arrival
    heap = []  # (priority, arrival, remaining_burst, pid)
    time = 0
    idx = 0
    execution_order = []
    completion = {}
    while idx < n or heap:
        # Add all processes that have arrived by the current time
        while idx < n and procs[idx][0] <= time:
            arr, burst, pri, pid = procs[idx]
            heapq.heappush(heap, (pri, arr, burst, pid))
            idx += 1
        if not heap:
            # Jump to the next arrival
            if idx < n:
                time = procs[idx][0]
                continue
            break
        pri, arr, remaining, pid = heapq.heappop(heap)
        # Run only until the next arrival - the only possible preemption point
        next_arrival = procs[idx][0] if idx < n else float('inf')
        run_time = min(remaining, next_arrival - time)
        if not execution_order or execution_order[-1] != pid:
            execution_order.append(pid)
        time += run_time
        remaining -= run_time
        if remaining > 0:
            heapq.heappush(heap, (pri, arr, remaining, pid))
        else:
            completion[pid] = time
    # Waiting time = completion - arrival - burst
    waiting_times = {}
    for pid, (arr, burst, pri) in enumerate(processes):
        waiting_times[pid] = completion.get(pid, 0) - arr - burst
    avg_waiting = sum(waiting_times.values()) / n if n > 0 else 0
    return {
        "execution_order": execution_order,
        "completion_times": completion,
        "waiting_times": waiting_times,
        "avg_waiting_time": round(avg_waiting, 2)
    }

# Test
result = priorityScheduling([[0,7,2],[2,4,1],[4,1,3],[5,4,2]])
print(f"Execution order: {result['execution_order']}")
print(f"Avg waiting time: {result['avg_waiting_time']}")
```
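One subtlety worth noting: the heap key (priority, arrival_time, remaining_burst, pid) gives first-come-first-served tie-breaking among equal priorities for free, via Python's lexicographic tuple comparison. A small demo with hypothetical heap entries:

```python
import heapq

# Three waiting processes as (priority, arrival, remaining_burst, pid):
# pids 0 and 3 share priority 2, so the earlier arrival (pid 0) runs first.
heap = [(2, 5, 4, 3), (2, 0, 5, 0), (3, 4, 1, 2)]
heapq.heapify(heap)
order = [heapq.heappop(heap)[3] for _ in range(3)]
print(order)  # [0, 3, 2]
```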
Scheduling Pattern Summary: Most scheduling problems follow a template: (1) sort events by time, (2) use a heap to track active or available resources, (3) greedily pick the best option at each step. The heap key varies: end time for room allocation, processing time for shortest-job-first, priority for preemptive scheduling. Always ask "What am I optimizing?" to determine the heap key.
Lilly Tech Systems