Advanced Patterns & Tips

This final lesson covers the design patterns, testing strategies, and production considerations that separate good pipeline code from great pipeline code. These are the topics that come up in senior-level data engineering interviews.

Essential Pipeline Design Patterns

Every production data pipeline uses some combination of these patterns. Knowing them by name and knowing when to apply each one demonstrates senior-level thinking.

| Pattern | Use When | Key Benefit |
| --- | --- | --- |
| Fan-Out / Fan-In | Processing can be parallelized | Near-linear speedup with workers |
| Dead Letter Queue | Some records may fail processing | No data loss on errors |
| Idempotent Write | Pipeline may be retried | Safe to re-run without duplicates |
| Schema Evolution | Source schema changes over time | Pipeline does not break on new fields |
| Backpressure | Producer is faster than consumer | Prevents OOM and crashes |
| Circuit Breaker | Downstream service may be unavailable | Fail fast, recover gracefully |
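The circuit breaker is the only pattern in the table that does not get its own implementation later in this lesson, so here is a minimal sketch. The class and parameter names (`CircuitBreaker`, `failure_threshold`, `cooldown_seconds`) are illustrative choices, not a standard API:

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker: open after N consecutive failures,
    reject calls while open, allow a trial call after a cooldown."""

    def __init__(self, failure_threshold=3, cooldown_seconds=30):
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        # Fail fast while the circuit is open and the cooldown has not elapsed
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.cooldown_seconds:
                raise RuntimeError("circuit open: downstream unavailable")
            self.opened_at = None  # cooldown elapsed: allow a trial call

        try:
            result = fn(*args, **kwargs)
            self.failures = 0  # any success resets the failure count
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.time()
            raise
```

The key property is that while the circuit is open, callers fail immediately instead of waiting on timeouts against a dead downstream service.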

Idempotent Pipeline Pattern

class IdempotentPipeline:
    """A pipeline that produces the same result regardless of how many times
    it runs on the same input. Critical for reliability.

    Strategies for idempotency:
    1. UPSERT instead of INSERT (merge on natural key)
    2. Write to temp location, then atomic swap
    3. Track processed batches by ID
    """

    def __init__(self):
        self.processed_batches = set()  # Track what we have already processed
        self.output = {}                # Simulated output table (key -> record)

    def process_batch(self, batch_id, records, key_field='id'):
        """Process a batch idempotently.

        If this batch_id was already processed, skip it entirely.
        For individual records, use UPSERT semantics.
        """
        # Check if batch already processed
        if batch_id in self.processed_batches:
            return {'status': 'skipped', 'reason': 'already_processed'}

        # UPSERT each record
        inserted, updated = 0, 0
        for record in records:
            key = record[key_field]
            if key in self.output:
                updated += 1
            else:
                inserted += 1
            self.output[key] = record

        self.processed_batches.add(batch_id)
        return {
            'status': 'processed',
            'batch_id': batch_id,
            'inserted': inserted,
            'updated': updated
        }

# Demonstrate idempotency
pipeline = IdempotentPipeline()

batch = [
    {'id': 1, 'name': 'Alice', 'score': 95},
    {'id': 2, 'name': 'Bob', 'score': 87},
]

# First run
result1 = pipeline.process_batch('batch_001', batch)
print(f"Run 1: {result1}")

# Second run (retry) - should be idempotent
result2 = pipeline.process_batch('batch_001', batch)
print(f"Run 2: {result2}")

# Output should be the same regardless
print(f"Output size: {len(pipeline.output)} (should be 2, not 4)")
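The class above demonstrates strategies 1 and 3 from its docstring (UPSERT and batch tracking). Strategy 2, write-then-swap, can be sketched with `os.replace`, which is atomic on POSIX when the temp file and the target live on the same filesystem. The function name `atomic_write` and the JSON-lines format are illustrative choices:

```python
import json
import os
import tempfile

def atomic_write(records, target_path):
    """Write-then-swap: write output to a temp file in the target's
    directory, then atomically replace the target. A crash mid-write
    leaves the old target untouched; re-running repeats the swap,
    so the operation is idempotent."""
    target_dir = os.path.dirname(os.path.abspath(target_path))
    fd, tmp_path = tempfile.mkstemp(dir=target_dir, suffix='.tmp')
    try:
        with os.fdopen(fd, 'w') as f:
            for record in records:
                f.write(json.dumps(record) + '\n')
        os.replace(tmp_path, target_path)  # atomic within one filesystem
    except BaseException:
        os.unlink(tmp_path)  # remove the partial temp file on failure
        raise
```

Readers of `target_path` never observe a half-written file: they see either the old contents or the complete new contents.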

Dead Letter Queue Pattern

class DeadLetterPipeline:
    """Pipeline with dead letter queue for failed records.

    Never silently drop data. Route failures to a dead letter queue
    for investigation and reprocessing.
    """

    def __init__(self, max_retries=3):
        self.max_retries = max_retries
        self.dlq = []  # Dead letter queue
        self.successful = []
        self.stats = {'processed': 0, 'failed': 0, 'retried': 0}

    def process(self, records, transform_fn):
        """Process records with retry and dead letter routing."""
        for record in records:
            success = False
            last_error = None

            for attempt in range(self.max_retries + 1):
                try:
                    result = transform_fn(record)
                    self.successful.append(result)
                    self.stats['processed'] += 1
                    success = True
                    if attempt > 0:
                        self.stats['retried'] += 1
                    break
                except Exception as e:
                    last_error = str(e)

            if not success:
                self.dlq.append({
                    'record': record,
                    'error': last_error,
                    'attempts': self.max_retries + 1
                })
                self.stats['failed'] += 1

        return self.stats

# Test
def risky_transform(record):
    if record.get('amount', 0) < 0:
        raise ValueError(f"Negative amount: {record['amount']}")
    # Return a copy instead of mutating the input, so retries
    # always see the original record
    return {**record, 'processed': True}

dlq_pipeline = DeadLetterPipeline(max_retries=2)
records = [
    {'id': 1, 'amount': 100},
    {'id': 2, 'amount': -50},  # Will fail
    {'id': 3, 'amount': 200},
    {'id': 4, 'amount': -10},  # Will fail
]

stats = dlq_pipeline.process(records, risky_transform)
print(f"Stats: {stats}")
print(f"DLQ size: {len(dlq_pipeline.dlq)}")
for item in dlq_pipeline.dlq:
    print(f"  Failed record {item['record']['id']}: {item['error']}")
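A dead letter queue is only useful if it eventually gets drained. One possible shape for a replay helper, after a fix is deployed, is sketched below; the function name `reprocess_dlq` is an illustrative choice, and it operates on the same item format the class above produces:

```python
def reprocess_dlq(dlq, transform_fn):
    """Replay dead-lettered records with a (presumably fixed) transform.
    Records that now succeed are returned; persistent failures stay
    queued with their latest error recorded for investigation."""
    recovered, still_failing = [], []
    for item in dlq:
        try:
            recovered.append(transform_fn(item['record']))
        except Exception as e:
            item['error'] = str(e)  # keep the most recent error message
            still_failing.append(item)
    return recovered, still_failing
```

In production the "queue" is typically a Kafka topic or a table rather than a list, but the drain-and-requeue logic is the same.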

Testing Data Pipelines

Pipeline testing requires a different approach than application testing. Here are the four levels of testing every pipeline needs:

class PipelineTestFramework:
    """Structured approach to testing data pipelines.

    Four levels:
    1. Unit tests: Test individual transform functions
    2. Contract tests: Test input/output schema compliance
    3. Data quality tests: Test statistical properties of output
    4. Integration tests: Test end-to-end pipeline execution
    """

    @staticmethod
    def test_unit(transform_fn, input_record, expected_output):
        """Level 1: Does this function produce the expected output?"""
        result = transform_fn(input_record)
        assert result == expected_output, (
            f"Expected {expected_output}, got {result}"
        )
        return True

    @staticmethod
    def test_contract(records, schema):
        """Level 2: Does the output match the expected schema?"""
        violations = []
        for i, record in enumerate(records):
            for field, expected_type in schema.items():
                if field not in record:
                    violations.append(f"Record {i}: missing field '{field}'")
                elif not isinstance(record[field], expected_type):
                    violations.append(
                        f"Record {i}: '{field}' expected {expected_type.__name__}, "
                        f"got {type(record[field]).__name__}"
                    )
        return violations

    @staticmethod
    def test_data_quality(records, checks):
        """Level 3: Do statistical properties hold?

        Checks format: [(name, check_function, threshold)]
        """
        results = []
        for name, check_fn, threshold in checks:
            value = check_fn(records)
            passed = value >= threshold
            results.append({
                'check': name,
                'value': round(value, 4),
                'threshold': threshold,
                'passed': passed
            })
        return results

    @staticmethod
    def test_integration(pipeline_fn, test_input, expected_properties):
        """Level 4: Does the full pipeline produce valid output?"""
        output = pipeline_fn(test_input)
        results = []
        for prop_name, check_fn in expected_properties.items():
            passed = check_fn(output)
            results.append({'property': prop_name, 'passed': passed})
        return results

# Demo all four levels
tester = PipelineTestFramework()

# Level 1: Unit test
def normalize_name(record):
    r = record.copy()
    r['name'] = r['name'].strip().title()
    return r

assert tester.test_unit(
    normalize_name,
    {'name': '  alice smith  '},
    {'name': 'Alice Smith'}
)
print("Level 1 (Unit): PASSED")

# Level 2: Contract test
output_records = [
    {'name': 'Alice', 'age': 30, 'score': 95.5},
    {'name': 'Bob', 'age': '25', 'score': 87.0},  # age is string, not int
]
violations = tester.test_contract(
    output_records,
    {'name': str, 'age': int, 'score': float}
)
print(f"Level 2 (Contract): {len(violations)} violations")
for v in violations:
    print(f"  {v}")

# Level 3: Data quality
records = [{'amount': x} for x in [100, 200, 150, None, 300, 250]]
quality_checks = [
    ('completeness', lambda rs: sum(1 for r in rs if r['amount'] is not None) / len(rs), 0.8),
    ('positive_values', lambda rs: sum(1 for r in rs if r.get('amount') and r['amount'] > 0) / len(rs), 0.9),
]
quality_results = tester.test_data_quality(records, quality_checks)
print("Level 3 (Data Quality):")
for r in quality_results:
    status = 'PASS' if r['passed'] else 'FAIL'
    print(f"  [{status}] {r['check']}: {r['value']} (threshold: {r['threshold']})")

# Level 4: Integration test
def full_pipeline(data):
    return [normalize_name(r) for r in data if r.get('name')]

integration_results = tester.test_integration(
    full_pipeline,
    [{'name': 'alice'}, {'name': ''}, {'name': 'bob'}],
    {
        'non_empty_output': lambda out: len(out) > 0,
        'all_names_titled': lambda out: all(r['name'][0].isupper() for r in out),
        'no_empty_names': lambda out: all(r['name'].strip() for r in out),
    }
)
print("Level 4 (Integration):")
for r in integration_results:
    status = 'PASS' if r['passed'] else 'FAIL'
    print(f"  [{status}] {r['property']}")
💡 Testing rule: Unit tests catch logic bugs. Contract tests catch schema drift. Data quality tests catch data drift. Integration tests catch interaction bugs. You need all four levels. In interviews, mentioning this layered approach shows production experience.

Pipeline Monitoring Checklist

Every production pipeline needs these metrics. Mentioning monitoring in interviews demonstrates operational maturity.

| Metric | Alert When | Why It Matters |
| --- | --- | --- |
| Pipeline latency | Duration > 2x historical average | Indicates data volume spikes or infrastructure issues |
| Record count | Count deviates > 20% from expected | Upstream source may have changed or failed |
| Error rate | Failures > 1% of total records | Transform logic may need updating for new data |
| Data freshness | Staleness > SLA threshold | Pipeline may have silently stopped |
| Schema changes | Any new or dropped columns | Upstream schema evolution can break transforms |
| DLQ depth | Dead letter count growing | Persistent errors need investigation |
| Resource usage | Memory or CPU > 80% | Pipeline may need optimization or scaling |
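The first row of the table (latency greater than 2x the historical average) reduces to a one-line check. This is a minimal sketch with illustrative names (`latency_alert`, `factor`), not a full monitoring system:

```python
from statistics import mean

def latency_alert(current_seconds, history_seconds, factor=2.0):
    """Alert when the latest run takes more than `factor` times the
    historical average duration."""
    if not history_seconds:
        return False  # no baseline yet: nothing to compare against
    return current_seconds > factor * mean(history_seconds)
```

Real systems usually compare against a rolling window or a percentile rather than the all-time mean, so a single slow historical run does not inflate the baseline forever.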

Quick Reference Card

# ==============================================
# DATA PIPELINE INTERVIEW QUICK REFERENCE
# ==============================================

# ETL PATTERNS
# Extract:   Read from source (file, API, DB, queue)
# Transform: Clean, validate, enrich, aggregate
# Load:      Write to target (warehouse, lake, cache)

# VALIDATION CHECKS
# Schema:      Field types, required fields, format
# Range:       Min/max values, date ranges
# Referential: Foreign key integrity across tables
# Uniqueness:  Primary key deduplication
# Completeness: Null percentage thresholds

# STREAMING CONCEPTS
# Event time vs Processing time
# Watermark = max_event_time - max_lateness
# Session window = gap-based grouping
# Tumbling window = fixed, non-overlapping
# Sliding window = fixed, overlapping
# Late arrival = event_time < watermark

# PERFORMANCE TOOLKIT
# Chunking:     Process in bounded memory
# Parallelism:  Fan-out independent work
# Caching:      LRU + TTL for repeated lookups
# Generators:   Lazy evaluation for large datasets
# Bloom filter: Approximate set membership (O(1), low memory)
# External sort: Sort data larger than memory

# RELIABILITY PATTERNS
# Idempotency:    UPSERT, batch tracking, atomic swap
# Dead letter:    Route failures, don't drop data
# Circuit breaker: Fail fast on downstream outages
# Backpressure:   Slow producer when consumer is full
# Checkpointing:  Save progress for recovery

# TESTING LEVELS
# Unit:        Individual transform functions
# Contract:    Input/output schema compliance
# Data quality: Statistical property checks
# Integration: End-to-end pipeline execution

Frequently Asked Questions

When should you use ETL vs ELT?

ETL (Extract-Transform-Load) transforms data before loading it into the target system. Used when the target has limited compute (e.g., traditional data warehouses), or you need to filter/clean before storage. ELT (Extract-Load-Transform) loads raw data first, then transforms using the target's compute engine. Used when the target has powerful compute (e.g., BigQuery, Snowflake, Databricks). ELT is the modern standard because cloud warehouses make compute cheap, and keeping raw data allows re-transformation when business logic changes.

How do you make a pipeline idempotent?

Three strategies: (1) UPSERT/MERGE: Write records using a natural key so re-running overwrites instead of duplicating. (2) Write-then-swap: Write to a temp location, verify, then atomically rename/swap with the target. This makes the entire operation all-or-nothing. (3) Batch ID tracking: Store which batch IDs have been processed. Before processing, check if the batch ID exists. If yes, skip. This is the simplest approach for scheduled pipelines.

When should you choose batch vs streaming?

Use batch when latency requirements are hours (e.g., daily reports), data volume is large but bounded, or you need exact aggregates (e.g., month-end financials). Use streaming when latency must be seconds or minutes (e.g., fraud detection, real-time recommendations), data arrives continuously, or you need to react to events immediately. Use micro-batch (Spark Structured Streaming) when you want streaming semantics with batch simplicity and sub-minute latency is acceptable. Most companies start with batch and add streaming for specific use cases.

How do you handle schema evolution?

Three approaches: (1) Schema-on-read: Store raw data (JSON, Avro) and apply schema at query time. Most flexible but slower queries. (2) Schema registry: Use a central registry (e.g., Confluent Schema Registry with Avro) that validates producer/consumer compatibility. Supports forward and backward compatibility rules. (3) Defensive coding: Use .get(field, default) instead of direct access, validate schema at pipeline entry, and alert on new or dropped fields. In interviews, mention that Avro and Protobuf support schema evolution natively, while JSON requires manual handling.
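The "alert on new or dropped fields" part of defensive coding is a small set difference. A minimal sketch, with an illustrative function name (`schema_drift`):

```python
def schema_drift(record, expected_fields):
    """Compare a record's fields against the expected schema and
    report new or dropped fields, so the pipeline can alert on
    upstream schema evolution instead of crashing."""
    actual = set(record)
    expected = set(expected_fields)
    return {
        'new_fields': sorted(actual - expected),
        'dropped_fields': sorted(expected - actual),
    }
```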

What is a feature store, and what problems does it solve?

A feature store is a centralized system that manages the lifecycle of ML features: computation, storage, serving, and monitoring. It solves three problems: (1) Training-serving skew: Features computed differently in training vs production cause model degradation. A feature store computes features once and serves them consistently. (2) Feature reuse: Without a feature store, teams recompute the same features independently, wasting compute and creating inconsistencies. (3) Point-in-time correctness: For time-series features, you need to compute features as of the label timestamp, not the current time, to avoid data leakage. Tools like Feast, Tecton, and Hopsworks provide these capabilities.

How do you test a data pipeline?

Use four levels of testing: (1) Unit tests for individual transform functions with known input/output. (2) Contract tests that verify output schema matches expectations (field names, types, nullability). (3) Data quality tests that check statistical properties: row counts within expected range, null percentages below threshold, value distributions stable. Use tools like Great Expectations or dbt tests. (4) Integration tests that run the full pipeline on a small dataset and verify end-to-end correctness. Run unit and contract tests on every PR, quality tests on every pipeline run, and integration tests nightly.

How should you prepare for data pipeline interviews?

Focus on these areas: (1) Core patterns: Be able to implement ETL, validation, and streaming solutions from scratch in Python. Practice the challenges in this course. (2) System design: Know when to use batch vs streaming, how to handle schema evolution, and how to design for reliability (idempotency, dead letter queues). (3) Tools awareness: Know what Airflow, Spark, Kafka, dbt, and Great Expectations do, even if you have not used all of them. (4) Production thinking: Always mention monitoring, testing, error handling, and scalability unprompted. This is what separates senior from mid-level candidates. Practice 2-3 pipeline problems per day for 2 weeks before your interview.

📝 Final advice: The best data engineers think about pipelines as products, not scripts. They consider reliability, monitoring, testing, and evolution from the start. In interviews, demonstrating this mindset matters as much as writing correct code. Practice building complete, production-ready pipeline components, and you will stand out. Good luck with your interviews.