Data Manipulation Puzzles
10 tricky data manipulation problems that interviewers use to test your real-world data engineering skills. These are the messy, ambiguous problems you face in production ML pipelines — not textbook exercises.
Puzzle 1: Intelligent Deduplication
import pandas as pd
import numpy as np

# Sample customer records containing near-duplicates: the same person with
# varying capitalization, stray whitespace, and complementary missing fields.
df = pd.DataFrame({
    'name': ['Alice Smith', 'alice smith', 'ALICE SMITH', 'Bob Jones', 'Bob Jones'],
    'email': ['alice@gmail.com', 'Alice@Gmail.com', 'alice@gmail.com ', 'bob@yahoo.com', 'bob@yahoo.com'],
    'phone': ['555-1234', None, '5551234', '555-5678', '555-5678'],
    'address': [None, '123 Main St', '123 Main St', '456 Oak Ave', None],
    'signup_date': pd.to_datetime(['2024-01-01', '2024-02-15', '2024-03-10', '2024-01-05', '2024-06-20'])
})
def intelligent_dedup(df):
    """Deduplicate records by normalized email, keeping the best row.

    Rows whose emails match after stripping whitespace and lowercasing are
    treated as duplicates.  Among duplicates, the row with the most non-null
    fields wins; ties are broken by the most recent signup_date.

    Args:
        df: DataFrame with at least 'email' and 'signup_date' columns.

    Returns:
        Deduplicated DataFrame with the original columns and a fresh index.
    """
    df = df.copy()
    # Normalize the matching key: case and surrounding whitespace are noise.
    # (A previously-computed normalized name was dead code and is removed.)
    df['_norm_email'] = df['email'].str.strip().str.lower()
    # Completeness score = number of non-null original (non-helper) fields.
    data_cols = [c for c in df.columns if not c.startswith('_')]
    df['_completeness'] = df[data_cols].notna().sum(axis=1)
    # Most complete first, then most recent, so drop_duplicates keeps the best.
    df = df.sort_values(['_completeness', 'signup_date'],
                        ascending=[False, False])
    deduped = df.drop_duplicates(subset='_norm_email', keep='first')
    # Strip helper columns before returning.
    deduped = deduped.drop(columns=['_norm_email', '_completeness'])
    return deduped.reset_index(drop=True)
# Run dedup on the sample data and show the surviving records.
result = intelligent_dedup(df)
print(result)
Interviewer focus: Normalizing keys before dedup (lowercase, strip), using a completeness score to choose the best record, and handling the common real-world scenario where duplicates have complementary information. Mention that in production you might also merge fields from duplicates rather than just picking one.
Puzzle 2: Sessionization from Event Logs
import pandas as pd
import numpy as np

# Clickstream events for two users; gaps of more than 30 minutes between
# consecutive events should start a new session.
df = pd.DataFrame({
    'user_id': ['A','A','A','A','A','A','B','B','B','B'],
    'timestamp': pd.to_datetime([
        '2024-01-01 10:00', '2024-01-01 10:05', '2024-01-01 10:20',
        '2024-01-01 14:00', '2024-01-01 14:10', '2024-01-01 14:25',
        '2024-01-01 09:00', '2024-01-01 09:15', '2024-01-01 12:00',
        '2024-01-01 12:05'
    ]),
    'page': ['home','search','product','home','cart','checkout',
             'home','search','home','product']
})
def sessionize(df, gap_minutes=30):
    """Assign session IDs based on inactivity gaps.

    A new session starts at a user's first event or whenever the gap since
    their previous event exceeds `gap_minutes`.  Adds 'session_id' (per-user
    counter) and 'global_session' (user-qualified label) columns.
    """
    out = df.sort_values(['user_id', 'timestamp']).copy()
    threshold = pd.Timedelta(minutes=gap_minutes)
    # Gap since the previous event of the same user (NaT on the first event).
    gaps = out.groupby('user_id')['timestamp'].diff()
    # Session-start flag: first event per user, or gap strictly over threshold.
    starts = (gaps.isna() | (gaps > threshold)).astype(int)
    # Running count of starts per user numbers that user's sessions 1..k.
    out['session_id'] = starts.groupby(out['user_id']).cumsum()
    out['global_session'] = out['user_id'] + '_S' + out['session_id'].astype(str)
    return out
