Intermediate

Tensor Operations

Six challenges covering the tensor manipulation skills every DL engineer must have. These operations are the building blocks for implementing layers, losses, and training infrastructure.

Challenge 1: Reshaping & Views

Given a batch of images stored as a flat tensor, reshape them for processing by a convolutional network, flatten them back for a linear layer, and split each image into non-overlapping patches.

import torch

def reshape_for_conv_and_linear(flat_images: torch.Tensor, batch_size: int, channels: int, height: int, width: int):
    """
    Challenge: Given flat_images of shape (batch_size, channels * height * width),
    1. Reshape to (batch_size, channels, height, width) for conv layers
    2. Reshape back to (batch_size, channels * height * width) for linear layers
    3. Split into patches of size (patch_h, patch_w) = (height//2, width//2)

    Return: (conv_input, linear_input, patches)
    """
    # YOUR SOLUTION HERE
    pass

# ---- SOLUTION ----
def reshape_for_conv_and_linear(flat_images, batch_size, channels, height, width):
    """Reshape a flat image batch for conv and linear layers, and extract patches.

    Args:
        flat_images: Tensor of shape (batch_size, channels * height * width).
        batch_size, channels, height, width: Target image dimensions.

    Returns:
        conv_input: (batch_size, channels, height, width) for conv layers.
        linear_input: (batch_size, channels * height * width) for linear layers.
        patches: (batch_size, num_patches, channels, patch_h, patch_w) with
            patch_h = height // 2 and patch_w = width // 2 (4 patches per image).
    """
    # Step 1: Reshape for conv layers: (B, C*H*W) -> (B, C, H, W).
    # reshape (unlike view) also accepts non-contiguous inputs; it still
    # returns a copy-free view whenever the layout allows one.
    conv_input = flat_images.reshape(batch_size, channels, height, width)

    # Step 2: Flatten back for linear layers: (B, C, H, W) -> (B, C*H*W)
    linear_input = conv_input.reshape(batch_size, -1)

    # Step 3: Split into non-overlapping patches via unfold.
    # (B, C, H, W) -> (B, C, nph, npw, patch_h, patch_w)
    patch_h, patch_w = height // 2, width // 2
    patches = conv_input.unfold(2, patch_h, patch_h).unfold(3, patch_w, patch_w)
    # Rearrange to (B, num_patches, C, patch_h, patch_w); contiguous() is
    # required before view() because permute produces a strided view.
    B, C, nph, npw, ph, pw = patches.shape
    patches = patches.permute(0, 2, 3, 1, 4, 5).contiguous()
    patches = patches.view(B, nph * npw, C, ph, pw)

    return conv_input, linear_input, patches

# Demo: batch of 4 flat RGB images, each 8x8.
demo_batch = torch.randn(4, 3 * 8 * 8)
conv_in, lin_in, patches = reshape_for_conv_and_linear(demo_batch, 4, 3, 8, 8)
print(f"Conv input:    {conv_in.shape}")    # (4, 3, 8, 8)
print(f"Linear input:  {lin_in.shape}")     # (4, 192)
print(f"Patches:       {patches.shape}")    # (4, 4, 3, 4, 4)

Challenge 2: Broadcasting

Implement batch-wise operations that require broadcasting — the kind of operations you write daily in DL code.

import torch

def broadcasting_challenges(features: torch.Tensor, class_centers: torch.Tensor, per_sample_weights: torch.Tensor):
    """
    Challenge: Given:
      - features: (batch_size, feature_dim)
      - class_centers: (num_classes, feature_dim)
      - per_sample_weights: (batch_size,)

    Compute:
    1. Distance from each sample to each class center (pairwise L2 distances)
    2. Weighted features: multiply each sample's features by its weight
    3. Feature normalization: subtract per-feature mean, divide by per-feature std

    Return: (distances, weighted_features, normalized_features)
    """
    # YOUR SOLUTION HERE
    pass

# ---- SOLUTION ----
def broadcasting_challenges(features, class_centers, per_sample_weights):
    """Broadcasting drills: pairwise distances, per-sample scaling, standardization.

    Args:
        features: (batch_size, feature_dim) sample features.
        class_centers: (num_classes, feature_dim) one center per class.
        per_sample_weights: (batch_size,) scalar weight per sample.

    Returns:
        distances: (batch_size, num_classes) L2 distance to each center.
        weighted_features: (batch_size, feature_dim) features scaled per sample.
        normalized_features: (batch_size, feature_dim) standardized per feature.
    """
    # 1. Insert singleton axes so (B, 1, D) - (1, K, D) broadcasts to
    #    (B, K, D), then reduce the feature axis to L2 distances.
    pairwise_diff = features[:, None, :] - class_centers[None, :, :]
    distances = torch.norm(pairwise_diff, dim=2)

    # 2. Promote (B,) -> (B, 1) so each weight broadcasts across feature_dim.
    weighted_features = per_sample_weights[:, None] * features

    # 3. Standardize each feature column; the epsilon guards zero std.
    col_mean = features.mean(dim=0, keepdim=True)   # (1, D)
    col_std = features.std(dim=0, keepdim=True)     # (1, D)
    normalized_features = (features - col_mean) / (col_std + 1e-8)

    return distances, weighted_features, normalized_features

