Advanced

Topological Sort & DAGs

A topological sort of a Directed Acyclic Graph (DAG) produces a linear ordering where for every directed edge u → v, u appears before v. This is the algorithm that determines execution order in ML pipelines (Airflow, Kubeflow), resolves feature dependencies, and computes gradients during backpropagation. If you work with production ML, you use topological sort daily.

Problem 1: Course Schedule Order

🎯
Problem: Given numCourses and prerequisites [a, b] meaning "b before a", return a valid order to take all courses, or an empty array if impossible (cycle exists).
ML Context: This is exactly how Airflow determines task execution order. Given feature_engineering depends on data_ingestion and model_training depends on feature_engineering, topological sort gives: data_ingestion → feature_engineering → model_training.
from collections import defaultdict, deque

def find_order(num_courses: int, prerequisites: list) -> list:
    """Kahn's algorithm (BFS topological sort).
    Returns valid ordering or empty list if cycle exists.
    Time: O(V + E), Space: O(V + E).
    """
    graph = defaultdict(list)
    in_degree = [0] * num_courses

    for course, prereq in prerequisites:
        graph[prereq].append(course)
        in_degree[course] += 1

    # Start with courses that have no prerequisites
    queue = deque([i for i in range(num_courses) if in_degree[i] == 0])
    order = []

    while queue:
        node = queue.popleft()
        order.append(node)
        for neighbor in graph[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    return order if len(order) == num_courses else []

def find_order_dfs(num_courses: int, prerequisites: list) -> list:
    """DFS topological sort using reverse postorder.
    Time: O(V + E), Space: O(V + E).
    """
    graph = defaultdict(list)
    for course, prereq in prerequisites:
        graph[prereq].append(course)

    WHITE, GRAY, BLACK = 0, 1, 2
    color = [WHITE] * num_courses
    order = []
    has_cycle = False

    def dfs(node):
        nonlocal has_cycle
        color[node] = GRAY
        for neighbor in graph[node]:
            if color[neighbor] == GRAY:
                has_cycle = True
                return
            if color[neighbor] == WHITE:
                dfs(neighbor)
        color[node] = BLACK
        order.append(node)  # Add after all descendants processed

    for i in range(num_courses):
        if color[i] == WHITE:
            dfs(i)
            if has_cycle:
                return []

    return order[::-1]  # Reverse postorder = topological order

# Test
print(find_order(4, [[1,0],[2,0],[3,1],[3,2]]))
# [0, 1, 2, 3] or [0, 2, 1, 3] - both valid

Problem 2: Task Scheduling with Dependencies

🎯
Problem: Given n tasks with dependencies and each task has a duration, find the minimum time to complete all tasks (tasks without dependencies can run in parallel).
ML Context: This models parallel ML pipeline execution. Data preprocessing (2 min) and feature extraction (3 min) can run in parallel, but model training (10 min) must wait for both. The critical path determines the minimum total time.
def min_completion_time(n: int, dependencies: list, durations: list) -> int:
    """Find minimum time with parallel execution using topological sort.
    Critical path = longest path in DAG (each node weighted by duration).
    Time: O(V + E), Space: O(V + E).
    """
    graph = defaultdict(list)
    in_degree = [0] * n

    for task, dep in dependencies:
        graph[dep].append(task)
        in_degree[task] += 1

    # earliest_start[i] = earliest time task i can start
    earliest_start = [0] * n
    queue = deque([i for i in range(n) if in_degree[i] == 0])

    while queue:
        task = queue.popleft()
        for neighbor in graph[task]:
            # Neighbor can start after current task finishes
            earliest_start[neighbor] = max(
                earliest_start[neighbor],
                earliest_start[task] + durations[task]
            )
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # Total time = max(earliest_start[i] + duration[i]) for all tasks
    return max(earliest_start[i] + durations[i] for i in range(n))

# Test: ML Pipeline
# Task 0: Data ingestion (2 min)
# Task 1: Feature engineering (3 min), depends on 0
# Task 2: Data validation (1 min), depends on 0
# Task 3: Model training (10 min), depends on 1, 2
# Task 4: Evaluation (2 min), depends on 3
durations = [2, 3, 1, 10, 2]
deps = [(1,0), (2,0), (3,1), (3,2), (4,3)]
print(min_completion_time(5, deps, durations))  # 17
# Critical path: 0(2) -> 1(3) -> 3(10) -> 4(2) = 17 min

Problem 3: Build Order

🎯
Problem: Given a list of projects and a list of dependencies (project, dependency), find a build order that allows all projects to be built. Return an error if no valid order exists.
ML Context: Package dependency resolution (pip, conda) uses topological sort. When installing scikit-learn, pip must first install numpy, scipy, and joblib. Circular dependencies make installation impossible.
def build_order(projects: list, dependencies: list) -> list:
    """Find valid build order using topological sort.
    Time: O(P + D) where P = projects, D = dependencies.
    """
    graph = defaultdict(list)
    in_degree = defaultdict(int)

    # Initialize all projects
    for p in projects:
        in_degree[p] = in_degree.get(p, 0)

    for project, dependency in dependencies:
        graph[dependency].append(project)
        in_degree[project] += 1

    queue = deque([p for p in projects if in_degree[p] == 0])
    order = []

    while queue:
        project = queue.popleft()
        order.append(project)
        for dependent in graph[project]:
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                queue.append(dependent)

    if len(order) != len(projects):
        raise ValueError("Circular dependency detected - no valid build order")

    return order

# Test: ML library dependencies
projects = ["numpy", "scipy", "sklearn", "pandas", "matplotlib", "seaborn"]
deps = [
    ("scipy", "numpy"),
    ("sklearn", "numpy"),
    ("sklearn", "scipy"),
    ("pandas", "numpy"),
    ("matplotlib", "numpy"),
    ("seaborn", "matplotlib"),
    ("seaborn", "pandas")
]
print(build_order(projects, deps))
# ['numpy', 'scipy', 'pandas', 'matplotlib', 'sklearn', 'seaborn']

Problem 4: Alien Dictionary

🎯
Problem: Given a sorted list of words in an alien language, derive the character ordering. The words are sorted lexicographically according to the alien alphabet.
ML Context: This is constraint inference from ordered data — a pattern that appears in learning preferences from pairwise comparisons (learning to rank), where you must derive a total order from partial observations.
def alien_order(words: list) -> str:
    """Derive character ordering from sorted alien words.
    Build a graph of character precedence, then topological sort.
    Time: O(C) where C = total characters across all words.
    """
    # Build adjacency list and track all unique characters
    graph = defaultdict(set)
    in_degree = {c: 0 for word in words for c in word}

    # Compare adjacent words to find ordering constraints
    for i in range(len(words) - 1):
        w1, w2 = words[i], words[i + 1]
        min_len = min(len(w1), len(w2))

        # Edge case: "abc" before "ab" is invalid
        if len(w1) > len(w2) and w1[:min_len] == w2[:min_len]:
            return ""

        for j in range(min_len):
            if w1[j] != w2[j]:
                if w2[j] not in graph[w1[j]]:
                    graph[w1[j]].add(w2[j])
                    in_degree[w2[j]] += 1
                break  # Only first difference matters

    # Topological sort (BFS)
    queue = deque([c for c in in_degree if in_degree[c] == 0])
    result = []

    while queue:
        char = queue.popleft()
        result.append(char)
        for neighbor in graph[char]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    if len(result) != len(in_degree):
        return ""  # Cycle = invalid input

    return "".join(result)

# Test
words = ["wrt", "wrf", "er", "ett", "rftt"]
print(alien_order(words))  # "wertf"
# From comparisons: w->e (wrt vs er), r->t (wrf vs er... no, er vs ett), t->f (wrt vs wrf)

Problem 5: DAG Shortest Path

🎯
Problem: Given a weighted DAG with n nodes and a source node, find the shortest path from source to all other nodes.
ML Context: DAG shortest path runs in O(V + E) — faster than Dijkstra — because topological order guarantees we process each node only after all its predecessors. This is how computation graphs compute the forward pass: process operations in topological order, propagating intermediate results along edges.
def dag_shortest_path(n: int, edges: list, source: int) -> list:
    """Shortest path in a weighted DAG using topological sort.
    Faster than Dijkstra because DAG has no cycles.
    Time: O(V + E), Space: O(V + E).
    """
    graph = defaultdict(list)
    in_degree = [0] * n

    for u, v, weight in edges:
        graph[u].append((v, weight))
        in_degree[v] += 1

    # Step 1: Topological sort
    queue = deque([i for i in range(n) if in_degree[i] == 0])
    topo_order = []
    while queue:
        node = queue.popleft()
        topo_order.append(node)
        for neighbor, _ in graph[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # Step 2: Relax edges in topological order
    dist = [float('inf')] * n
    dist[source] = 0

    for node in topo_order:
        if dist[node] != float('inf'):
            for neighbor, weight in graph[node]:
                if dist[node] + weight < dist[neighbor]:
                    dist[neighbor] = dist[node] + weight

    return dist

# DAG Longest Path (negate weights, or just take max)
def dag_longest_path(n: int, edges: list, source: int) -> list:
    """Longest path in DAG = critical path.
    Time: O(V + E), Space: O(V + E).
    """
    graph = defaultdict(list)
    in_degree = [0] * n
    for u, v, weight in edges:
        graph[u].append((v, weight))
        in_degree[v] += 1

    queue = deque([i for i in range(n) if in_degree[i] == 0])
    topo_order = []
    while queue:
        node = queue.popleft()
        topo_order.append(node)
        for neighbor, _ in graph[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    dist = [float('-inf')] * n
    dist[source] = 0

    for node in topo_order:
        if dist[node] != float('-inf'):
            for neighbor, weight in graph[node]:
                if dist[node] + weight > dist[neighbor]:
                    dist[neighbor] = dist[node] + weight

    return dist

# Test
#   0 --2--> 1 --3--> 3
#   |        |
#   1        1
#   v        v
#   2 --4--> 3
edges = [(0,1,2), (0,2,1), (1,3,3), (2,3,4)]
print(dag_shortest_path(4, edges, 0))  # [0, 2, 1, 5]
# Shortest to node 3: 0->1->3 (2+3=5) vs 0->2->3 (1+4=5), both = 5
print(dag_longest_path(4, edges, 0))   # [0, 2, 1, 5]
💡
Key insight: Any time you see "dependencies" or "ordering constraints" in a problem, think topological sort. The two implementations — Kahn's algorithm (BFS with in-degree tracking) and DFS with reverse postorder — are equivalent. Kahn's is easier to detect cycles (not all nodes processed); DFS is often more natural for recursive problems.