# Sessionize with the default 30-minute gap and inspect per-event labels.
result = sessionize(df)
print(result[['user_id', 'timestamp', 'page', 'session_id', 'global_session']])

# Session summary: one row per (user, session) with span and visited pages.
summary = result.groupby(['user_id', 'session_id']).agg(
    start_time=('timestamp', 'min'),
    end_time=('timestamp', 'max'),
    page_count=('page', 'count'),
    pages=('page', list)
).reset_index()
print("\nSession Summary:")
print(summary)
Interviewer focus: The diff() + cumsum() pattern for sessionization is a classic time series technique. It avoids explicit loops and handles multiple users correctly via groupby. This pattern is standard practice at analytics companies.
Puzzle 3: Funnel Analysis
import pandas as pd
import numpy as np

np.random.seed(42)
# 100 users, each with a visit/signup/purchase event one hour apart.
events = pd.DataFrame({
    'user_id': np.repeat(range(1, 101), 3),
    'event_type': ['visit', 'signup', 'purchase'] * 100,
    'timestamp': pd.date_range('2024-01-01', periods=300, freq='h')
})
# Randomly remove some events to simulate drop-offs
drop_mask = np.random.random(300) > 0.3
events = events[drop_mask].reset_index(drop=True)
def funnel_analysis(events, funnel_steps):
    """Compute conversion rates through an ordered funnel.

    Args:
        events: DataFrame with user_id, event_type, timestamp.
        funnel_steps: List of event types in funnel order.

    Returns:
        DataFrame with step, users, overall conversion % (of all users),
        and step-to-step conversion %.
    """
    # First occurrence of each event type per user, pivoted to one row/user.
    first_events = (
        events.sort_values('timestamp')
        .groupby(['user_id', 'event_type'])['timestamp']
        .first()
        .unstack()
    )
    # Guarantee a column for every funnel step: a step with no events yields
    # zero users instead of a KeyError when it is referenced as prev_step.
    first_events = first_events.reindex(columns=funnel_steps)
    results = []
    qualified_users = set(first_events.index)
    total_users = len(first_events)
    for i, step in enumerate(funnel_steps):
        # Users who completed this step at least once.
        step_users = set(first_events[first_events[step].notna()].index)
        if i > 0:
            prev_step = funnel_steps[i - 1]
            prev_users = set(first_events[first_events[prev_step].notna()].index)
            # Temporal ordering: this step must not precede the previous step.
            step_users = {
                u for u in step_users & prev_users
                if first_events.loc[u, step] >= first_events.loc[u, prev_step]
            }
        qualified_users = qualified_users & step_users
        count = len(qualified_users)
        results.append({
            'step': step,
            'users': count,
            'conversion': count / total_users * 100 if total_users else 0.0
        })
    result_df = pd.DataFrame(results)
    # Step-to-step conversion: first step is 100% by definition; a step after
    # an empty step converts 0% (avoids the previous division-by-zero inf).
    prev_counts = result_df['users'].shift(1)
    ratio = result_df['users'] / prev_counts.replace(0, np.nan) * 100
    if len(ratio):
        ratio.iloc[0] = 100.0
    result_df['step_conversion'] = ratio.fillna(0.0)
    return result_df
# Run the three-step funnel on the synthetic events.
funnel = funnel_analysis(events, ['visit', 'signup', 'purchase'])
print(funnel)
Interviewer focus: Ensuring temporal ordering (signup must come after visit), handling users who skip steps, and computing both overall conversion and step-to-step conversion rates. This is a core product analytics question.
Puzzle 4: Rolling Window with Lookback Constraint
import pandas as pd
import numpy as np

