Scheduling Problems

Scheduling problems combine heaps with greedy strategies to allocate resources optimally. The heap tracks "what finishes earliest" or "what has the highest priority", enabling O(n log n) solutions where a brute-force search over orderings would be exponential. These patterns map directly to ML training-job scheduling, GPU allocation, and distributed task queues.

Problem 1: Task Scheduler

Problem: Given a list of CPU tasks represented by characters and a cooldown period n, find the minimum number of intervals needed to execute all tasks. The same task must have at least n intervals between executions.

Example: tasks = ["A","A","A","B","B","B"], n = 2 → Output: 8 (A B idle A B idle A B)
import heapq
from collections import Counter, deque

def leastInterval(tasks: list[str], n: int) -> int:
    """Find minimum intervals to complete all tasks with cooldown.

    Strategy:
    1. Count task frequencies, build max-heap
    2. At each time step, pop the most frequent task
    3. After executing, place it in a cooldown queue
    4. When cooldown expires, push it back to the heap

    Time: O(I log k), where I is the number of intervals in the
          answer (idle slots included) and k <= 26 distinct task
          types - effectively O(I)
    Space: O(1) - at most 26 distinct tasks in the heap and queue

    ML Application: GPU task scheduling - ensuring the same
    model doesn't run on the same GPU back-to-back to allow
    memory cleanup between runs.
    """
    freq = Counter(tasks)
    # Max-heap of remaining counts
    max_heap = [-count for count in freq.values()]
    heapq.heapify(max_heap)

    # Cooldown queue: (available_time, remaining_count)
    cooldown = deque()
    time = 0

    while max_heap or cooldown:
        time += 1

        if max_heap:
            remaining = 1 + heapq.heappop(max_heap)  # +1 because negated
            if remaining < 0:  # still has tasks left
                cooldown.append((time + n, remaining))
        # Check if any task's cooldown has expired
        if cooldown and cooldown[0][0] == time:
            _, count = cooldown.popleft()
            heapq.heappush(max_heap, count)

    return time

# Test
print(leastInterval(["A","A","A","B","B","B"], 2))  # 8
print(leastInterval(["A","A","A","B","B","B"], 0))  # 6
print(leastInterval(["A","A","A","A","A","A","B","C","D","E","F","G"], 2))  # 16
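The simulation above is general, but with at most 26 task types the answer also has a well-known closed form: lay out the most frequent task in frames of length n + 1 and fill the gaps. A minimal sketch (the helper name least_interval_formula is ours):

```python
from collections import Counter

def least_interval_formula(tasks: list[str], n: int) -> int:
    # Closed form: (f_max - 1) full frames of length (n + 1), plus one
    # final slot for every task tied at the maximum frequency. If there
    # are enough distinct tasks to fill all idle slots, the answer is
    # simply len(tasks).
    freq = Counter(tasks)
    f_max = max(freq.values())
    ties = sum(1 for count in freq.values() if count == f_max)
    return max(len(tasks), (f_max - 1) * (n + 1) + ties)
```

This matches the heap simulation on the tests above while running in O(T) time, though the simulation generalizes more easily (e.g., to per-task cooldowns).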

Problem 2: Meeting Rooms II

Problem: Given an array of meeting time intervals [[start, end], ...], find the minimum number of conference rooms required.

Example: intervals = [[0,30],[5,10],[15,20]] → Output: 2
import heapq

def minMeetingRooms(intervals: list[list[int]]) -> int:
    """Find minimum conference rooms needed for all meetings.

    Strategy:
    1. Sort meetings by start time
    2. Use a min-heap to track end times of active meetings
    3. For each meeting: if it starts after the earliest end,
       reuse that room (pop); otherwise, add a new room
    4. Heap size = number of rooms in use

    Time: O(n log n) for sorting and heap operations
    Space: O(n) for the heap

    ML Application: Determining minimum GPU count needed to
    run all scheduled training jobs without overlap.
    """
    if not intervals:
        return 0

    intervals.sort(key=lambda x: x[0])
    rooms = []  # min-heap of end times

    for start, end in intervals:
        # If earliest ending meeting finishes before this one starts
        if rooms and rooms[0] <= start:
            heapq.heappop(rooms)  # reuse this room

        heapq.heappush(rooms, end)

    return len(rooms)

# Test
print(minMeetingRooms([[0,30],[5,10],[15,20]]))  # 2
print(minMeetingRooms([[7,10],[2,4]]))  # 1
print(minMeetingRooms([[1,5],[2,6],[3,7],[4,8]]))  # 4
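A heap is one way to count overlapping meetings; a sweep over start and end events gives the same count without one. A sketch (the helper name min_meeting_rooms_sweep is ours):

```python
def min_meeting_rooms_sweep(intervals: list[list[int]]) -> int:
    # +1 room when a meeting starts, -1 when one ends. Sorting the
    # events puts an end before a start at the same timestamp
    # (-1 < +1), matching the heap version's "rooms[0] <= start".
    events = []
    for start, end in intervals:
        events.append((start, 1))
        events.append((end, -1))
    events.sort()
    rooms = best = 0
    for _, delta in events:
        rooms += delta
        best = max(best, rooms)
    return best
```

The maximum number of simultaneously active intervals is exactly the room count, so both approaches agree.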

Problem 3: CPU Intervals (Single-Threaded CPU)

Problem: Given n tasks where tasks[i] = [enqueueTime, processingTime], determine the order in which a single-threaded CPU will process them. The CPU picks the shortest task available; ties broken by index.

Example: tasks = [[1,2],[2,4],[3,2],[4,1]] → Output: [0,2,3,1]
import heapq

def getOrder(tasks: list[list[int]]) -> list[int]:
    """Determine CPU task execution order (shortest job first).

    Strategy:
    1. Sort tasks by enqueue time (keep original indices)
    2. Simulate CPU: at each step, push all available tasks
       into a min-heap keyed by (processing_time, index)
    3. Pop the shortest task, advance time by its duration
    4. If no tasks available, jump to next enqueue time

    Time: O(n log n)
    Space: O(n)

    ML Application: Scheduling inference requests on a single
    GPU - shortest-job-first minimizes average latency.
    """
    # Add original indices and sort by enqueue time
    indexed_tasks = sorted(enumerate(tasks), key=lambda x: x[1][0])

    result = []
    heap = []  # (processing_time, original_index)
    time = 0
    i = 0
    n = len(tasks)

    while i < n or heap:
        # If no tasks available, jump to next enqueue time
        if not heap and i < n and indexed_tasks[i][1][0] > time:
            time = indexed_tasks[i][1][0]

        # Add all tasks that have arrived by current time
        while i < n and indexed_tasks[i][1][0] <= time:
            orig_idx, (_, process) = indexed_tasks[i]  # arrival already checked
            heapq.heappush(heap, (process, orig_idx))
            i += 1

        # Process the shortest available task
        process_time, orig_idx = heapq.heappop(heap)
        result.append(orig_idx)
        time += process_time

    return result

# Test
print(getOrder([[1,2],[2,4],[3,2],[4,1]]))  # [0, 2, 3, 1]
print(getOrder([[7,10],[7,12],[7,5],[7,4],[7,2]]))  # [4, 3, 2, 0, 1]
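To make the shortest-job-first latency claim in the docstring concrete, a small variant of the same simulation (the helper get_order_with_finish_times is ours) can also report when each task finishes; average latency is then the mean of finish time minus enqueue time:

```python
import heapq

def get_order_with_finish_times(tasks: list[list[int]]):
    # Same SJF simulation as getOrder, but also records each task's
    # finish time so average latency can be compared across policies.
    indexed = sorted(enumerate(tasks), key=lambda x: x[1][0])
    heap, order, finish = [], [], {}
    time, i, n = 0, 0, len(tasks)
    while i < n or heap:
        if not heap and i < n and indexed[i][1][0] > time:
            time = indexed[i][1][0]  # CPU idle: jump to next arrival
        while i < n and indexed[i][1][0] <= time:
            idx, (_, proc) = indexed[i]
            heapq.heappush(heap, (proc, idx))
            i += 1
        proc, idx = heapq.heappop(heap)
        time += proc
        order.append(idx)
        finish[idx] = time
    return order, finish
```

For the first example this yields finish times {0: 3, 2: 5, 3: 6, 1: 10}; swapping the heap key to arrival order (FIFO) and recomputing the mean shows SJF's advantage.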

Problem 4: Job Scheduling with Deadlines

Problem: Given n jobs where each job has a deadline d[i] and profit p[i], each job takes 1 unit of time. Schedule jobs to maximize total profit. A job must be completed before its deadline.

Example: deadlines = [4,1,1,1], profits = [20,10,40,30] → Output: 60 (jobs with profit 20 and 40)
import heapq

def jobScheduling(deadlines: list[int], profits: list[int]) -> int:
    """Schedule jobs to maximize profit before deadlines.

    Strategy:
    1. Sort jobs by deadline
    2. Use a min-heap to track selected jobs' profits
    3. For each job: if we have a free slot (heap size < deadline),
       add it. Otherwise, replace the least profitable job if
       current job is more profitable.

    Time: O(n log n)
    Space: O(n)

    ML Application: Scheduling hyperparameter tuning experiments
    before a conference deadline, maximizing expected paper quality.
    """
    # Pair and sort by deadline
    jobs = sorted(zip(deadlines, profits), key=lambda x: x[0])
    min_heap = []  # min-heap of profits of selected jobs

    for deadline, profit in jobs:
        if len(min_heap) < deadline:
            # We have a free time slot
            heapq.heappush(min_heap, profit)
        elif min_heap and min_heap[0] < profit:
            # Replace least profitable job
            heapq.heapreplace(min_heap, profit)

    return sum(min_heap)

# Test
print(jobScheduling([4,1,1,1], [20,10,40,30]))  # 60
print(jobScheduling([2,1,2,1,3], [100,19,27,25,15]))  # 142
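If you also need to know which jobs were selected, not just the total profit, the same greedy can carry original indices through the heap. A sketch (the helper name job_scheduling_selected is ours):

```python
import heapq

def job_scheduling_selected(deadlines: list[int], profits: list[int]):
    # Same greedy as jobScheduling, but the heap stores (profit, index)
    # pairs so the chosen jobs can be reported alongside the total.
    jobs = sorted(zip(deadlines, profits, range(len(profits))))
    heap = []  # min-heap of (profit, original index) of selected jobs
    for deadline, profit, idx in jobs:
        if len(heap) < deadline:
            heapq.heappush(heap, (profit, idx))
        elif heap and heap[0][0] < profit:
            heapq.heapreplace(heap, (profit, idx))
    return sum(p for p, _ in heap), sorted(i for _, i in heap)
```

For the first example this returns (60, [0, 2]), i.e. the jobs with profits 20 and 40, matching the expected output above.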

Problem 5: Process Scheduling with Priorities

Problem: Simulate a priority-based process scheduler. Given processes with [arrival_time, burst_time, priority] (lower priority number = higher priority), determine the order of execution and average waiting time using preemptive priority scheduling.

Example: processes = [[0,7,2],[2,4,1],[4,1,3],[5,4,2]]
import heapq

def priorityScheduling(processes: list[list[int]]) -> dict:
    """Preemptive priority scheduling simulation.

    Strategy:
    1. Sort by arrival time
    2. At each time step, push arrived processes into a min-heap
       keyed by (priority, arrival_time, remaining_burst, pid)
    3. Execute the highest-priority (lowest number) process
    4. If a higher-priority process arrives, preempt

    Time: O(n log n) - each heap pop either completes a process or
          runs until the next arrival, so there are O(n) segments
    Space: O(n)

    ML Application: GPU cluster schedulers use priority queues
    to preempt low-priority training jobs when urgent inference
    serving requests arrive.
    """
    n = len(processes)
    # Add process IDs
    procs = [(arr, burst, pri, pid)
             for pid, (arr, burst, pri) in enumerate(processes)]
    procs.sort(key=lambda x: x[0])  # sort by arrival

    heap = []  # (priority, arrival, remaining_burst, pid)
    time = 0
    idx = 0
    execution_order = []
    completion = {}

    while idx < n or heap:
        # Add all arrived processes
        while idx < n and procs[idx][0] <= time:
            arr, burst, pri, pid = procs[idx]
            heapq.heappush(heap, (pri, arr, burst, pid))
            idx += 1

        if not heap:
            # Jump to next arrival
            if idx < n:
                time = procs[idx][0]
                continue
            break

        pri, arr, remaining, pid = heapq.heappop(heap)

        # Find when next process arrives (for preemption check)
        next_arrival = procs[idx][0] if idx < n else float('inf')
        run_time = min(remaining, next_arrival - time)

        if not execution_order or execution_order[-1] != pid:
            execution_order.append(pid)

        time += run_time
        remaining -= run_time

        if remaining > 0:
            heapq.heappush(heap, (pri, arr, remaining, pid))
        else:
            completion[pid] = time

    # Calculate waiting times
    waiting_times = {}
    for pid, (arr, burst, pri) in enumerate(processes):
        waiting_times[pid] = completion.get(pid, 0) - arr - burst

    avg_waiting = sum(waiting_times.values()) / n if n > 0 else 0

    return {
        "execution_order": execution_order,
        "completion_times": completion,
        "waiting_times": waiting_times,
        "avg_waiting_time": round(avg_waiting, 2)
    }

# Test
result = priorityScheduling([[0,7,2],[2,4,1],[4,1,3],[5,4,2]])
print(f"Execution order: {result['execution_order']}")  # [0, 1, 0, 3, 2]
print(f"Avg waiting time: {result['avg_waiting_time']}")  # 5.25

Scheduling Pattern Summary

Most scheduling problems follow a template: (1) sort events by time, (2) use a heap to track active or available resources, (3) greedily pick the best option at each step. The heap key varies: end time for room allocation, processing time for shortest-job-first, priority for preemptive scheduling. Always ask "what am I optimizing?" to determine the heap key.