Tensor Operations
Six challenges covering the tensor manipulation skills every DL engineer must have. These operations are the building blocks for implementing layers, losses, and training infrastructure.
Challenge 1: Reshaping & Views
Given a batch of images stored as a flat tensor, reshape them for processing by a convolutional network, then flatten them back for a linear layer.
import torch
def reshape_for_conv_and_linear(flat_images, batch_size, channels, height, width):
    """
    Challenge: Given flat_images of shape (batch_size, channels * height * width),
    1. Reshape to (batch_size, channels, height, width) for conv layers
    2. Reshape back to (batch_size, channels * height * width) for linear layers
    3. Split into patches of size (patch_h, patch_w) = (height//2, width//2)
    Return: (conv_input, linear_input, patches)
    """
    # Hint: view/reshape for steps 1-2; unfold + permute for step 3.
    # Expected patches shape: (batch_size, 4, channels, height//2, width//2).
    # YOUR SOLUTION HERE
    pass
# ---- SOLUTION ----
def reshape_for_conv_and_linear(flat_images, batch_size, channels, height, width):
    """Reshape a flat image batch for conv/linear layers and extract patches.

    Args:
        flat_images: Tensor of shape (batch_size, channels * height * width).
        batch_size, channels, height, width: Target image dimensions.

    Returns:
        conv_input: (batch_size, channels, height, width) for conv layers.
        linear_input: (batch_size, channels * height * width) for linear layers.
        patches: (batch_size, 4, channels, height // 2, width // 2) — the four
            non-overlapping quadrant patches per image, in row-major order.
    """
    # reshape() behaves like view() on contiguous tensors but also handles
    # non-contiguous input, where view() would raise a RuntimeError.
    conv_input = flat_images.reshape(batch_size, channels, height, width)

    # Flatten back: (B, C, H, W) -> (B, C*H*W).
    linear_input = conv_input.reshape(batch_size, -1)

    # Non-overlapping patches: unfold along H, then along W.
    # After both unfolds: (B, C, H//ph, W//pw, ph, pw).
    patch_h, patch_w = height // 2, width // 2
    patches = conv_input.unfold(2, patch_h, patch_h).unfold(3, patch_w, patch_w)

    # Rearrange to (B, num_patches, C, ph, pw); contiguous() is required
    # before flattening the two patch-grid dims into one.
    B, C, nph, npw, ph, pw = patches.shape
    patches = patches.permute(0, 2, 3, 1, 4, 5).contiguous()
    patches = patches.reshape(B, nph * npw, C, ph, pw)
    return conv_input, linear_input, patches
# Smoke test: batch of 4 RGB 8x8 images stored flat.
flat = torch.randn(4, 3 * 8 * 8)  # batch=4, 3 channels, 8x8
conv_in, lin_in, patches = reshape_for_conv_and_linear(flat, 4, 3, 8, 8)
print(f"Conv input: {conv_in.shape}")  # (4, 3, 8, 8)
print(f"Linear input: {lin_in.shape}")  # (4, 192)
print(f"Patches: {patches.shape}")  # (4, 4, 3, 4, 4)
Challenge 2: Broadcasting
Implement batch-wise operations that require broadcasting — the kind of operations you write daily in DL code.
import torch
def broadcasting_challenges(features, class_centers, per_sample_weights):
    """
    Challenge: Given:
    - features: (batch_size, feature_dim)
    - class_centers: (num_classes, feature_dim)
    - per_sample_weights: (batch_size,)
    Compute:
    1. Distance from each sample to each class center (pairwise L2 distances)
    2. Weighted features: multiply each sample's features by its weight
    3. Feature normalization: subtract per-feature mean, divide by per-feature std
    Return: (distances, weighted_features, normalized_features)
    """
    # Hint: insert singleton dims with unsqueeze() so shapes broadcast:
    # (B, 1, D) against (1, K, D) for step 1; (B, 1) against (B, D) for step 2.
    # YOUR SOLUTION HERE
    pass