np.random.seed(42)
# 30 days of transactions randomly assigned to three users.
df = pd.DataFrame({
    'user_id': np.random.choice(['U1','U2','U3'], 30),
    'date': pd.date_range('2024-01-01', periods=30, freq='D'),
    'amount': np.random.randint(10, 500, 30)
})
def rolling_lookback_feature(df, window_days=30):
    """Compute rolling average excluding current row (no leakage).

    For each transaction, computes the average amount of all previous
    transactions by the same user within the lookback window.
    """
    ordered = df.sort_values(['user_id', 'date']).copy()
    window = pd.Timedelta(days=window_days)
    pieces = []
    for _, grp in ordered.groupby('user_id'):
        grp = grp.copy()
        feats = []
        for _, row in grp.iterrows():
            # Strictly-earlier rows (no leakage) that fall inside the window.
            in_window = (
                (grp['date'] < row['date']) &
                (grp['date'] >= row['date'] - window)
            )
            history = grp.loc[in_window, 'amount']
            # NaN when the user has no prior history in the window.
            feats.append(history.mean() if len(history) else np.nan)
        grp['avg_amount_30d'] = feats
        pieces.append(grp)
    return pd.concat(pieces).sort_values('date').reset_index(drop=True)
# Vectorized alternative via cumulative sums (much faster for large data).
# Note: computes an expanding (all-prior) average, not an exact N-day window.
def rolling_lookback_fast(df, window_days=30):
    """Efficient version using an expanding-window cumulative-sum trick."""
    out = df.sort_values(['user_id', 'date']).copy()
    grouped = out.groupby('user_id')
    # Sum of strictly-prior amounts: running total minus the current row.
    prior_sum = grouped['amount'].cumsum() - out['amount']
    # 0-based cumcount == number of earlier transactions for this user.
    prior_count = grouped.cumcount()
    # NaN where the user has no history yet; otherwise the expanding mean.
    out['avg_amount_expanding'] = np.where(
        prior_count > 0,
        prior_sum / prior_count,
        np.nan
    )
    return out
# NOTE: the fast variant returns the expanding (all-prior) average column.
result = rolling_lookback_fast(df)
print(result[['user_id', 'date', 'amount', 'avg_amount_expanding']].head(15))
Interviewer focus: The strict < (not <=) to exclude the current row is the key leakage prevention detail. The fast version using cumsum shows performance awareness. Mention that for exact windowed (not expanding) averages at scale, you would use a database window function or sort-based approach.
Puzzle 5: Conditional Merge with Priority Rules
import pandas as pd
import numpy as np

