Step 5: Kafka Streaming Pipeline
Build an event-driven streaming pipeline using Apache Kafka that ingests transaction events in real time, scores them with our fraud model, and routes alerts to downstream investigation queues. The pipeline pairs an idempotent producer with manual offset commits, giving at-least-once delivery with duplicate-safe writes — an approximation of exactly-once semantics.
Streaming Architecture
The streaming pipeline has three Kafka topics forming a processing chain:
- transactions: Raw transaction events from payment gateways. Partitioned by card hash for ordering guarantees per card.
- fraud-scores: Enriched events with fraud score, risk level, and decision. Every transaction gets scored.
- fraud-alerts: Only transactions flagged as fraud. Consumed by the investigation team dashboard.
Docker Compose for Kafka
# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Bug fix: advertising only localhost:9092 breaks every client inside
      # the compose network (including kafka-init below) -- the broker hands
      # the advertised address back to clients after bootstrap, and
      # "localhost" inside a container is not the broker. Expose two
      # listeners instead:
      #   kafka:29092     -> containers on the compose network
      #   localhost:9092  -> host processes (the Python producer/consumer)
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_NUM_PARTITIONS: 6

  kafka-init:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - kafka
    entrypoint: ["/bin/sh", "-c"]
    command: |
      "
      echo 'Waiting for Kafka...' &&
      cub kafka-ready -b kafka:29092 1 30 &&
      kafka-topics --create --if-not-exists --bootstrap-server kafka:29092 \
        --topic transactions --partitions 6 --replication-factor 1 &&
      kafka-topics --create --if-not-exists --bootstrap-server kafka:29092 \
        --topic fraud-scores --partitions 6 --replication-factor 1 &&
      kafka-topics --create --if-not-exists --bootstrap-server kafka:29092 \
        --topic fraud-alerts --partitions 3 --replication-factor 1 &&
      echo 'Topics created!'
      "
# Start: docker-compose up -d
Kafka Configuration
# src/streaming/config.py
from dataclasses import dataclass
from typing import Optional


@dataclass
class KafkaConfig:
    """Centralized Kafka configuration.

    Topic names, the consumer group, and librdkafka producer/consumer
    settings live here so the producer, scoring consumer, and alert
    consumer all agree on them.

    producer_config / consumer_config default to the dicts built in
    __post_init__; a caller-supplied dict is respected as-is.
    """
    bootstrap_servers: str = "localhost:9092"
    transactions_topic: str = "transactions"
    scores_topic: str = "fraud-scores"
    alerts_topic: str = "fraud-alerts"
    consumer_group: str = "fraud-scoring-group"
    # Producer settings (None -> defaults built in __post_init__)
    producer_config: Optional[dict] = None
    # Consumer settings (None -> defaults built in __post_init__)
    consumer_config: Optional[dict] = None

    def __post_init__(self):
        # Bug fix: the previous version rebuilt both dicts unconditionally,
        # silently discarding any configuration the caller passed in.
        if self.producer_config is None:
            self.producer_config = {
                'bootstrap.servers': self.bootstrap_servers,
                'acks': 'all',                # wait for all in-sync replicas
                'retries': 3,
                'retry.backoff.ms': 100,
                'linger.ms': 5,               # batch for 5ms for throughput
                'batch.size': 65536,          # 64KB batches
                'compression.type': 'snappy',
                'enable.idempotence': True,   # exactly-once producer semantics
            }
        if self.consumer_config is None:
            self.consumer_config = {
                'bootstrap.servers': self.bootstrap_servers,
                'group.id': self.consumer_group,
                'auto.offset.reset': 'latest',
                'enable.auto.commit': False,  # manual commit after processing
                'max.poll.interval.ms': 30000,
                'session.timeout.ms': 10000,
                'fetch.min.bytes': 1024,
                'fetch.wait.max.ms': 100,
            }