# ---- SOLUTION ----
def broadcasting_challenges(features, class_centers, per_sample_weights):
    """Broadcast-based batch ops: pairwise distances, weighting, normalization.

    Args:
        features: (batch_size, feature_dim).
        class_centers: (num_classes, feature_dim).
        per_sample_weights: (batch_size,).

    Returns:
        distances: (batch_size, num_classes) pairwise L2 distances.
        weighted_features: (batch_size, feature_dim) per-sample scaled features.
        normalized_features: (batch_size, feature_dim), standardized per
            feature. NOTE: std() is the unbiased estimate, so this needs
            batch_size >= 2 to avoid NaN.
    """
    # 1. (B, 1, D) - (1, K, D) broadcasts to (B, K, D) difference vectors.
    diff = features.unsqueeze(1) - class_centers.unsqueeze(0)
    # torch.linalg.norm is the maintained API; torch.norm is deprecated.
    distances = torch.linalg.norm(diff, dim=2)  # (B, K)
    # 2. (B,) -> (B, 1) so each weight broadcasts across the feature dim.
    weighted_features = features * per_sample_weights.unsqueeze(1)
    # 3. Per-feature standardization; eps guards zero-variance features.
    mean = features.mean(dim=0, keepdim=True)  # (1, D)
    std = features.std(dim=0, keepdim=True)    # (1, D)
    normalized_features = (features - mean) / (std + 1e-8)
    return distances, weighted_features, normalized_features
# Smoke test: 32 samples, 64-dim features, 10 class centers.
B, D, K = 32, 64, 10
features = torch.randn(B, D)
centers = torch.randn(K, D)
weights = torch.rand(B)
dist, wf, nf = broadcasting_challenges(features, centers, weights)
print(f"Distances: {dist.shape}")  # (32, 10)
print(f"Weighted: {wf.shape}")  # (32, 64)
print(f"Normalized: {nf.shape}")  # (32, 64)
Challenge 3: Advanced Indexing
Use gather, scatter, and fancy indexing to implement operations common in classification, embedding, and attention.
import torch
def indexing_challenges(logits, labels, embeddings, indices):
    """
    Challenge: Given:
    - logits: (batch_size, num_classes) - raw model outputs
    - labels: (batch_size,) - ground truth class indices
    - embeddings: (vocab_size, embed_dim) - embedding table
    - indices: (batch_size, seq_len) - token indices
    Compute:
    1. Extract the logit for the correct class for each sample
    2. Create a one-hot encoding of labels
    3. Look up embeddings for each token index
    Return: (correct_logits, one_hot, looked_up_embeddings)
    """
    # Hint: fancy indexing or gather() for step 1; scatter_() for step 2;
    # integer-tensor indexing on the first dim for step 3.
    # YOUR SOLUTION HERE
    pass
# ---- SOLUTION ----
def indexing_challenges(logits, labels, embeddings, indices):
    """Gather/scatter/fancy-indexing patterns for classification & embeddings.

    Args:
        logits: (batch_size, num_classes) raw model outputs.
        labels: (batch_size,) ground-truth class indices.
        embeddings: (vocab_size, embed_dim) embedding table.
        indices: (batch_size, seq_len) token indices.

    Returns:
        correct_logits: (batch_size,) logit of the true class per sample.
        one_hot: (batch_size, num_classes), same dtype/device as logits.
        looked_up: (batch_size, seq_len, embed_dim) embedded tokens.
    """
    B, C = logits.shape
    # 1. Row-wise pick via fancy indexing. gather() is the equivalent:
    #    logits.gather(1, labels.unsqueeze(1)).squeeze(1)
    correct_logits = logits[torch.arange(B, device=logits.device), labels]
    # 2. One-hot via in-place scatter_. Matching logits' dtype (not a
    #    hardcoded float32) keeps downstream arithmetic promotion-free.
    one_hot = torch.zeros(B, C, dtype=logits.dtype, device=logits.device)
    one_hot.scatter_(1, labels.unsqueeze(1), 1.0)
    # Alternative: F.one_hot(labels, num_classes=C).to(logits.dtype)
    # 3. Integer-tensor indexing on dim 0 appends the index shape:
    #    (B, S) -> (B, S, E).
    looked_up = embeddings[indices]
    return correct_logits, one_hot, looked_up