# Purchases to be matched against coupons.
customers = pd.DataFrame({
    'customer_id': range(1, 8),
    'product': ['Laptop', 'Phone', 'Tablet', 'Laptop', 'Camera', 'Phone', 'Monitor'],
    'category': ['Electronics', 'Electronics', 'Electronics', 'Electronics',
                 'Electronics', 'Electronics', 'Electronics']
})
# Coupons at three levels: product-specific, category-wide, store-wide ('*').
coupons = pd.DataFrame({
    'coupon_id': ['C1', 'C2', 'C3', 'C4', 'C5'],
    'type': ['product', 'product', 'category', 'store', 'product'],
    'match_value': ['Laptop', 'Phone', 'Electronics', '*', 'Camera'],
    'discount_pct': [20, 15, 10, 5, 25]
})
def priority_coupon_match(customers, coupons):
    """Match customers to coupons with priority: product > category > store.

    Each customer receives at most one coupon: product-level coupons are
    tried first, then category-level, then the best store-wide coupon as a
    fallback.  The match tier is tracked explicitly instead of being
    inferred from the discount amount (the old heuristic mislabeled tiers
    whenever discounts overlapped across tiers), and the per-tier lookup is
    done with a map, so a product matching multiple coupons cannot expand
    or misalign rows the way the old merge-based fill-in could.

    Args:
        customers: DataFrame with customer_id, product, category.
        coupons: DataFrame with coupon_id, type ('product'/'category'/
            'store'), match_value, discount_pct.

    Returns:
        DataFrame with customer_id, product, coupon_id, discount_pct,
        match_type ('product', 'category', 'store', or 'none').
    """
    result = customers[['customer_id', 'product']].copy()
    result['coupon_id'] = None
    result['discount_pct'] = np.nan
    result['match_type'] = 'none'

    def _best_by_value(tier):
        # One coupon per match_value; the largest discount wins ties.
        sub = coupons[coupons['type'] == tier]
        sub = sub.sort_values('discount_pct', ascending=False)
        return sub.drop_duplicates('match_value').set_index('match_value')

    # Tiers 1 and 2: exact key match on product, then category.
    for tier, keys in (('product', customers['product']),
                       ('category', customers['category'])):
        table = _best_by_value(tier)
        ids = keys.map(table['coupon_id'])
        # Fill only customers who are still unmatched and have a hit here.
        take = result['coupon_id'].isna() & ids.notna()
        if take.any():
            result.loc[take, 'coupon_id'] = ids[take]
            result.loc[take, 'discount_pct'] = keys.map(table['discount_pct'])[take]
            result.loc[take, 'match_type'] = tier
    # Tier 3: best store-wide coupon for anyone still unmatched.
    store_coupons = coupons[coupons['type'] == 'store']
    remaining = result['coupon_id'].isna()
    if remaining.any() and len(store_coupons) > 0:
        best = store_coupons.loc[store_coupons['discount_pct'].idxmax()]
        result.loc[remaining, 'coupon_id'] = best['coupon_id']
        result.loc[remaining, 'discount_pct'] = best['discount_pct']
        result.loc[remaining, 'match_type'] = 'store'
    return result
# Apply the cascading match and show each customer's coupon.
result = priority_coupon_match(customers, coupons)
print(result)
Interviewer focus: Cascading merge logic with priority ordering is a real business problem. The key insight is handling it in layers rather than one complex join. In production, mention that you would use SQL COALESCE with ordered LEFT JOINs.
Puzzle 6: Gap and Island Detection
import pandas as pd
import numpy as np

np.random.seed(42)
# Hourly random-walk series centered near the threshold of 5.
df = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=50, freq='h'),
    'value': np.cumsum(np.random.randn(50)) + 5
})
def find_islands(df, threshold=5.0, value_col='value', time_col='timestamp'):
    """Find continuous periods ("islands") where value exceeds threshold.

    Args:
        df: DataFrame with a time column and a numeric value column.
        threshold: Values strictly greater than this belong to an island.
        value_col: Name of the value column.
        time_col: Name of the time column.

    Returns:
        DataFrame with start, end, duration_hours, readings, mean_value,
        max_value for each island, in order of occurrence.
    """
    df = df.copy()
    # Flag: above or below threshold
    df['above'] = (df[value_col] > threshold).astype(int)
    # Gap-and-island trick: every transition (0->1 or 1->0) starts a new
    # group, so cumsum of "flag changed" yields a unique ID per run.
    # (A separate 'change' column was dead code and has been removed.)
    df['group'] = df['above'].diff().ne(0).cumsum()
    # Aggregate only the runs that are above the threshold.
    islands = df[df['above'] == 1].groupby('group').agg(
        start=(time_col, 'first'),
        end=(time_col, 'last'),
        duration_hours=(time_col, lambda x: (x.max() - x.min()).total_seconds() / 3600),
        readings=('above', 'count'),
        mean_value=(value_col, 'mean'),
        max_value=(value_col, 'max')
    ).reset_index(drop=True)
    return islands
islands = find_islands(df, threshold=5.0)
print("Islands (above threshold):")
print(islands)

# Gaps: negate the series so "value below 5" becomes "negated value above -5".
gaps = find_islands(df.assign(value=-df['value']), threshold=-5.0)
print("\nGaps (below threshold):")
print(gaps)
Interviewer focus: The diff().ne(0).cumsum() pattern is the standard "gap and island" technique from SQL, translated to Pandas. This pattern is used extensively in anomaly detection, outage tracking, and session analysis.
Puzzle 7: Self-Join for Sequence Detection
import pandas as pd
import numpy as np