config = KafkaConfig()
Transaction Producer
The producer simulates a payment gateway sending transaction events to Kafka. In production, this would be the actual payment processor:
# src/streaming/producer.py
import json
import time
import random
import hashlib
from datetime import datetime
from confluent_kafka import Producer
from .config import config
class TransactionProducer:
    """Produces transaction events to Kafka.

    Simulates a payment gateway sending credit card transactions.
    In production, this would be replaced by the actual payment
    processor's Kafka integration.
    """

    def __init__(self):
        self.producer = Producer(config.producer_config)
        self.tx_count = 0     # messages confirmed delivered
        self.error_count = 0  # delivery failures reported by the broker

    def delivery_callback(self, err, msg):
        """Called once per produced message to confirm delivery."""
        if err:
            self.error_count += 1
            print(f"DELIVERY FAILED: {err}")
        else:
            self.tx_count += 1

    def produce_transaction(self, transaction: dict):
        """Send a single transaction event to Kafka.

        Keyed by card_hash so all transactions for one card land on the
        same partition (per-card ordering). Fix: produce() raises
        BufferError when the local queue is full; drain callbacks and
        retry once instead of crashing the simulation.
        """
        key = transaction.get('card_hash', 'unknown').encode('utf-8')
        payload = json.dumps(transaction).encode('utf-8')
        try:
            self.producer.produce(
                topic=config.transactions_topic,
                key=key,
                value=payload,
                callback=self.delivery_callback
            )
        except BufferError:
            # Local queue full: serve delivery callbacks, then retry once.
            self.producer.poll(1)
            self.producer.produce(
                topic=config.transactions_topic,
                key=key,
                value=payload,
                callback=self.delivery_callback
            )
        self.producer.poll(0)  # trigger pending delivery callbacks

    def generate_transaction(self, fraud_rate: float = 0.002) -> dict:
        """Generate a synthetic transaction for testing.

        fraud_rate: probability the event follows a simulated fraud
        pattern (higher amounts, larger-magnitude PCA features).
        """
        from datetime import timezone  # local import; file-level import has datetime only

        is_fraud = random.random() < fraud_rate
        if is_fraud:
            # Three fraud patterns; random.choice evaluates all three
            # candidates but returns exactly one.
            amount = random.choice([
                random.uniform(500, 5000),        # high amount fraud
                random.uniform(0.01, 1.0),        # micro-transaction testing
                round(random.uniform(100, 1000)), # round number fraud
            ])
            # Fraud PCA features tend to have larger magnitudes
            v_features = {
                f'V{i}': round(random.gauss(0, 2.5), 4)
                for i in range(1, 29)
            }
        else:
            amount = round(random.uniform(1, 500), 2)
            v_features = {
                f'V{i}': round(random.gauss(0, 1), 4)
                for i in range(1, 29)
            }
        # md5 is fine here: the hash is an opaque partition key for a
        # synthetic card id, not a security control.
        card_hash = hashlib.md5(
            f"card_{random.randint(1, 10000)}".encode()
        ).hexdigest()[:16]
        # Fix: datetime.utcnow() is deprecated (3.12+). Use an aware UTC
        # "now" and drop tzinfo so the '...Z' string format the consumer
        # parses stays byte-identical.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        return {
            'transaction_id': f"tx_{int(time.time()*1000)}_{random.randint(0,9999):04d}",
            'card_hash': card_hash,
            'amount': round(amount, 2),
            'timestamp': now_utc.isoformat() + 'Z',
            'merchant_category': random.choice([
                'retail', 'food', 'travel', 'online', 'atm', 'gas'
            ]),
            **v_features,
            '_synthetic_label': int(is_fraud)  # ground truth, for testing only
        }

    def run_simulation(
        self, duration_seconds: int = 60, tps: int = 100
    ):
        """Run a transaction simulation at (approximately) the given TPS.

        Fix: the original slept a fixed interval after each send, so
        per-message work dragged the effective rate below the target.
        Scheduling against absolute deadlines keeps the rate honest.
        """
        print(f"Starting simulation: {tps} TPS for {duration_seconds}s")
        start = time.time()
        interval = 1.0 / tps
        next_send = start
        while time.time() - start < duration_seconds:
            tx = self.generate_transaction()
            self.produce_transaction(tx)
            next_send += interval
            delay = next_send - time.time()
            if delay > 0:
                time.sleep(delay)
        # Block until all queued messages are delivered (or 10s elapses).
        self.producer.flush(timeout=10)
        elapsed = time.time() - start
        print(f"\nSimulation complete:")
        print(f"  Duration: {elapsed:.1f}s")
        print(f"  Produced: {self.tx_count:,}")
        print(f"  Errors: {self.error_count}")
        print(f"  Effective TPS: {self.tx_count/elapsed:.1f}")


if __name__ == "__main__":
    producer = TransactionProducer()
    producer.run_simulation(duration_seconds=60, tps=100)
