Advanced
Topological Sort & DAGs
A topological sort of a Directed Acyclic Graph (DAG) produces a linear ordering where for every directed edge u → v, u appears before v. This is the algorithm that determines execution order in ML pipelines (Airflow, Kubeflow), resolves feature dependencies, and computes gradients during backpropagation. If you work with production ML, you use topological sort daily.
Problem 1: Course Schedule Order
Problem: Given numCourses and prerequisites [a, b] meaning "b before a", return a valid order to take all courses, or an empty array if impossible (cycle exists).
ML Context: This is exactly how Airflow determines task execution order. Given feature_engineering depends on data_ingestion and model_training depends on feature_engineering, topological sort gives: data_ingestion → feature_engineering → model_training.
ML Context: This is exactly how Airflow determines task execution order. Given feature_engineering depends on data_ingestion and model_training depends on feature_engineering, topological sort gives: data_ingestion → feature_engineering → model_training.
from collections import defaultdict, deque
def find_order(num_courses: int, prerequisites: list) -> list:
    """Return a valid course order via Kahn's algorithm (BFS topological sort).

    Args:
        num_courses: Number of courses, labelled 0 .. num_courses - 1.
        prerequisites: Pairs ``[course, prereq]`` meaning ``prereq`` must be
            taken before ``course``.

    Returns:
        A list containing every course in an order that respects all
        prerequisites, or an empty list if the prerequisites contain a cycle.

    Time: O(V + E), Space: O(V + E).
    """
    unlocks = defaultdict(list)      # prereq -> courses that depend on it
    remaining = [0] * num_courses    # unmet prerequisite count per course
    for course, prereq in prerequisites:
        unlocks[prereq].append(course)
        remaining[course] += 1

    # Courses with nothing left to wait for can be taken immediately.
    ready = deque(c for c in range(num_courses) if remaining[c] == 0)
    order = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for dependent in unlocks[current]:
            remaining[dependent] -= 1
            if remaining[dependent] == 0:
                ready.append(dependent)

    # Any course never emitted is stuck behind a cycle.
    if len(order) != num_courses:
        return []
    return order
def find_order_dfs(num_courses: int, prerequisites: list) -> list:
    """Return a valid course order via DFS (reverse postorder).

    The traversal is iterative (explicit stack) so that long dependency
    chains do not exceed Python's recursion limit, and it stops as soon as
    a cycle is found.

    Args:
        num_courses: Number of courses, labelled 0 .. num_courses - 1.
        prerequisites: Pairs ``[course, prereq]`` meaning ``prereq`` must be
            taken before ``course``.

    Returns:
        A list containing every course in an order that respects all
        prerequisites, or an empty list if the prerequisites contain a cycle.

    Time: O(V + E), Space: O(V + E).
    """
    graph = defaultdict(list)
    for course, prereq in prerequisites:
        graph[prereq].append(course)

    # WHITE = unvisited, GRAY = on the current DFS path, BLACK = finished.
    WHITE, GRAY, BLACK = 0, 1, 2
    color = [WHITE] * num_courses
    postorder = []

    for root in range(num_courses):
        if color[root] != WHITE:
            continue
        color[root] = GRAY
        # Each frame keeps a live iterator so a node resumes exactly where
        # it left off after one of its children finishes.
        stack = [(root, iter(graph.get(root, ())))]
        while stack:
            node, children = stack[-1]
            for child in children:
                if color[child] == GRAY:
                    # Back edge to a node on the current path: cycle.
                    return []
                if color[child] == WHITE:
                    color[child] = GRAY
                    stack.append((child, iter(graph.get(child, ()))))
                    break
            else:
                # All descendants processed: record the node in postorder.
                color[node] = BLACK
                postorder.append(node)
                stack.pop()

    return postorder[::-1]  # Reverse postorder = topological order