# Smoke test: 8 samples, 10 classes, vocab of 1000, 64-dim embeddings, seq 16.
B, C, V, E, S = 8, 10, 1000, 64, 16
logits = torch.randn(B, C)
labels = torch.randint(0, C, (B,))
embeddings = torch.randn(V, E)
indices = torch.randint(0, V, (B, S))
cl, oh, le = indexing_challenges(logits, labels, embeddings, indices)
print(f"Correct logits: {cl.shape}")  # (8,)
print(f"One-hot: {oh.shape}")  # (8, 10)
print(f"Embeddings: {le.shape}")  # (8, 16, 64)
Challenge 4: Einsum
Use torch.einsum to express complex tensor contractions concisely — the way senior DL engineers write attention, bilinear layers, and batched operations.
import torch
def einsum_challenges(Q, K, V, bilinear_weight):
    """
    Challenge: Given:
    - Q: (batch, heads, seq_len, head_dim) - queries
    - K: (batch, heads, seq_len, head_dim) - keys
    - V: (batch, heads, seq_len, head_dim) - values
    - bilinear_weight: (head_dim, head_dim, out_dim)
    Compute using einsum:
    1. Attention scores: Q @ K^T scaled (batch, heads, seq_len, seq_len)
    2. Attention output: softmax(scores) @ V (batch, heads, seq_len, head_dim)
    3. Bilinear form: sum over d1,d2 of Q[...,d1] * W[d1,d2,o] * K[...,d2]
    Return: (attn_scores, attn_output, bilinear_output)
    """
    # Hint: every distinct tensor dimension needs its own einsum subscript.
    # YOUR SOLUTION HERE
    pass
# ---- SOLUTION ----
def einsum_challenges(Q, K, V, bilinear_weight):
    """Scaled dot-product attention pieces and a bilinear form, via einsum.

    Args:
        Q, K, V: (batch, heads, seq_len, head_dim) tensors.
        bilinear_weight: (head_dim, head_dim, out_dim) tensor W.

    Returns:
        attn_scores: (batch, heads, seq_len, seq_len) = Q @ K^T / sqrt(d).
        attn_output: (batch, heads, seq_len, head_dim) = softmax(scores) @ V.
        bilinear_output: (batch, heads, seq_len, out_dim) where
            out[..., o] = sum_{i,j} Q[..., i] * W[i, j, o] * K[..., j].
    """
    head_dim = Q.shape[-1]
    # 1. Contract the shared head_dim of Q and K, scale by 1/sqrt(d_k).
    attn_scores = torch.einsum('bhid,bhjd->bhij', Q, K) / (head_dim ** 0.5)
    # 2. Weight V rows by the softmaxed scores.
    attn_weights = torch.softmax(attn_scores, dim=-1)  # (B, H, S, S)
    attn_output = torch.einsum('bhij,bhjd->bhid', attn_weights, V)  # (B, H, S, D)
    # 3. Bilinear form. BUG FIX: the two contracted weight dims must use
    #    subscripts ('i', 'j') distinct from the batch subscript 'b' — the
    #    previous 'bhsa,abo,bhsb' reused 'b' for both batch and head_dim,
    #    which raises a size-mismatch error whenever head_dim != batch.
    bilinear_output = torch.einsum('bhsi,ijo,bhsj->bhso', Q, bilinear_weight, K)
    return attn_scores, attn_output, bilinear_output