# Behavioral events: user A completes the full sequence quickly, user B's
# purchase comes too late, user C never purchases.
events = pd.DataFrame({
    'user_id': ['A','A','A','A','B','B','B','C','C','C','C'],
    'action': ['search','view','purchase','search',
               'search','view','purchase',
               'search','view','search','view'],
    'product': ['P1','P1','P1','P2','P1','P1','P1','P1','P1','P2','P2'],
    'timestamp': pd.to_datetime([
        '2024-01-01 10:00','2024-01-01 10:05','2024-01-01 10:30','2024-01-01 11:00',
        '2024-01-01 09:00','2024-01-01 09:08','2024-01-01 12:00',
        '2024-01-01 14:00','2024-01-01 14:03','2024-01-01 15:00','2024-01-01 15:02'
    ])
})
def find_action_sequences(events, max_search_view_min=10, max_view_purchase_min=60):
    """Find users who completed a search -> view -> purchase sequence.

    Each stage must happen strictly after the previous one for the same
    user and product, within the given time limits (minutes).
    """
    def _stage(action, ts_name):
        # Events of one action type, with the timestamp column renamed.
        subset = events[events['action'] == action]
        cols = subset[['user_id', 'product', 'timestamp']]
        return cols.rename(columns={'timestamp': ts_name})

    searches = _stage('search', 'search_time')
    views = _stage('view', 'view_time')
    purchases = _stage('purchase', 'purchase_time')

    # search -> view on the same user/product; view strictly later and soon enough.
    sv = searches.merge(views, on=['user_id', 'product'])
    sv['search_to_view_min'] = (sv['view_time'] - sv['search_time']).dt.total_seconds() / 60
    sv_ok = (sv['search_to_view_min'] > 0) & (sv['search_to_view_min'] <= max_search_view_min)
    sv = sv[sv_ok]

    # view -> purchase with the same ordering + window constraints.
    svp = sv.merge(purchases, on=['user_id', 'product'])
    svp['view_to_purchase_min'] = (svp['purchase_time'] - svp['view_time']).dt.total_seconds() / 60
    vp_ok = (svp['view_to_purchase_min'] > 0) & (svp['view_to_purchase_min'] <= max_view_purchase_min)
    svp = svp[vp_ok]

    return svp[['user_id', 'product', 'search_time', 'view_time',
                'purchase_time', 'search_to_view_min', 'view_to_purchase_min']]
# Expected outcome, given the sample events above:
result = find_action_sequences(events)
print(result)
# User A, Product P1: search 10:00 -> view 10:05 -> purchase 10:30 (matches)
# User B: view-to-purchase gap too long (3 hours)
# User C: no purchase
Interviewer focus: Using self-joins with temporal constraints is a common pattern in behavioral analytics. The key is enforcing the ordering constraint (view after search, purchase after view) and the time window constraints. Mention that for large datasets, you would use merge_asof or database window functions.
Puzzle 8: Cohort Retention Analysis
import pandas as pd
import numpy as np

np.random.seed(42)
# Generate user signups and activity
users = pd.DataFrame({
    'user_id': range(1, 201),
    'signup_date': np.random.choice(
        pd.date_range('2024-01-01', '2024-06-01', freq='MS'), 200
    )
})
# Activity: each user has some random active days
activity = []
for _, user in users.iterrows():
    n_active = np.random.randint(1, 30)
    # Spread this user's active dates evenly over their first 180 days.
    active_dates = pd.date_range(
        user['signup_date'],
        user['signup_date'] + pd.Timedelta(days=180),
        periods=n_active
    )
    for d in active_dates:
        activity.append({'user_id': user['user_id'], 'activity_date': d})