# Demo with typical sizes: 32 samples, 64 features, 10 classes.
n_batch, n_feat, n_cls = 32, 64, 10
features = torch.randn(n_batch, n_feat)
centers = torch.randn(n_cls, n_feat)
weights = torch.rand(n_batch)
dist, wf, nf = broadcasting_challenges(features, centers, weights)
print(f"Distances:    {dist.shape}")     # (32, 10)
print(f"Weighted:     {wf.shape}")       # (32, 64)
print(f"Normalized:   {nf.shape}")       # (32, 64)

Challenge 3: Advanced Indexing

Use gather, scatter, and fancy indexing to implement operations common in classification, embedding, and attention.

import torch

def indexing_challenges(logits: torch.Tensor, labels: torch.Tensor, embeddings: torch.Tensor, indices: torch.Tensor):
    """
    Challenge: Given:
      - logits: (batch_size, num_classes) - raw model outputs
      - labels: (batch_size,) - ground truth class indices
      - embeddings: (vocab_size, embed_dim) - embedding table
      - indices: (batch_size, seq_len) - token indices

    Compute:
    1. Extract the logit for the correct class for each sample
    2. Create a one-hot encoding of labels
    3. Look up embeddings for each token index

    Return: (correct_logits, one_hot, looked_up_embeddings)
    """
    # YOUR SOLUTION HERE
    pass

# ---- SOLUTION ----
def indexing_challenges(logits, labels, embeddings, indices):
    """Gather/scatter/fancy-indexing drills for classification and embeddings.

    Args:
        logits: (batch_size, num_classes) raw model outputs.
        labels: (batch_size,) ground-truth class indices.
        embeddings: (vocab_size, embed_dim) embedding table.
        indices: (batch_size, seq_len) token indices into the table.

    Returns:
        correct_logits: (batch_size,) logit of the true class per sample.
        one_hot: (batch_size, num_classes) one-hot labels, matching logits'
            dtype and device.
        looked_up: (batch_size, seq_len, embed_dim) gathered embeddings.
    """
    B, C = logits.shape

    # 1. Extract correct-class logit: (B,)
    # Method A: fancy indexing (row i, column labels[i])
    correct_logits = logits[torch.arange(B, device=logits.device), labels]
    # Method B: gather
    # correct_logits = logits.gather(1, labels.unsqueeze(1)).squeeze(1)

    # 2. One-hot encoding: (B, C). Match logits' dtype as well as its device,
    # so downstream arithmetic with logits never hits a dtype mismatch
    # (the original always produced float32 regardless of logits.dtype).
    one_hot = torch.zeros(B, C, dtype=logits.dtype, device=logits.device)
    one_hot.scatter_(1, labels.unsqueeze(1), 1.0)
    # Alternative: torch.nn.functional.one_hot(labels, num_classes=C).to(logits.dtype)

    # 3. Embedding lookup: advanced indexing on dim 0 -> (B, seq_len, embed_dim)
    looked_up = embeddings[indices]

    return correct_logits, one_hot, looked_up

# Demo: 8 samples, 10 classes, vocab of 1000, 64-dim embeddings, length-16 sequences.
n_batch, n_cls, vocab, emb_dim, seq_len = 8, 10, 1000, 64, 16
logits = torch.randn(n_batch, n_cls)
labels = torch.randint(0, n_cls, (n_batch,))
embeddings = torch.randn(vocab, emb_dim)
indices = torch.randint(0, vocab, (n_batch, seq_len))
cl, oh, le = indexing_challenges(logits, labels, embeddings, indices)
print(f"Correct logits:  {cl.shape}")     # (8,)
print(f"One-hot:         {oh.shape}")      # (8, 10)
print(f"Embeddings:      {le.shape}")      # (8, 16, 64)

Challenge 4: Einsum

Use torch.einsum to express complex tensor contractions concisely — the way senior DL engineers write attention, bilinear layers, and batched operations.

import torch