# Smoke test: 2 batches, 4 heads, seq 8, head_dim 16, bilinear out_dim 32.
B, H, S, D, O = 2, 4, 8, 16, 32
Q = torch.randn(B, H, S, D)
K = torch.randn(B, H, S, D)
V = torch.randn(B, H, S, D)
W = torch.randn(D, D, O)
scores, output, bilinear = einsum_challenges(Q, K, V, W)
print(f"Attn scores: {scores.shape}")  # (2, 4, 8, 8)
print(f"Attn output: {output.shape}")  # (2, 4, 8, 16)
print(f"Bilinear: {bilinear.shape}")  # (2, 4, 8, 32)
Challenge 5: Gradient Computation
Manually compute and verify gradients — essential for implementing custom autograd functions and debugging training issues.
import torch
def gradient_challenges():
    """
    Challenge:
    1. Compute gradient of softmax cross-entropy loss w.r.t. logits
    2. Implement a custom autograd function for a clamped ReLU
    3. Verify your gradient with torch.autograd.gradcheck
    Return: (loss_grad, custom_fn_output, gradcheck_passed)
    """
    # Hint: d(CE)/d(logits) = softmax(logits) - one_hot(labels), divided by
    # the batch size under mean reduction. Use float64 inputs for gradcheck.
    # YOUR SOLUTION HERE
    pass
# ---- SOLUTION ----
import torch
import torch.nn.functional as F
# 1. Gradient of softmax cross-entropy
logits = torch.randn(4, 10, requires_grad=True)
labels = torch.randint(0, 10, (4,))
loss = F.cross_entropy(logits, labels)
loss.backward()
print(f"Loss grad shape: {logits.grad.shape}")  # (4, 10)
# The gradient of cross-entropy w.r.t. logits is: softmax(logits) - one_hot(labels)
# (scaled by 1/batch_size because cross_entropy defaults to mean reduction).
with torch.no_grad():
    expected_grad = F.softmax(logits, dim=1)
    # Subtract 1 at each sample's true-class position (the one-hot term).
    expected_grad[torch.arange(4), labels] -= 1.0
    expected_grad /= 4  # mean reduction
print(f"Grad matches: {torch.allclose(logits.grad, expected_grad, atol=1e-6)}")
# 2. Custom autograd function: clamp the activation into [0, max_val]
class ClampedReLU(torch.autograd.Function):
    """Autograd function computing clamp(x, 0, max_val) (a ReLU6-style op)."""

    @staticmethod
    def forward(ctx, input, max_val):
        # Stash the raw input: the backward mask depends on it, not on the
        # clamped output.
        ctx.save_for_backward(input)
        ctx.max_val = max_val
        return input.clamp(min=0, max=max_val)

    @staticmethod
    def backward(ctx, grad_output):
        (x,) = ctx.saved_tensors
        # The clamp is the identity strictly inside (0, max_val); gradient
        # is blocked wherever either bound is active.
        pass_through = (x > 0) & (x < ctx.max_val)
        grad_input = grad_output * pass_through.to(grad_output.dtype)
        # max_val is a plain Python number, so its "gradient" slot is None.
        return grad_input, None
# Exercise the custom op on a small input.
clamped_relu = ClampedReLU.apply
x = torch.randn(5, requires_grad=True, dtype=torch.float64)
output = clamped_relu(x, 6.0)
print(f"Clamped ReLU output: {output}")
# 3. Verify the analytic backward() against finite differences.
# float64 inputs are used so gradcheck's numerical tolerances hold.
gradcheck_passed = torch.autograd.gradcheck(
    lambda inp: ClampedReLU.apply(inp, 6.0),
    (torch.randn(5, requires_grad=True, dtype=torch.float64),),
    eps=1e-6
)
print(f"Gradcheck passed: {gradcheck_passed}")
Challenge 6: Device Management
Write device-agnostic code that works on CPU, single GPU, and multi-GPU setups — a must for production DL code.
import torch
import torch.nn as nn
def device_management_challenges():
    """
    Challenge: Write device-agnostic utilities that work on CPU and GPU.
    1. Create a device-agnostic model wrapper
    2. Implement safe tensor transfer between devices
    3. Handle mixed-device errors gracefully
    """
    # Hint: infer the model's device from its parameters; never hardcode
    # .cuda() — prefer .to(device) everywhere.
    # YOUR SOLUTION HERE
    pass
