Kafka ML Best Practices
Production-proven patterns for schema management, delivery guarantees, monitoring, testing, and operating Kafka-based ML systems.
Schema Management
Use the Confluent Schema Registry with Avro or Protobuf to enforce schema contracts between producers and consumers.
```python
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Avro schema for the feature record; the registry rejects
# incompatible changes when producers try to evolve it.
value_schema = avro.loads("""
{
    "type": "record",
    "name": "MLFeature",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "event_count_1h", "type": "int"},
        {"name": "avg_value_1h", "type": "double"},
        {"name": "computed_at", "type": "string"}
    ]
}
""")

producer = AvroProducer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}, default_value_schema=value_schema)

producer.produce(topic='ml-features', value={
    'user_id': 'u123',
    'event_count_1h': 42,
    'avg_value_1h': 15.7,
    'computed_at': '2026-03-15T10:30:00Z'
})
producer.flush()
```
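For fast local unit tests that don't need a running registry, the same contract idea can be spot-checked with a small stdlib-only validator. This is a hypothetical helper, not part of confluent_kafka, and it only covers Avro primitive types (no unions or logical types):

```python
# Minimal map from Avro primitive types to Python types.
AVRO_PY_TYPES = {
    "string": str,
    "int": int,
    "long": int,
    "double": float,
    "boolean": bool,
}

def validate_record(record, fields):
    """Check that `record` has exactly the declared fields with matching types."""
    declared = {f["name"]: f["type"] for f in fields}
    if set(record) != set(declared):
        return False
    return all(isinstance(record[name], AVRO_PY_TYPES[typ])
               for name, typ in declared.items())

# The MLFeature fields from the schema above.
fields = [
    {"name": "user_id", "type": "string"},
    {"name": "event_count_1h", "type": "int"},
    {"name": "avg_value_1h", "type": "double"},
    {"name": "computed_at", "type": "string"},
]

good = {"user_id": "u123", "event_count_1h": 42,
        "avg_value_1h": 15.7, "computed_at": "2026-03-15T10:30:00Z"}
bad = {"user_id": "u123", "event_count_1h": "not-an-int",
       "avg_value_1h": 15.7, "computed_at": "2026-03-15T10:30:00Z"}
```

A check like this in producer unit tests catches type mismatches before they ever reach the registry.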
Exactly-Once Semantics
Enable idempotence and transactions so that feature writes, audit records, and consumer offset commits either all succeed or all roll back.

```python
from confluent_kafka import Producer, KafkaException

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'ml-feature-pipeline-1',
    'enable.idempotence': True
})
producer.init_transactions()

# `consumer`, `feature_json`, and `audit_json` come from the
# surrounding consume-transform loop.
try:
    producer.begin_transaction()
    # Produce multiple messages atomically
    producer.produce('ml-features', value=feature_json)
    producer.produce('ml-audit-log', value=audit_json)
    # Commit consumed offsets and produced messages together
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata()
    )
    producer.commit_transaction()
except KafkaException:
    producer.abort_transaction()
    raise
```
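Transactions only pay off if downstream readers honor them: consumers must set `isolation.level` to `read_committed`, otherwise they will also see messages from aborted transactions. A minimal consumer config sketch (the group id is a placeholder):

```python
# Config for a consumer reading the transactional output above.
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'ml-inference',           # placeholder group id
    'isolation.level': 'read_committed',  # skip messages from aborted transactions
    'enable.auto.commit': False,          # offsets are committed via the transaction
}
```

Pass this dict to `confluent_kafka.Consumer` in the consume loop that feeds the transactional producer.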
Monitoring Checklist
- Consumer lag: Monitor the gap between the latest offset and the committed consumer offset per partition. Alert on sustained growth, not transient spikes.
- Throughput: Track messages/second for producers and consumers; alert when consumer throughput falls persistently below the producer rate.
- Prediction latency: P50, P95, P99 end-to-end latency from event to prediction.
- Error rate: Track deserialization errors, model errors, and dead-letter queue size.
- Feature freshness: Time between event generation and feature availability.
- Model drift: Compare prediction distributions over time windows.
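The lag check above can be expressed as a small helper. The sketch below computes per-partition lag from broker end offsets and the group's committed offsets; both dicts are assumed to be fetched elsewhere (e.g. via `Consumer.get_watermark_offsets` and `Consumer.committed`), and the function names are illustrative:

```python
def consumer_lag(end_offsets, committed_offsets):
    """Return {partition: lag} given end offsets and committed offsets."""
    return {p: max(end_offsets[p] - committed_offsets.get(p, 0), 0)
            for p in end_offsets}

def lag_alerts(end_offsets, committed_offsets, threshold):
    """Partitions whose lag exceeds the alert threshold."""
    return sorted(p for p, lag in consumer_lag(end_offsets, committed_offsets).items()
                  if lag > threshold)

# Example: partition 2 is far behind and should trigger an alert.
end = {0: 1000, 1: 1000, 2: 1000}
committed = {0: 990, 1: 1000, 2: 100}
```

In practice this runs on a schedule (or via an exporter like Burrow or kafka-lag-exporter) and feeds the alerting system.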
Testing Streaming Pipelines
```python
import json

import pytest
from confluent_kafka import Producer
from testcontainers.kafka import KafkaContainer

@pytest.fixture(scope="module")
def kafka_broker():
    # Spin up a throwaway Kafka broker in Docker for this test module
    with KafkaContainer() as kafka:
        yield kafka.get_bootstrap_server()

def test_feature_pipeline(kafka_broker):
    """Test that the feature pipeline computes aggregates correctly."""
    producer = Producer({'bootstrap.servers': kafka_broker})
    # Send test events
    events = [
        {'user_id': 'u1', 'action': 'click', 'value': 10.0},
        {'user_id': 'u1', 'action': 'view', 'value': 5.0},
        {'user_id': 'u1', 'action': 'click', 'value': 15.0},
    ]
    for event in events:
        producer.produce('test-events', value=json.dumps(event).encode())
    producer.flush()
    # Run the pipeline under test and verify its output
    features = run_feature_pipeline(kafka_broker, 'test-events')
    assert features['event_count_1h'] == 3
    assert features['avg_value_1h'] == 10.0
```
Frequently Asked Questions
How do I handle consumer group rebalances during inference?
Use cooperative rebalancing (partition.assignment.strategy=cooperative-sticky) to minimize disruption. Implement a rebalance callback to gracefully stop processing in-flight predictions before partitions are revoked.
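One way to structure that callback is to drain in-flight work before the revoke completes. The sketch below keeps the drain logic Kafka-free so it can be unit-tested; in production `on_revoke` would be wired up via `Consumer.subscribe(topics, on_revoke=...)` and `commit` would call `Consumer.commit`. All names here are illustrative:

```python
class InFlightTracker:
    """Tracks predictions still being computed, keyed by (topic, partition, offset)."""

    def __init__(self, commit):
        self._pending = {}   # (topic, partition, offset) -> in-progress work
        self._commit = commit

    def start(self, topic, partition, offset, work):
        self._pending[(topic, partition, offset)] = work

    def finish(self, topic, partition, offset):
        self._pending.pop((topic, partition, offset), None)

    def on_revoke(self, partitions):
        """Finish outstanding predictions for revoked partitions, then commit."""
        for (topic, partition, offset), work in list(self._pending.items()):
            if (topic, partition) in partitions:
                work()  # block until the prediction is done
                self.finish(topic, partition, offset)
        self._commit()
```

Committing only after the drain ensures another consumer in the group never re-processes a message whose prediction was already emitted.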
When should I serve models over Kafka versus a REST API?
Use Kafka for asynchronous, high-throughput inference where the caller doesn't need an immediate response (fraud detection, recommendation enrichment). Use REST APIs for synchronous, request-response patterns where the caller blocks on the prediction.
How many partitions should an ML topic have?
Set partitions to your maximum expected consumer parallelism. For an inference pipeline that may scale to 20 consumers, use 20-30 partitions. Over-partitioning has minimal cost, but under-partitioning limits your scaling ceiling.
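That rule of thumb can be written down. The helper below (illustrative, not a library function) sizes a topic from the larger of the parallelism ceiling and the throughput requirement, with headroom for growth:

```python
import math

def recommended_partitions(max_consumers, target_msgs_per_sec,
                           per_partition_msgs_per_sec, headroom=1.25):
    """Partition count covering both the scaling ceiling and throughput, plus headroom."""
    by_parallelism = max_consumers
    by_throughput = math.ceil(target_msgs_per_sec / per_partition_msgs_per_sec)
    return math.ceil(max(by_parallelism, by_throughput) * headroom)
```

For the 20-consumer pipeline above, e.g. `recommended_partitions(20, 50_000, 5_000)` yields 25, inside the suggested 20-30 range; a throughput-bound topic with only 4 consumers can still demand more partitions than its consumer count.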