def einsum_challenges(Q: torch.Tensor, K: torch.Tensor, V: torch.Tensor, bilinear_weight: torch.Tensor):
    """
    Challenge: Given:
      - Q: (batch, heads, seq_len, head_dim) - queries
      - K: (batch, heads, seq_len, head_dim) - keys
      - V: (batch, heads, seq_len, head_dim) - values
      - bilinear_weight: (head_dim, head_dim, out_dim)

    Compute using einsum:
    1. Attention scores: Q @ K^T scaled (batch, heads, seq_len, seq_len)
    2. Attention output: softmax(scores) @ V (batch, heads, seq_len, head_dim)
    3. Bilinear form: sum over d1,d2 of Q[...,d1] * W[d1,d2,o] * K[...,d2]

    Return: (attn_scores, attn_output, bilinear_output)
    """
    # YOUR SOLUTION HERE
    pass

# ---- SOLUTION ----
def einsum_challenges(Q, K, V, bilinear_weight):
    """Attention and a bilinear form expressed as einsum contractions.

    Args:
        Q, K, V: (batch, heads, seq_len, head_dim) projections.
        bilinear_weight: (head_dim, head_dim, out_dim) bilinear tensor W.

    Returns:
        attn_scores: (batch, heads, seq_len, seq_len) scaled Q @ K^T.
        attn_output: (batch, heads, seq_len, head_dim) softmax(scores) @ V.
        bilinear_output: (batch, heads, seq_len, out_dim) with
            out[..., o] = sum_{i,j} Q[..., i] * W[i, j, o] * K[..., j].
    """
    head_dim = Q.shape[-1]

    # 1. Attention scores: Q @ K^T / sqrt(d_k)
    # Q: (B, H, S, D) x K: (B, H, S, D) -> (B, H, S, S)
    attn_scores = torch.einsum('bhid,bhjd->bhij', Q, K) / (head_dim ** 0.5)

    # 2. Attention output: softmax over the key axis, then weight V.
    attn_weights = torch.softmax(attn_scores, dim=-1)  # (B, H, S, S)
    attn_output = torch.einsum('bhij,bhjd->bhid', attn_weights, V)  # (B, H, S, D)

    # 3. Bilinear form: Q * W * K -> (B, H, S, O).
    # BUG FIX: the original subscripts 'bhsa,abo,bhsb->bhso' reused 'b' for
    # both the batch axis and W's second head_dim axis, which forced
    # batch == head_dim and crashed for any realistic shapes. Use fresh
    # letters i, j for the two contracted head_dim axes.
    bilinear_output = torch.einsum('bhsi,ijo,bhsj->bhso', Q, bilinear_weight, K)

    return attn_scores, attn_output, bilinear_output

# Demo: 2 samples, 4 heads, sequence length 8, head_dim 16, bilinear out 32.
n_b, n_h, n_s, d_head, d_out = 2, 4, 8, 16, 32
Q = torch.randn(n_b, n_h, n_s, d_head)
K = torch.randn(n_b, n_h, n_s, d_head)
V = torch.randn(n_b, n_h, n_s, d_head)
W = torch.randn(d_head, d_head, d_out)
scores, output, bilinear = einsum_challenges(Q, K, V, W)
print(f"Attn scores:     {scores.shape}")    # (2, 4, 8, 8)
print(f"Attn output:     {output.shape}")    # (2, 4, 8, 16)
print(f"Bilinear:        {bilinear.shape}")  # (2, 4, 8, 32)

Challenge 5: Gradient Computation

Manually compute and verify gradients — essential for implementing custom autograd functions and debugging training issues.

import torch

def gradient_challenges():
    """
    Challenge:
    1. Compute gradient of softmax cross-entropy loss w.r.t. logits
    2. Implement a custom autograd function for a clamped ReLU
    3. Verify your gradient with torch.autograd.gradcheck

    Return: (loss_grad, custom_fn_output, gradcheck_passed)
    """
    # YOUR SOLUTION HERE
    # Hint: use torch.autograd.Function for step 2 and
    # torch.autograd.gradcheck (with float64 inputs) for step 3.
    pass

# ---- SOLUTION ----
import torch
import torch.nn.functional as F

# 1. Gradient of softmax cross-entropy
# requires_grad=True makes autograd record operations on logits.
logits = torch.randn(4, 10, requires_grad=True)
labels = torch.randint(0, 10, (4,))
loss = F.cross_entropy(logits, labels)
loss.backward()  # populates logits.grad
print(f"Loss grad shape: {logits.grad.shape}")  # (4, 10)

# The gradient of cross-entropy w.r.t. logits is: softmax(logits) - one_hot(labels)
# no_grad: this hand-computed check must not itself be recorded by autograd.
with torch.no_grad():
    expected_grad = F.softmax(logits, dim=1)
    # Subtract 1 at each sample's true-class position (the one-hot term).
    expected_grad[torch.arange(4), labels] -= 1.0
    expected_grad /= 4  # mean reduction over the batch of 4
    print(f"Grad matches: {torch.allclose(logits.grad, expected_grad, atol=1e-6)}")