# Test: course 0 unlocks courses 1 and 2, which together unlock course 3.
print(find_order(4, [[1,0],[2,0],[3,1],[3,2]]))
# Prints [0, 1, 2, 3]; [0, 2, 1, 3] would be an equally valid order.
Problem 2: Task Scheduling with Dependencies
Problem: Given n tasks with dependencies, where each task has a duration, find the minimum time needed to complete all tasks (tasks that do not depend on one another can run in parallel).
ML Context: This models parallel ML pipeline execution. Data preprocessing (2 min) and feature extraction (3 min) can run in parallel, but model training (10 min) must wait for both. The critical path determines the minimum total time.
ML Context: This models parallel ML pipeline execution. Data preprocessing (2 min) and feature extraction (3 min) can run in parallel, but model training (10 min) must wait for both. The critical path determines the minimum total time.
def min_completion_time(n: int, dependencies: list, durations: list) -> int:
    """Find the minimum total time when independent tasks run in parallel.

    The answer is the length of the critical path: the longest path in the
    dependency DAG, with each node weighted by its duration.

    Args:
        n: Number of tasks, labelled 0 .. n - 1.
        dependencies: Pairs ``(task, dep)`` meaning ``dep`` must finish
            before ``task`` can start.
        durations: ``durations[i]`` is the running time of task ``i``.

    Returns:
        The earliest time at which every task has finished; 0 when there
        are no tasks.

    Raises:
        ValueError: If the dependencies contain a cycle, in which case no
            schedule exists.

    Time: O(V + E), Space: O(V + E).
    """
    graph = defaultdict(list)
    in_degree = [0] * n
    for task, dep in dependencies:
        graph[dep].append(task)
        in_degree[task] += 1

    # earliest_start[i] = earliest time task i can start
    earliest_start = [0] * n
    queue = deque(i for i in range(n) if in_degree[i] == 0)
    processed = 0

    while queue:
        task = queue.popleft()
        processed += 1
        finish = earliest_start[task] + durations[task]
        for neighbor in graph[task]:
            # Neighbor can start only after its slowest dependency finishes.
            if finish > earliest_start[neighbor]:
                earliest_start[neighbor] = finish
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # Tasks stuck behind a cycle are never dequeued; their start times would
    # be meaningless, so report the problem instead of a wrong number.
    if processed != n:
        raise ValueError("Circular dependency detected - no valid schedule")

    # Total time = latest finish over all tasks (0 when there are none).
    return max(
        (earliest_start[i] + durations[i] for i in range(n)),
        default=0,
    )
# Test: ML Pipeline
# Task 0: Data ingestion (2 min)
# Task 1: Feature engineering (3 min), depends on 0
# Task 2: Data validation (1 min), depends on 0
# Task 3: Model training (10 min), depends on 1, 2
# Task 4: Evaluation (2 min), depends on 3
# durations[i] is the running time of task i, in minutes.
durations = [2, 3, 1, 10, 2]
# Each pair is (task, dependency): the dependency must finish first.
deps = [(1,0), (2,0), (3,1), (3,2), (4,3)]
print(min_completion_time(5, deps, durations)) # 17
# Critical path: 0(2) -> 1(3) -> 3(10) -> 4(2) = 17 min
Problem 3: Build Order
Problem: Given a list of projects and a list of dependencies (project, dependency), find a build order that allows all projects to be built. Return an error if no valid order exists.
ML Context: Package dependency resolution (pip, conda) uses topological sort. When installing scikit-learn, pip must first install numpy, scipy, and joblib. Circular dependencies make installation impossible.
ML Context: Package dependency resolution (pip, conda) uses topological sort. When installing scikit-learn, pip must first install numpy, scipy, and joblib. Circular dependencies make installation impossible.
def build_order(projects: list, dependencies: list) -> list:
    """Compute a build order in which every dependency precedes its dependents.

    Args:
        projects: All project names to build.
        dependencies: Pairs ``(project, dependency)`` meaning ``dependency``
            must be built before ``project``.

    Returns:
        The projects in a valid build order.

    Raises:
        ValueError: If the number of projects that could be ordered differs
            from ``len(projects)`` (e.g. because of a circular dependency).

    Time: O(P + D) where P = projects, D = dependencies.
    """
    dependents = defaultdict(list)   # dependency -> projects waiting on it
    pending = defaultdict(int)       # project -> number of unbuilt dependencies

    # Register every project so that dependency-free ones are counted too.
    for name in projects:
        pending.setdefault(name, 0)

    for project, dependency in dependencies:
        dependents[dependency].append(project)
        pending[project] += 1

    ready = deque(name for name in projects if pending[name] == 0)
    built = []
    while ready:
        current = ready.popleft()
        built.append(current)
        for waiting in dependents[current]:
            pending[waiting] -= 1
            if pending[waiting] == 0:
                ready.append(waiting)

    if len(built) == len(projects):
        return built
    raise ValueError("Circular dependency detected - no valid build order")