Fraud Scoring Consumer
# src/streaming/consumer.py
import json
import time
import signal
import sys
from datetime import datetime
from confluent_kafka import Consumer, Producer, KafkaError
from ..api.predictor import FraudPredictor
from .config import config
class FraudScoringConsumer:
    """Consumes transactions, scores them, and routes results.

    Reads from 'transactions' topic, scores each with the fraud model,
    writes enriched events to 'fraud-scores', and high-risk events
    to 'fraud-alerts'.
    """

    def __init__(self):
        self.consumer = Consumer(config.consumer_config)
        self.producer = Producer(config.producer_config)
        # Hard-coded model path; presumably FraudPredictor loads a pickled
        # model from disk -- confirm against src/api/predictor.py.
        self.predictor = FraudPredictor("models/fraud_model.pkl")
        # Metrics
        self.processed = 0          # messages successfully scored and routed
        self.fraud_count = 0        # messages also sent to the alerts topic
        self.total_latency_ms = 0   # summed per-message scoring latency
        self.running = True
        # Graceful shutdown: signal handlers only flip self.running so the
        # poll loop exits at a message boundary and cleanup runs in run()'s
        # finally block.
        signal.signal(signal.SIGINT, self._shutdown)
        signal.signal(signal.SIGTERM, self._shutdown)

    def _shutdown(self, signum, frame):
        # Signal handler: set a flag rather than exiting mid-message.
        print("\nShutting down gracefully...")
        self.running = False

    def process_transaction(self, raw_message: bytes) -> dict:
        """Score a single transaction and return the enriched event.

        Propagates JSON/KeyError/predictor exceptions to the caller, which
        logs them and skips the message.
        """
        start = time.perf_counter()
        tx = json.loads(raw_message)
        # Extract features for the model; any missing V-feature defaults
        # to 0.0 rather than raising.
        v_features = {
            f'V{i}': tx.get(f'V{i}', 0.0) for i in range(1, 29)
        }
        # The producer emits ISO-8601 with a trailing 'Z'; fromisoformat
        # (before Python 3.11) rejects 'Z', hence the replace.
        timestamp = datetime.fromisoformat(
            tx['timestamp'].replace('Z', '+00:00')
        )
        # Compute features and predict
        feature_vector = self.predictor.compute_features(
            amount=tx['amount'],
            timestamp=timestamp,
            v_features=v_features
        )
        result = self.predictor.predict(feature_vector)
        # Build enriched event: original payload plus scoring fields.
        # NOTE(review): datetime.utcnow() is deprecated since 3.12; prefer
        # datetime.now(timezone.utc) when this module is next touched.
        enriched = {
            **tx,
            'fraud_score': result['fraud_score'],
            'is_fraud': result['is_fraud'],
            'risk_level': result['risk_level'],
            'threshold': result['threshold_used'],
            'scored_at': datetime.utcnow().isoformat() + 'Z',
            'scoring_latency_ms': round(
                (time.perf_counter() - start) * 1000, 2
            )
        }
        return enriched

    def route_result(self, enriched: dict):
        """Route a scored transaction to the appropriate output topic(s).

        Every scored event goes to fraud-scores; events flagged as fraud
        additionally produce a compact alert on fraud-alerts. Both are
        keyed by card_hash, preserving per-card partition ordering.
        """
        key = enriched.get('card_hash', 'unknown').encode('utf-8')
        value = json.dumps(enriched).encode('utf-8')
        # All scored transactions go to fraud-scores
        self.producer.produce(
            topic=config.scores_topic,
            key=key,
            value=value
        )
        # Fraud alerts go to a separate topic for investigators
        if enriched['is_fraud']:
            alert = {
                'alert_id': f"alert_{enriched['transaction_id']}",
                'transaction_id': enriched['transaction_id'],
                'card_hash': enriched['card_hash'],
                'amount': enriched['amount'],
                'fraud_score': enriched['fraud_score'],
                'risk_level': enriched['risk_level'],
                'merchant_category': enriched.get('merchant_category'),
                'timestamp': enriched['timestamp'],
                'alert_time': datetime.utcnow().isoformat() + 'Z',
                # CRITICAL risk asks for an outright block; everything else
                # flagged as fraud goes to manual review.
                'action_required': 'BLOCK' if enriched['risk_level'] == 'CRITICAL' else 'REVIEW'
            }
            self.producer.produce(
                topic=config.alerts_topic,
                key=key,
                value=json.dumps(alert).encode('utf-8')
            )
            self.fraud_count += 1

    def run(self):
        """Main consumer loop with manual offset commits.

        Poll -> score -> produce -> commit, with periodic producer flushes
        and a stats line every ~10 seconds. Runs until self.running is
        cleared by a SIGINT/SIGTERM.
        """
        self.consumer.subscribe([config.transactions_topic])
        print(f"Listening on topic: {config.transactions_topic}")
        print(f"Consumer group: {config.consumer_group}")
        batch_start = time.time()
        try:
            while self.running:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    # End-of-partition is informational, not an error.
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    print(f"Consumer error: {msg.error()}")
                    continue
                # Process the transaction
                try:
                    enriched = self.process_transaction(msg.value())
                    self.route_result(enriched)
                    self.processed += 1
                    self.total_latency_ms += enriched['scoring_latency_ms']
                except Exception as e:
                    # NOTE(review): a poison message is skipped here but its
                    # offset is still covered by the next successful commit,
                    # so it is effectively dropped -- consider a dead-letter
                    # topic if that matters.
                    print(f"Processing error: {e}")
                    continue
                # Commit offset after successful processing.
                # NOTE(review): produce() only enqueues; the scored event may
                # not be delivered yet when this offset commits, so a crash
                # before the periodic flush below can lose output. This is
                # at-least-once for the input stream, not true exactly-once
                # end-to-end -- Kafka transactions would be needed for that.
                self.consumer.commit(asynchronous=False)
                # Flush producer periodically
                if self.processed % 100 == 0:
                    self.producer.flush(timeout=5)
                # Log stats every 10 seconds
                if time.time() - batch_start >= 10:
                    avg_latency = (
                        self.total_latency_ms / max(self.processed, 1)
                    )
                    print(
                        f"Stats: processed={self.processed:,}, "
                        f"fraud={self.fraud_count}, "
                        f"avg_latency={avg_latency:.1f}ms"
                    )
                    batch_start = time.time()
        finally:
            # Drain outstanding produces and leave the group cleanly so a
            # rebalance happens promptly.
            self.producer.flush(timeout=10)
            self.consumer.close()
            print(f"\nFinal stats:")
            print(f"  Total processed: {self.processed:,}")
            print(f"  Total fraud: {self.fraud_count}")
            if self.processed > 0:
                print(f"  Fraud rate: "
                      f"{self.fraud_count/self.processed*100:.3f}%")
                print(f"  Avg latency: "
                      f"{self.total_latency_ms/self.processed:.1f}ms")


