Intermediate

Feature Pipelines with Kafka

Build real-time feature pipelines that consume raw events from Kafka, compute ML features, and write them to feature stores or serving layers.

Feature Pipeline Architecture

A real-time feature pipeline has three stages:

  • Ingest: Raw events flow into Kafka topics from application services, IoT devices, or user interactions.
  • Transform: Stream processors compute features — aggregations, joins, lookups, and enrichments.
  • Serve: Computed features are written to a low-latency store (Redis, DynamoDB, or a feature store) for model serving.

Building a Feature Pipeline

Python — End-to-End Feature Pipeline
from confluent_kafka import Consumer, Producer
import json
import redis
from datetime import datetime, timedelta
from collections import defaultdict

# Setup
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'feature-pipeline',
    'auto.offset.reset': 'latest'
})
consumer.subscribe(['user-events'])

producer = Producer({'bootstrap.servers': 'localhost:9092'})
redis_client = redis.Redis(host='localhost', port=6379)

# In-memory state for windowed features
user_events = defaultdict(list)

def compute_features(user_id, events):
    """Compute features from recent events."""
    now = datetime.utcnow()
    one_hour_ago = now - timedelta(hours=1)

    # Filter to last hour
    recent = [e for e in events if e['timestamp'] > one_hour_ago]

    return {
        'user_id': user_id,
        'event_count_1h': len(recent),
        'unique_actions_1h': len(set(e['action'] for e in recent)),
        'avg_value_1h': sum(e['value'] for e in recent) / max(len(recent), 1),
        'last_action': recent[-1]['action'] if recent else None,
        'computed_at': now.isoformat()
    }

# Process loop
while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue

    event = json.loads(msg.value())
    user_id = event['user_id']
    event['timestamp'] = datetime.fromisoformat(event['timestamp'])
    user_events[user_id].append(event)

    # Compute and store features
    features = compute_features(user_id, user_events[user_id])

    # Write to Redis for online serving
    redis_client.setex(
        f"features:{user_id}",
        3600,  # 1 hour TTL
        json.dumps(features)
    )

    # Write to Kafka for downstream consumers
    producer.produce('ml-features', value=json.dumps(features).encode())

Kafka Connect for Sinks

JSON — Kafka Connect Sink to Redis
{
  "name": "redis-feature-sink",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
    "tasks.max": "3",
    "topics": "ml-features",
    "redis.hosts": "localhost:6379",
    "redis.database": 0,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}

Feature Pipeline with Feast

Python — Kafka to Feast Feature Store
from feast import FeatureStore
from confluent_kafka import Consumer
import pandas as pd
import json

store = FeatureStore(repo_path="feature_repo/")
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'feast-ingestion'
})
consumer.subscribe(['ml-features'])

batch = []
BATCH_SIZE = 100

while True:
    msg = consumer.poll(1.0)
    if msg and not msg.error():
        batch.append(json.loads(msg.value()))

    if len(batch) >= BATCH_SIZE:
        df = pd.DataFrame(batch)
        df['event_timestamp'] = pd.to_datetime(df['computed_at'])

        # Write to online store
        store.write_to_online_store(
            feature_view_name="user_streaming_features",
            df=df
        )
        batch = []
Dual-write to online and offline: Write computed features to both a low-latency online store (Redis/DynamoDB) for serving and a data lake (S3/Delta Lake) for training. This ensures training-serving consistency.
💡
Handling late data: Events may arrive out of order. Use watermarks in Spark Structured Streaming or event-time processing in Flink to handle late arrivals gracefully. Set a grace period appropriate for your use case.