# Test: ML library dependencies
projects = ["numpy", "scipy", "sklearn", "pandas", "matplotlib", "seaborn"]
# Each pair is (project, dependency): the dependency must be built first.
deps = [
("scipy", "numpy"),
("sklearn", "numpy"),
("sklearn", "scipy"),
("pandas", "numpy"),
("matplotlib", "numpy"),
("seaborn", "matplotlib"),
("seaborn", "pandas")
]
print(build_order(projects, deps))
# ['numpy', 'scipy', 'pandas', 'matplotlib', 'sklearn', 'seaborn']
Problem 4: Alien Dictionary
Problem: Given a sorted list of words in an alien language, derive the character ordering. The words are sorted lexicographically according to the alien alphabet.
ML Context: This is constraint inference from ordered data — a pattern that appears in learning preferences from pairwise comparisons (learning to rank), where you must derive a total order from partial observations.
ML Context: This is constraint inference from ordered data — a pattern that appears in learning preferences from pairwise comparisons (learning to rank), where you must derive a total order from partial observations.
def alien_order(words: list) -> str:
    """Derive a character ordering from words sorted in an alien alphabet.

    Builds a graph of character precedence from adjacent word pairs, then
    topologically sorts it. Successors are kept in insertion order, so the
    same input always produces the same output string.

    Args:
        words: Words sorted lexicographically according to the alien alphabet.

    Returns:
        A string of all distinct characters in an order consistent with the
        input, or ``""`` if the input is invalid (a longer word precedes its
        own prefix, or the constraints are cyclic).

    Time: O(C) where C = total characters across all words.
    """
    # successors[a] is an insertion-ordered "set" (dict keys) of characters
    # known to come after a. A plain set would make the output depend on
    # string hash randomisation.
    successors = defaultdict(dict)
    in_degree = {c: 0 for word in words for c in word}

    # Compare adjacent words to find ordering constraints.
    for w1, w2 in zip(words, words[1:]):
        for c1, c2 in zip(w1, w2):
            if c1 != c2:
                if c2 not in successors[c1]:
                    successors[c1][c2] = None
                    in_degree[c2] += 1
                break  # Only the first difference carries information
        else:
            # No differing position: one word is a prefix of the other.
            # "abc" before "ab" is invalid.
            if len(w1) > len(w2):
                return ""

    # Topological sort (BFS)
    queue = deque(c for c in in_degree if in_degree[c] == 0)
    result = []
    while queue:
        char = queue.popleft()
        result.append(char)
        for neighbor in successors.get(char, ()):
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    if len(result) != len(in_degree):
        return ""  # Cycle = invalid input
    return "".join(result)