if __name__ == "__main__":
    consumer = FraudScoringConsumer()
    consumer.run()
Delivery guarantees: the pipeline pairs an idempotent producer (enable.idempotence=True) with manual consumer commits (enable.auto.commit=False). The consumer commits an offset only after the scored message has been handed to the producer for the output topic, so if the consumer crashes mid-processing the message is re-consumed and re-scored on restart. In practice this yields at-least-once processing with duplicate-safe producer writes; true end-to-end exactly-once would additionally require Kafka transactions spanning the produce and the offset commit.
Running the Full Pipeline
# Terminal 1: Start Kafka
docker-compose up -d
# Terminal 2: Start the scoring consumer
python -m src.streaming.consumer
# Terminal 3: Start the transaction producer
python -m src.streaming.producer
# Terminal 4: Monitor fraud alerts
# NOTE(review): kafka-console-consumer ships inside the broker image; if it
# is not installed on the host, run it via
#   docker-compose exec kafka kafka-console-consumer ...
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic fraud-alerts \
--from-beginning \
--property print.key=true
Alert Consumer for Investigation Dashboard
# src/streaming/alert_consumer.py
import json
from confluent_kafka import Consumer
from .config import config
class AlertConsumer:
    """Consumes fraud alerts for the investigation dashboard.

    In production, this would push alerts to:
    - Investigation case management system
    - Slack/PagerDuty for on-call fraud analysts
    - Real-time dashboard (e.g., via WebSocket)
    """

    def __init__(self):
        # Separate consumer group so alert consumption never competes with
        # the scoring group for partitions.
        alert_config = {
            **config.consumer_config,
            'group.id': 'fraud-alert-consumers'
        }
        self.consumer = Consumer(alert_config)

    def run(self):
        """Poll the alerts topic forever and print each alert.

        Fixes over the original:
        - consumer errors are logged instead of silently swallowed
        - a malformed JSON payload is skipped with a log line instead of
          crashing the loop
        - the consumer is closed on exit (including Ctrl+C) so the group
          rebalances promptly
        """
        self.consumer.subscribe([config.alerts_topic])
        print("Monitoring fraud alerts...")
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    # Surface the error rather than dropping it silently.
                    print(f"Alert consumer error: {msg.error()}")
                    continue
                try:
                    alert = json.loads(msg.value())
                except (ValueError, TypeError) as e:
                    print(f"Skipping malformed alert payload: {e}")
                    continue
                action = alert['action_required']
                symbol = "!!!" if action == "BLOCK" else "(!)"
                print(f"\n{symbol} FRAUD ALERT {symbol}")
                print(f"  TX: {alert['transaction_id']}")
                print(f"  Card: {alert['card_hash']}")
                print(f"  Amount: ${alert['amount']:.2f}")
                print(f"  Score: {alert['fraud_score']:.4f}")
                print(f"  Risk: {alert['risk_level']}")
                print(f"  Action: {action}")
                print(f"  Category: {alert.get('merchant_category', 'unknown')}")
                # Commit only after the alert has been handled/displayed.
                self.consumer.commit(asynchronous=False)
        finally:
            self.consumer.close()
What Is Next
The streaming pipeline is operational: transactions flow in, get scored, and fraud alerts are routed to investigators. But how do we know the model is still accurate next week or next month? In the next lesson, we will build monitoring to detect data drift, track model performance over time, and trigger automated retraining when accuracy degrades.
Lilly Tech Systems