Advanced

Kafka ML Best Practices

Production-proven patterns for schema management, delivery guarantees, monitoring, testing, and operating Kafka-based ML systems.

Schema Management

Use the Confluent Schema Registry with Avro or Protobuf to enforce schema contracts between producers and consumers.

Python — Avro with Schema Registry
from confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

schema_str = """
{
  "type": "record",
  "name": "MLFeature",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "event_count_1h", "type": "int"},
    {"name": "avg_value_1h", "type": "double"},
    {"name": "computed_at", "type": "string"}
  ]
}
"""

producer = AvroProducer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}, default_value_schema=schema_str)

producer.produce(topic='ml-features', value={
    'user_id': 'u123',
    'event_count_1h': 42,
    'avg_value_1h': 15.7,
    'computed_at': '2026-03-15T10:30:00Z'
})

Exactly-Once Semantics

Python — Transactional Producer
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'ml-feature-pipeline-1',
    'enable.idempotence': True
})
producer.init_transactions()

try:
    producer.begin_transaction()

    # Produce multiple messages atomically
    producer.produce('ml-features', value=feature_json)
    producer.produce('ml-audit-log', value=audit_json)

    # Commit offsets and messages together
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata()
    )
    producer.commit_transaction()
except Exception as e:
    producer.abort_transaction()
    raise

Monitoring Checklist

  • Consumer lag: Monitor the gap between latest offset and consumer offset. Alert if lag grows.
  • Throughput: Track messages/second for producers and consumers. Match producer rate.
  • Prediction latency: P50, P95, P99 end-to-end latency from event to prediction.
  • Error rate: Track deserialization errors, model errors, and dead-letter queue size.
  • Feature freshness: Time between event generation and feature availability.
  • Model drift: Compare prediction distributions over time windows.

Testing Streaming Pipelines

Python — Testing with Embedded Kafka
import pytest
from testcontainers.kafka import KafkaContainer

@pytest.fixture(scope="module")
def kafka_broker():
    with KafkaContainer() as kafka:
        yield kafka.get_bootstrap_server()

def test_feature_pipeline(kafka_broker):
    """Test that feature pipeline computes correctly."""
    producer = Producer({'bootstrap.servers': kafka_broker})

    # Send test events
    events = [
        {'user_id': 'u1', 'action': 'click', 'value': 10.0},
        {'user_id': 'u1', 'action': 'view', 'value': 5.0},
        {'user_id': 'u1', 'action': 'click', 'value': 15.0},
    ]
    for event in events:
        producer.produce('test-events', value=json.dumps(event).encode())
    producer.flush()

    # Run pipeline and verify output
    features = run_feature_pipeline(kafka_broker, 'test-events')
    assert features['event_count_1h'] == 3
    assert features['avg_value_1h'] == 10.0

Frequently Asked Questions

Use cooperative rebalancing (partition.assignment.strategy=cooperative-sticky) to minimize disruption. Implement a rebalance callback to gracefully stop processing in-flight predictions before partitions are revoked.

Use Kafka for asynchronous, high-throughput inference where the caller doesn't need an immediate response (fraud detection, recommendations enrichment). Use REST APIs for synchronous, request-response patterns where the caller blocks for the prediction.

Set partitions to your maximum expected consumer parallelism. For an inference pipeline that may scale to 20 consumers, use 20-30 partitions. Over-partitioning has minimal cost, but under-partitioning limits your scaling ceiling.