Patterns & Tips
This final lesson covers the design patterns, testing strategies, and production considerations that separate good pipeline code from great pipeline code. These are the topics that come up in senior-level data engineering interviews.
Essential Pipeline Design Patterns
Every production data pipeline uses some combination of these patterns. Knowing them by name and knowing when to apply each one demonstrates senior-level thinking.
| Pattern | Use When | Key Benefit |
|---|---|---|
| Fan-Out / Fan-In | Processing can be parallelized | Near-linear speedup with workers |
| Dead Letter Queue | Some records may fail processing | No data loss on errors |
| Idempotent Write | Pipeline may be retried | Safe to re-run without duplicates |
| Schema Evolution | Source schema changes over time | Pipeline does not break on new fields |
| Backpressure | Producer is faster than consumer | Prevents OOM and crashes |
| Circuit Breaker | Downstream service may be unavailable | Fail fast, recover gracefully |
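The circuit breaker row deserves a concrete illustration, since it is the one pattern in this table not implemented later in the lesson. The sketch below is a minimal, single-threaded version: the `CircuitBreaker` class, its thresholds, and the `RuntimeError` it raises are all assumptions made for this example, not a standard API. Production code would typically reach for an established library instead.

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker: open after N consecutive failures,
    fail fast while open, and allow a probe call after a cooldown."""

    def __init__(self, failure_threshold=3, reset_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open: failing fast")
            self.opened_at = None  # half-open: let one probe call through
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()  # trip the breaker
            raise
        self.failures = 0  # any success closes the circuit again
        return result
```

The key property: once the breaker trips, callers get an immediate error instead of waiting on a timeout against a downstream service that is already known to be unhealthy.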
Idempotent Pipeline Pattern
```python
class IdempotentPipeline:
    """A pipeline that produces the same result regardless of how many
    times it runs on the same input. Critical for reliability.

    Strategies for idempotency:
    1. UPSERT instead of INSERT (merge on natural key)
    2. Write to a temp location, then atomic swap
    3. Track processed batches by ID
    """

    def __init__(self):
        self.processed_batches = set()  # Track what we have already processed
        self.output = {}                # Simulated output table (key -> record)

    def process_batch(self, batch_id, records, key_field='id'):
        """Process a batch idempotently.

        If this batch_id was already processed, skip it entirely.
        For individual records, use UPSERT semantics.
        """
        # Check if the batch was already processed
        if batch_id in self.processed_batches:
            return {'status': 'skipped', 'reason': 'already_processed'}

        # UPSERT each record
        inserted, updated = 0, 0
        for record in records:
            key = record[key_field]
            if key in self.output:
                updated += 1
            else:
                inserted += 1
            self.output[key] = record

        self.processed_batches.add(batch_id)
        return {
            'status': 'processed',
            'batch_id': batch_id,
            'inserted': inserted,
            'updated': updated,
        }


# Demonstrate idempotency
pipeline = IdempotentPipeline()
batch = [
    {'id': 1, 'name': 'Alice', 'score': 95},
    {'id': 2, 'name': 'Bob', 'score': 87},
]

# First run
result1 = pipeline.process_batch('batch_001', batch)
print(f"Run 1: {result1}")

# Second run (retry) - should be skipped entirely
result2 = pipeline.process_batch('batch_001', batch)
print(f"Run 2: {result2}")

# Output is the same no matter how many times the batch is submitted
print(f"Output size: {len(pipeline.output)} (should be 2, not 4)")
```
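Strategy 2 from the docstring (write to a temp location, then atomic swap) is worth sketching too, since it is the standard way to make file outputs idempotent. `atomic_write` is a hypothetical helper written for this example; it relies on `os.replace`, which performs an atomic rename on both POSIX and Windows when the source and target are on the same filesystem.

```python
import json
import os
import tempfile

def atomic_write(records, target_path):
    """Write-then-swap: write to a temp file in the same directory,
    then atomically replace the target. A run that dies mid-write
    never leaves a half-written target file behind, and a retry
    simply overwrites the previous result."""
    dir_name = os.path.dirname(os.path.abspath(target_path))
    fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            for record in records:
                f.write(json.dumps(record) + "\n")
        os.replace(tmp_path, target_path)  # atomic on the same filesystem
    except Exception:
        os.unlink(tmp_path)  # clean up the partial temp file
        raise
```

Note that the temp file must live in the same directory as the target: a rename across filesystems is a copy, not an atomic operation.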
Dead Letter Queue Pattern
```python
class DeadLetterPipeline:
    """Pipeline with a dead letter queue for failed records.

    Never silently drop data. Route failures to a dead letter queue
    for investigation and reprocessing.
    """

    def __init__(self, max_retries=3):
        self.max_retries = max_retries
        self.dlq = []  # Dead letter queue
        self.successful = []
        self.stats = {'processed': 0, 'failed': 0, 'retried': 0}

    def process(self, records, transform_fn):
        """Process records with retry and dead letter routing."""
        for record in records:
            success = False
            last_error = None
            for attempt in range(self.max_retries + 1):
                try:
                    result = transform_fn(record)
                    self.successful.append(result)
                    self.stats['processed'] += 1
                    success = True
                    if attempt > 0:
                        self.stats['retried'] += 1
                    break
                except Exception as e:
                    last_error = str(e)
            if not success:
                self.dlq.append({
                    'record': record,
                    'error': last_error,
                    'attempts': self.max_retries + 1,
                })
                self.stats['failed'] += 1
        return self.stats


# Test
def risky_transform(record):
    if record.get('amount', 0) < 0:
        raise ValueError(f"Negative amount: {record['amount']}")
    record['processed'] = True
    return record


dlq_pipeline = DeadLetterPipeline(max_retries=2)
records = [
    {'id': 1, 'amount': 100},
    {'id': 2, 'amount': -50},  # Will fail
    {'id': 3, 'amount': 200},
    {'id': 4, 'amount': -10},  # Will fail
]
stats = dlq_pipeline.process(records, risky_transform)
print(f"Stats: {stats}")
print(f"DLQ size: {len(dlq_pipeline.dlq)}")
for item in dlq_pipeline.dlq:
    print(f"  Failed record {item['record']['id']}: {item['error']}")
```
Testing Data Pipelines
Pipeline testing requires a different approach than application testing: the code under test is often correct while the data is not. Here are the four levels of testing every pipeline needs:
```python
class PipelineTestFramework:
    """Structured approach to testing data pipelines.

    Four levels:
    1. Unit tests: Test individual transform functions
    2. Contract tests: Test input/output schema compliance
    3. Data quality tests: Test statistical properties of output
    4. Integration tests: Test end-to-end pipeline execution
    """

    @staticmethod
    def test_unit(transform_fn, input_record, expected_output):
        """Level 1: Does this function produce the expected output?"""
        result = transform_fn(input_record)
        assert result == expected_output, (
            f"Expected {expected_output}, got {result}"
        )
        return True

    @staticmethod
    def test_contract(records, schema):
        """Level 2: Does the output match the expected schema?"""
        violations = []
        for i, record in enumerate(records):
            for field, expected_type in schema.items():
                if field not in record:
                    violations.append(f"Record {i}: missing field '{field}'")
                elif not isinstance(record[field], expected_type):
                    violations.append(
                        f"Record {i}: '{field}' expected {expected_type.__name__}, "
                        f"got {type(record[field]).__name__}"
                    )
        return violations

    @staticmethod
    def test_data_quality(records, checks):
        """Level 3: Do statistical properties hold?

        Checks format: [(name, check_function, threshold)]
        """
        results = []
        for name, check_fn, threshold in checks:
            value = check_fn(records)
            passed = value >= threshold
            results.append({
                'check': name,
                'value': round(value, 4),
                'threshold': threshold,
                'passed': passed,
            })
        return results

    @staticmethod
    def test_integration(pipeline_fn, test_input, expected_properties):
        """Level 4: Does the full pipeline produce valid output?"""
        output = pipeline_fn(test_input)
        results = []
        for prop_name, check_fn in expected_properties.items():
            passed = check_fn(output)
            results.append({'property': prop_name, 'passed': passed})
        return results


# Demo all four levels
tester = PipelineTestFramework()

# Level 1: Unit test
def normalize_name(record):
    r = record.copy()
    r['name'] = r['name'].strip().title()
    return r


assert tester.test_unit(
    normalize_name,
    {'name': '  alice smith  '},
    {'name': 'Alice Smith'}
)
print("Level 1 (Unit): PASSED")

# Level 2: Contract test
output_records = [
    {'name': 'Alice', 'age': 30, 'score': 95.5},
    {'name': 'Bob', 'age': '25', 'score': 87.0},  # age is a string, not an int
]
violations = tester.test_contract(
    output_records,
    {'name': str, 'age': int, 'score': float}
)
print(f"Level 2 (Contract): {len(violations)} violations")
for v in violations:
    print(f"  {v}")

# Level 3: Data quality
records = [{'amount': x} for x in [100, 200, 150, None, 300, 250]]
quality_checks = [
    ('completeness',
     lambda rs: sum(1 for r in rs if r['amount'] is not None) / len(rs), 0.8),
    ('positive_values',
     lambda rs: sum(1 for r in rs if r.get('amount') and r['amount'] > 0) / len(rs), 0.9),
]
quality_results = tester.test_data_quality(records, quality_checks)
print("Level 3 (Data Quality):")
for r in quality_results:
    status = 'PASS' if r['passed'] else 'FAIL'
    print(f"  [{status}] {r['check']}: {r['value']} (threshold: {r['threshold']})")

# Level 4: Integration test
def full_pipeline(data):
    return [normalize_name(r) for r in data if r.get('name')]


integration_results = tester.test_integration(
    full_pipeline,
    [{'name': 'alice'}, {'name': ''}, {'name': 'bob'}],
    {
        'non_empty_output': lambda out: len(out) > 0,
        'all_names_titled': lambda out: all(r['name'][0].isupper() for r in out),
        'no_empty_names': lambda out: all(r['name'].strip() for r in out),
    }
)
print("Level 4 (Integration):")
for r in integration_results:
    status = 'PASS' if r['passed'] else 'FAIL'
    print(f"  [{status}] {r['property']}")
```
Pipeline Monitoring Checklist
Every production pipeline needs these metrics. Mentioning monitoring in interviews demonstrates operational maturity.
| Metric | Alert When | Why It Matters |
|---|---|---|
| Pipeline latency | Duration > 2x historical average | Indicates data volume spikes or infrastructure issues |
| Record count | Count deviates > 20% from expected | Upstream source may have changed or failed |
| Error rate | Failures > 1% of total records | Transform logic may need updating for new data |
| Data freshness | Staleness > SLA threshold | Pipeline may have silently stopped |
| Schema changes | Any new or dropped columns | Upstream schema evolution can break transforms |
| DLQ depth | Dead letter count growing | Persistent errors need investigation |
| Resource usage | Memory or CPU > 80% | Pipeline may need optimization or scaling |
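The record-count row of this table can be turned into a concrete check that runs at the end of each pipeline execution. `check_record_count` is a hypothetical helper; the 20% tolerance mirrors the threshold in the table above.

```python
def check_record_count(current, historical_avg, tolerance=0.2):
    """Flag a run whose record count deviates more than `tolerance`
    (default 20%, matching the monitoring table) from the historical
    average. Returns a small result dict suitable for alerting."""
    if historical_avg <= 0:
        # No baseline yet: treat as a failure so someone looks at it
        return {'ok': False, 'deviation': None, 'reason': 'no historical baseline'}
    deviation = abs(current - historical_avg) / historical_avg
    return {'ok': deviation <= tolerance, 'deviation': round(deviation, 3)}
```

In a real deployment the historical average would come from a metrics store (e.g. a rolling mean over the last N runs) rather than being passed in by hand.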
Quick Reference Card
```python
# ==============================================
# DATA PIPELINE INTERVIEW QUICK REFERENCE
# ==============================================

# ETL PATTERNS
#   Extract:   Read from source (file, API, DB, queue)
#   Transform: Clean, validate, enrich, aggregate
#   Load:      Write to target (warehouse, lake, cache)

# VALIDATION CHECKS
#   Schema:       Field types, required fields, format
#   Range:        Min/max values, date ranges
#   Referential:  Foreign key integrity across tables
#   Uniqueness:   Primary key deduplication
#   Completeness: Null percentage thresholds

# STREAMING CONCEPTS
#   Event time vs Processing time
#   Watermark      = max_event_time - max_lateness
#   Session window = gap-based grouping
#   Tumbling window = fixed, non-overlapping
#   Sliding window  = fixed, overlapping
#   Late arrival    = event_time < watermark

# PERFORMANCE TOOLKIT
#   Chunking:      Process in bounded memory
#   Parallelism:   Fan-out independent work
#   Caching:       LRU + TTL for repeated lookups
#   Generators:    Lazy evaluation for large datasets
#   Bloom filter:  Approximate set membership (O(1), low memory)
#   External sort: Sort data larger than memory

# RELIABILITY PATTERNS
#   Idempotency:     UPSERT, batch tracking, atomic swap
#   Dead letter:     Route failures, don't drop data
#   Circuit breaker: Fail fast on downstream outages
#   Backpressure:    Slow producer when consumer is full
#   Checkpointing:   Save progress for recovery

# TESTING LEVELS
#   Unit:         Individual transform functions
#   Contract:     Input/output schema compliance
#   Data quality: Statistical property checks
#   Integration:  End-to-end pipeline execution
```
Frequently Asked Questions
**What is the difference between ETL and ELT, and when should I use each?**

ETL (Extract-Transform-Load) transforms data before loading it into the target system. Use it when the target has limited compute (e.g., traditional data warehouses), or when you need to filter and clean before storage. ELT (Extract-Load-Transform) loads raw data first, then transforms using the target's compute engine. Use it when the target has powerful compute (e.g., BigQuery, Snowflake, Databricks). ELT is the modern standard because cloud warehouses make compute cheap, and keeping raw data allows re-transformation when business logic changes.
**How do you make a pipeline idempotent?**

Three strategies: (1) UPSERT/MERGE: Write records using a natural key so re-running overwrites instead of duplicating. (2) Write-then-swap: Write to a temp location, verify, then atomically rename/swap with the target. This makes the entire operation all-or-nothing. (3) Batch ID tracking: Store which batch IDs have been processed. Before processing, check if the batch ID exists; if yes, skip. This is the simplest approach for scheduled pipelines.
**When should I choose batch, streaming, or micro-batch processing?**

Use batch when latency requirements are hours (e.g., daily reports), data volume is large but bounded, or you need exact aggregates (e.g., month-end financials). Use streaming when latency must be seconds or minutes (e.g., fraud detection, real-time recommendations), data arrives continuously, or you need to react to events immediately. Use micro-batch (Spark Structured Streaming) when you want streaming semantics with batch simplicity and sub-minute latency is acceptable. Most companies start with batch and add streaming for specific use cases.
**How do you handle schema evolution in a pipeline?**

Three approaches: (1) Schema-on-read: Store raw data (JSON, Avro) and apply schema at query time. Most flexible but slower queries. (2) Schema registry: Use a central registry (e.g., Confluent Schema Registry with Avro) that validates producer/consumer compatibility and supports forward and backward compatibility rules. (3) Defensive coding: Use `.get(field, default)` instead of direct access, validate schema at pipeline entry, and alert on new or dropped fields. In interviews, mention that Avro and Protobuf support schema evolution natively, while JSON requires manual handling.
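The defensive-coding approach in (3) can be sketched as a small wrapper around each incoming record. `read_event` and its report format are illustrative assumptions, not a standard API: the idea is simply to default missing fields, tolerate unknown ones, and surface both so the pipeline can alert instead of crashing.

```python
def read_event(raw, known_fields):
    """Defensive read of one record.

    known_fields maps field name -> default value. Returns the
    normalized event plus a report of schema drift (fields the
    producer added or dropped) for alerting."""
    event = {field: raw.get(field, default)
             for field, default in known_fields.items()}
    unknown = set(raw) - set(known_fields)
    missing = {f for f in known_fields if f not in raw}
    return event, {
        'unknown_fields': sorted(unknown),
        'missing_fields': sorted(missing),
    }
```

A pipeline would typically count the drift reports per run and alert when `unknown_fields` or `missing_fields` first become non-empty.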
**What is a feature store and what problems does it solve?**

A feature store is a centralized system that manages the lifecycle of ML features: computation, storage, serving, and monitoring. It solves three problems: (1) Training-serving skew: Features computed differently in training vs production cause model degradation; a feature store computes features once and serves them consistently. (2) Feature reuse: Without a feature store, teams recompute the same features independently, wasting compute and creating inconsistencies. (3) Point-in-time correctness: For time-series features, you need to compute features as of the label timestamp, not the current time, to avoid data leakage. Tools like Feast, Tecton, and Hopsworks provide these capabilities.
**How do you test a data pipeline?**

Use four levels of testing: (1) Unit tests for individual transform functions with known input/output. (2) Contract tests that verify output schema matches expectations (field names, types, nullability). (3) Data quality tests that check statistical properties: row counts within expected range, null percentages below threshold, value distributions stable. Use tools like Great Expectations or dbt tests. (4) Integration tests that run the full pipeline on a small dataset and verify end-to-end correctness. Run unit and contract tests on every PR, quality tests on every pipeline run, and integration tests nightly.
**How should I prepare for data pipeline interviews?**

Focus on these areas: (1) Core patterns: Be able to implement ETL, validation, and streaming solutions from scratch in Python; practice the challenges in this course. (2) System design: Know when to use batch vs streaming, how to handle schema evolution, and how to design for reliability (idempotency, dead letter queues). (3) Tools awareness: Know what Airflow, Spark, Kafka, dbt, and Great Expectations do, even if you have not used all of them. (4) Production thinking: Always mention monitoring, testing, error handling, and scalability unprompted; this is what separates senior from mid-level candidates. Practice 2-3 pipeline problems per day for 2 weeks before your interview.
Lilly Tech Systems