# ---- SOLUTION ----
class DeviceAgnosticModel(nn.Module):
    """Model wrapper that keeps inputs on the same device as the model.

    Works on CPU, CUDA, and MPS; inputs passed to forward() are moved to
    the model's device automatically.
    """

    def __init__(self, model):
        super().__init__()
        self.model = model
        # Fallback device, used when the wrapped model has no parameters.
        self._device = torch.device('cpu')

    @property
    def device(self):
        """Device of the model's first parameter, else the tracked fallback."""
        # FIX: next() on an empty parameter iterator (e.g. wrapping nn.ReLU)
        # raised StopIteration; use the sentinel form and fall back to the
        # device recorded by to_best_device().
        param = next(self.model.parameters(), None)
        return self._device if param is None else param.device

    def to_best_device(self):
        """Move the model to CUDA, else MPS, else CPU; returns self (fluent)."""
        if torch.cuda.is_available():
            device = torch.device('cuda')
        elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
            device = torch.device('mps')
        else:
            device = torch.device('cpu')
        self.model.to(device)
        self._device = device
        print(f"Model moved to {device}")
        return self

    def forward(self, x):
        # Colocate the input with the model to avoid mixed-device errors.
        x = x.to(self.device)
        return self.model(x)
def safe_transfer(tensor, target_device, non_blocking=True):
    """Transfer a tensor to a device, retrying once after a CUDA OOM.

    Args:
        tensor: Source tensor.
        target_device: torch.device or device string (e.g. 'cpu', 'cuda:0').
        non_blocking: Request an async copy when possible.

    Returns:
        The tensor on target_device (the same object if already there).
    """
    # Normalize so callers may pass plain strings as well as torch.device.
    target_device = torch.device(target_device)
    if tensor.device == target_device:
        return tensor
    try:
        return tensor.to(target_device, non_blocking=non_blocking)
    except RuntimeError as e:
        if 'out of memory' in str(e):
            # FIX: only touch the CUDA allocator when CUDA exists — calling
            # empty_cache() on a CPU-only build raises its own error.
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            return tensor.to(target_device)
        raise
def check_device_consistency(*tensors):
    """Verify all tensors are on the same device and return that device.

    Args:
        *tensors: One or more tensors to check.

    Returns:
        The shared torch.device of all inputs.

    Raises:
        ValueError: if called with no tensors (previously an obscure
            IndexError from devices[0]).
        RuntimeError: if the tensors live on different devices.
    """
    if not tensors:
        raise ValueError("check_device_consistency() requires at least one tensor")
    devices = [t.device for t in tensors]
    # Compare via str() so 'cuda' and 'cuda:0' style reprs are distinguished
    # exactly as torch prints them.
    if len(set(str(d) for d in devices)) > 1:
        raise RuntimeError(
            f"Device mismatch: {[str(d) for d in devices]}. "
            f"Move all tensors to the same device."
        )
    return devices[0]
# Smoke test: wrap a small linear layer and run a forward pass.
model = nn.Linear(10, 5)
wrapper = DeviceAgnosticModel(model)
x = torch.randn(3, 10)
output = wrapper(x)  # forward() moves x onto the model's device first
print(f"Output device: {output.device}")
print(f"Output shape: {output.shape}")
# Device consistency check: both tensors on CPU, so no error is raised.
a = torch.randn(3, device='cpu')
b = torch.randn(3, device='cpu')
device = check_device_consistency(a, b)
print(f"All on: {device}")
Key takeaway: write device-agnostic code with tensor.to(device) and model.to(device) rather than hardcoding .cuda(). In interviews, define device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') at the top of your solution.
Lilly Tech Systems