# 2. Custom autograd function: clamped ReLU (clamp output to [0, max_val])
class ClampedReLU(torch.autograd.Function):
    """Custom autograd op: ReLU with output clamped to [0, max_val]."""

    @staticmethod
    def forward(ctx, input, max_val):
        # Stash what backward needs: the raw input (to find the flat
        # regions of the clamp) and the scalar ceiling.
        ctx.save_for_backward(input)
        ctx.max_val = max_val
        return input.clamp(min=0, max=max_val)

    @staticmethod
    def backward(ctx, grad_output):
        (x,) = ctx.saved_tensors
        # The clamp is the identity on (0, max_val) and flat elsewhere,
        # so the incoming gradient passes through only inside that interval.
        pass_through = ((x > 0) & (x < ctx.max_val)).float()
        # max_val is a plain Python number, so its gradient slot is None.
        return grad_output * pass_through, None

# Bind the functional form once; Function subclasses are invoked via .apply.
clamped_relu = ClampedReLU.apply
# float64 input: gradcheck's finite differences need double precision.
x = torch.randn(5, requires_grad=True, dtype=torch.float64)
output = clamped_relu(x, 6.0)
print(f"Clamped ReLU output: {output}")

# 3. Verify with gradcheck
# gradcheck perturbs each element by eps and compares the numerical Jacobian
# against the one produced by ClampedReLU.backward.
gradcheck_passed = torch.autograd.gradcheck(
    lambda inp: ClampedReLU.apply(inp, 6.0),
    (torch.randn(5, requires_grad=True, dtype=torch.float64),),
    eps=1e-6
)
print(f"Gradcheck passed: {gradcheck_passed}")

Challenge 6: Device Management

Write device-agnostic code that works on CPU, single GPU, and multi-GPU setups — a must for production DL code.

import torch
import torch.nn as nn

def device_management_challenges():
    """
    Challenge: Write device-agnostic utilities that work on CPU and GPU.
    1. Create a device-agnostic model wrapper
    2. Implement safe tensor transfer between devices
    3. Handle mixed-device errors gracefully
    """
    # YOUR SOLUTION HERE
    # Hint: infer the model's device from its parameters rather than
    # hardcoding .cuda(); see the worked utilities for one approach.
    pass

# ---- SOLUTION ----
class DeviceAgnosticModel(nn.Module):
    """Wraps a model and keeps incoming tensors on the model's own device."""

    def __init__(self, model):
        super().__init__()
        self.model = model
        # Last device explicitly chosen by to_best_device (starts on CPU).
        self._device = torch.device('cpu')

    @property
    def device(self):
        """Device of the wrapped model, inferred from its first parameter."""
        return next(self.model.parameters()).device

    def to_best_device(self):
        """Move the wrapped model to CUDA if present, else MPS, else CPU."""
        if torch.cuda.is_available():
            chosen = torch.device('cuda')
        elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
            chosen = torch.device('mps')
        else:
            chosen = torch.device('cpu')
        self.model.to(chosen)
        self._device = chosen
        print(f"Model moved to {chosen}")
        return self

    def forward(self, x):
        """Run the wrapped model, first moving x onto its device."""
        return self.model(x.to(self.device))

def safe_transfer(tensor, target_device, non_blocking=True):
    """Move tensor to target_device, retrying once after a CUDA OOM.

    Returns the tensor unchanged when it already lives on target_device.
    Non-OOM RuntimeErrors are re-raised untouched.
    """
    if tensor.device != target_device:
        try:
            return tensor.to(target_device, non_blocking=non_blocking)
        except RuntimeError as err:
            if 'out of memory' not in str(err):
                raise
            # Release cached allocator blocks, then retry synchronously.
            torch.cuda.empty_cache()
            return tensor.to(target_device)
    return tensor

def check_device_consistency(*tensors):
    """Return the shared device of all tensors, or raise on a mismatch."""
    devices = [t.device for t in tensors]
    distinct = {str(d) for d in devices}
    if len(distinct) > 1:
        raise RuntimeError(
            f"Device mismatch: {[str(d) for d in devices]}. "
            f"Move all tensors to the same device."
        )
    return devices[0]

# Demo: wrap a small linear layer and run a CPU forward pass.
linear = nn.Linear(10, 5)
wrapper = DeviceAgnosticModel(linear)
batch = torch.randn(3, 10)
output = wrapper(batch)
print(f"Output device: {output.device}")
print(f"Output shape:  {output.shape}")

# Device consistency check
t1 = torch.randn(3, device='cpu')
t2 = torch.randn(3, device='cpu')
device = check_device_consistency(t1, t2)
print(f"All on: {device}")
💡 Interview tip: Always write device-agnostic code. Use tensor.to(device) and model.to(device) rather than hardcoding .cuda(). In interviews, define device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') at the top of your solution.