# Test
words = ["wrt", "wrf", "er", "ett", "rftt"]
print(alien_order(words)) # "wertf"
# From comparisons of adjacent words: t->f (wrt vs wrf), w->e (wrf vs er),
# r->t (er vs ett), e->r (ett vs rftt)
Problem 5: DAG Shortest Path
Problem: Given a weighted DAG with n nodes and a source node, find the shortest path from source to all other nodes.
ML Context: DAG shortest path runs in O(V + E) — faster than Dijkstra — because topological order guarantees we process each node only after all its predecessors. This is how computation graphs compute the forward pass: process operations in topological order, propagating intermediate results along edges.
ML Context: DAG shortest path runs in O(V + E) — faster than Dijkstra — because topological order guarantees we process each node only after all its predecessors. This is how computation graphs compute the forward pass: process operations in topological order, propagating intermediate results along edges.
def dag_shortest_path(n: int, edges: list, source: int) -> list:
    """Compute single-source shortest distances in a weighted DAG.

    Nodes are first arranged in topological order (Kahn's algorithm); edges
    are then relaxed once each in that order.

    Args:
        n: Number of nodes, labelled 0 .. n - 1.
        edges: Triples ``(u, v, weight)`` for a directed edge u -> v.
        source: Node from which distances are measured.

    Returns:
        A list where entry ``i`` is the shortest distance from ``source`` to
        node ``i``, or ``float('inf')`` if ``i`` is unreachable.

    Time: O(V + E), Space: O(V + E).
    """
    adjacency = defaultdict(list)
    remaining = [0] * n
    for u, v, weight in edges:
        adjacency[u].append((v, weight))
        remaining[v] += 1

    # Step 1: topological order via repeated removal of in-degree-0 nodes.
    ready = deque(v for v in range(n) if remaining[v] == 0)
    topo_order = []
    while ready:
        current = ready.popleft()
        topo_order.append(current)
        for successor, _ in adjacency[current]:
            remaining[successor] -= 1
            if remaining[successor] == 0:
                ready.append(successor)

    # Step 2: relax outgoing edges of each reachable node in that order.
    unreachable = float('inf')
    dist = [unreachable] * n
    dist[source] = 0
    for u in topo_order:
        if dist[u] == unreachable:
            continue
        for v, weight in adjacency[u]:
            dist[v] = min(dist[v], dist[u] + weight)
    return dist
# DAG Longest Path (negate weights, or just take max)
def dag_longest_path(n: int, edges: list, source: int) -> list:
    """Compute single-source longest distances in a weighted DAG (critical path).

    Args:
        n: Number of nodes, labelled 0 .. n - 1.
        edges: Triples ``(u, v, weight)`` for a directed edge u -> v.
        source: Node from which distances are measured.

    Returns:
        A list where entry ``i`` is the longest distance from ``source`` to
        node ``i``, or ``float('-inf')`` if ``i`` is unreachable.

    Time: O(V + E), Space: O(V + E).
    """
    outgoing = defaultdict(list)
    indegree = [0] * n
    for tail, head, weight in edges:
        outgoing[tail].append((head, weight))
        indegree[head] += 1

    # Kahn's algorithm: emit nodes once all of their predecessors are emitted.
    frontier = deque(v for v in range(n) if indegree[v] == 0)
    topo_order = []
    while frontier:
        tail = frontier.popleft()
        topo_order.append(tail)
        for head, _ in outgoing[tail]:
            indegree[head] -= 1
            if indegree[head] == 0:
                frontier.append(head)

    # Same relaxation as shortest path, but keeping the larger candidate.
    unreachable = float('-inf')
    dist = [unreachable] * n
    dist[source] = 0
    for tail in topo_order:
        if dist[tail] == unreachable:
            continue
        for head, weight in outgoing[tail]:
            dist[head] = max(dist[head], dist[tail] + weight)
    return dist
# Test
# Edges are (u, v, weight):
#
#   0 --2--> 1
#   |        |
#   1        3
#   v        v
#   2 --4--> 3
edges = [(0,1,2), (0,2,1), (1,3,3), (2,3,4)]
print(dag_shortest_path(4, edges, 0)) # [0, 2, 1, 5]
# Shortest to node 3: 0->1->3 (2+3=5) vs 0->2->3 (1+4=5), both = 5
print(dag_longest_path(4, edges, 0)) # [0, 2, 1, 5]
Key insight: Any time you see "dependencies" or "ordering constraints" in a problem, think topological sort. The two implementations — Kahn's algorithm (BFS with in-degree tracking) and DFS with reverse postorder — both produce a valid topological order, though not necessarily the same one. Kahn's algorithm makes cycle detection easier (a cycle exists exactly when not all nodes get processed); DFS is often more natural for recursive problems.
Lilly Tech Systems