activity_df = pd.DataFrame(activity)
def cohort_retention(users, activity, periods=6):
    """Build a cohort retention table.

    Rows are signup-month cohorts; column "Month k" holds the percentage of
    the cohort active k months after signup.
    """
    cohort_users = users.copy()
    # Cohort = signup month, as a monthly Period.
    cohort_users['cohort'] = cohort_users['signup_date'].dt.to_period('M')
    # Attach each activity row to its user's cohort and activity month.
    joined = activity.merge(cohort_users[['user_id', 'cohort']], on='user_id')
    joined['activity_period'] = joined['activity_date'].dt.to_period('M')
    # Months since signup, via Period ordinal arithmetic.
    joined['period_offset'] = (
        joined['activity_period'].astype(int) - joined['cohort'].astype(int)
    )
    # Unique active users per (cohort, offset); drop pre-signup activity.
    valid = joined[joined['period_offset'] >= 0]
    counts = (
        valid.groupby(['cohort', 'period_offset'])['user_id']
        .nunique()
        .unstack(fill_value=0)
    )
    sizes = cohort_users.groupby('cohort')['user_id'].nunique()
    # Broadcast division turns counts into percentages of the cohort size.
    pct = counts.div(sizes, axis=0) * 100
    # Keep months 0..periods and give columns readable labels.
    pct = pct.iloc[:, :periods + 1]
    pct.columns = [f"Month {i}" for i in pct.columns]
    return pct.round(1)
# Monthly cohort retention for the synthetic users.
retention_table = cohort_retention(users, activity_df)
print("Cohort Retention (%):")
print(retention_table)
Interviewer focus: Using Period types for month arithmetic, unstack for pivoting, and div for broadcasting division. Cohort retention is one of the most common analytics interview questions. The clean separation of data preparation and computation is a strong signal.
Puzzle 9: Memory-Efficient Large File Processing
import pandas as pd
import numpy as np

# Simulate writing a large CSV
large_df = pd.DataFrame({
    'category': np.random.choice(['A','B','C','D','E'], 10000),
    'value1': np.random.randn(10000) * 100,
    'value2': np.random.randint(0, 1000, 10000)
})
large_df.to_csv('/tmp/large_data.csv', index=False)
def chunked_stats(filepath, chunk_size=1000, group_col='category', value_col='value1'):
    """Compute grouped count/mean/std over a CSV using chunked reading.

    Uses Welford's online algorithm for numerically stable mean/variance
    accumulation across chunks, so the whole file never needs to fit in
    memory.

    Args:
        filepath: Path to the CSV file.
        chunk_size: Rows per chunk passed to pd.read_csv.
        group_col: Column to group by.
        value_col: Numeric column to aggregate.

    Returns:
        DataFrame with category, count, mean, std, sorted by category.
        std is the SAMPLE standard deviation (ddof=1), matching pandas'
        groupby().std().
    """
    stats = {}  # {category: {'n': count, 'mean': running mean, 'M2': sum sq. deviations}}
    for chunk in pd.read_csv(filepath, chunksize=chunk_size):
        for category, group in chunk.groupby(group_col):
            values = group[value_col].values
            if category not in stats:
                stats[category] = {'n': 0, 'mean': 0.0, 'M2': 0.0}
            s = stats[category]
            # Welford update: incremental, so chunk boundaries don't matter.
            for x in values:
                s['n'] += 1
                delta = x - s['mean']
                s['mean'] += delta / s['n']
                delta2 = x - s['mean']
                s['M2'] += delta * delta2
    # Convert accumulators to final statistics.
    results = []
    for category, s in stats.items():
        # Bug fix: divide by n - 1 (sample variance). The previous M2 / n
        # (population variance) disagreed with the pandas verification,
        # whose std() defaults to ddof=1.
        std = np.sqrt(s['M2'] / (s['n'] - 1)) if s['n'] > 1 else 0
        results.append({
            'category': category,
            'count': s['n'],
            'mean': round(s['mean'], 4),
            'std': round(std, 4)
        })
    return pd.DataFrame(results).sort_values('category').reset_index(drop=True)
# Chunked approach
chunked_result = chunked_stats('/tmp/large_data.csv', chunk_size=1000)
print("Chunked stats:")
print(chunked_result)

# Verify against full load
full_result = large_df.groupby('category')['value1'].agg(['count','mean','std']).reset_index()
print("\nFull load stats (verification):")
print(full_result.round(4))
Interviewer focus: Using pd.read_csv(chunksize=...) for memory-constrained processing. Welford’s algorithm for numerically stable online variance is the advanced detail that impresses. In practice, mention alternatives: Dask, Polars, or database aggregation.
Puzzle 10: Pandas Performance Optimization
import pandas as pd
import numpy as np
import time

# 100k-row frame with mixed dtypes for the optimization demos below.
n = 100_000
df = pd.DataFrame({
    'id': range(n),
    'category': np.random.choice(['cat_a', 'cat_b', 'cat_c', 'cat_d'], n),
    'value': np.random.randn(n) * 100,
    'flag': np.random.choice([True, False], n),
    'score': np.random.randint(0, 100, n)
})
# ===== OPTIMIZATION 1: Dtype downcasting =====
def optimize_dtypes(df):
    """Reduce memory by downcasting numeric types.

    Shrinks integer and float columns to the smallest dtype that holds the
    data, and converts low-cardinality string columns to categoricals.
    """
    out = df.copy()
    # Numeric columns: e.g. int64 -> int8, float64 -> float32 where possible.
    for kind, target in (('int', 'integer'), ('float', 'float')):
        for col in out.select_dtypes(include=kind).columns:
            out[col] = pd.to_numeric(out[col], downcast=target)
    # Strings with few distinct values become categoricals
    # (integer codes plus a small lookup table).
    for col in out.select_dtypes(include='object').columns:
        if out[col].nunique() / len(out) < 0.5:  # Low cardinality
            out[col] = out[col].astype('category')
    return out
# Compare memory footprint before/after dtype optimization.
before_mb = df.memory_usage(deep=True).sum() / 1e6
df_opt = optimize_dtypes(df)
after_mb = df_opt.memory_usage(deep=True).sum() / 1e6
print(f"Memory: {before_mb:.2f} MB -> {after_mb:.2f} MB "
      f"({(1 - after_mb/before_mb)*100:.0f}% reduction)")
# ===== OPTIMIZATION 2: Vectorized vs apply =====
# SLOW: apply runs the Python lambda once per row.
start = time.time()
result_slow = df['value'].apply(lambda x: 'high' if x > 50 else ('low' if x < -50 else 'mid'))
slow_time = time.time() - start
# FAST: np.select evaluates the conditions as whole-array operations.
start = time.time()
conditions = [df['value'] > 50, df['value'] < -50]
choices = ['high', 'low']
result_fast = np.select(conditions, choices, default='mid')
fast_time = time.time() - start
print(f"\napply: {slow_time:.4f}s, np.select: {fast_time:.4f}s, "
      f"speedup: {slow_time/fast_time:.0f}x")
# ===== OPTIMIZATION 3: query() vs boolean indexing =====
# Both are fast, but query is more readable for complex conditions
start = time.time()
r1 = df[(df['value'] > 0) & (df['score'] > 50) & (df['flag'] == True)]
t1 = time.time() - start
start = time.time()
r2 = df.query('value > 0 and score > 50 and flag == True')
t2 = time.time() - start
print(f"\nBoolean indexing: {t1:.4f}s, query(): {t2:.4f}s")
# ===== OPTIMIZATION 4: Avoid chained operations =====
# BAD: creates intermediate copies
# df_bad = df[df['value'] > 0]
# df_bad = df_bad[df_bad['score'] > 50]  # SettingWithCopyWarning
# GOOD: single operation combining both filters
df_good = df.loc[(df['value'] > 0) & (df['score'] > 50)]
print(f"\nOptimized DataFrame shape: {df_good.shape}")
Lilly Tech Systems