Getting started - Apache Kafka

Kafka is a central hub for real-time data: it is distributed, fault-tolerant, high-throughput, and designed as a streaming platform. Applications can publish data to Kafka, and other applications can subscribe to that data.
Kafka acts as a buffer and ensures reliable delivery of data even when the sending and receiving applications are not running at the same time or process data at different speeds. It decouples producers from consumers, which makes systems more flexible and scalable.

First implementation

  • Start a container with the Kafka service

docker run --rm --name kafka-server --hostname kafka-server \
    -e KAFKA_CFG_NODE_ID=0 \
    -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
    -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 \
    -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-server:9092,EXTERNAL://localhost:9094 \
    -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT \
    -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093 \
    -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
    -p 9092:9092 -p 9093:9093 -p 9094:9094 bitnami/kafka:latest
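
Once the container is running, you can check from the host that the broker answers on the external listener (port 9094). A minimal sketch, assuming the kafka-python-ng package from the section below is already installed; the file name check_broker.py is purely illustrative:

# check_broker.py (hypothetical helper, not part of the tutorial files)
from kafka import KafkaConsumer

# Creating a consumer without subscribing to a topic only opens a metadata connection.
consumer = KafkaConsumer(bootstrap_servers=["localhost:9094"])
print("Broker reachable, existing topics:", consumer.topics())
consumer.close()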
  • Create a topic

The /opt/bitnami/kafka/bin directory contains a set of scripts for interacting with Kafka. Connect to the container and use the kafka-topics.sh script to create a topic. The topic, named duck-topic, will be used by both the producer and the consumer.

# Connect to the container
docker exec -it kafka-server /bin/bash
# Create the topic "duck-topic"
kafka-topics.sh --bootstrap-server localhost:9092 --topic duck-topic --create
# Check that the topic was successfully created
kafka-topics.sh --bootstrap-server localhost:9092 --list
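
If you prefer to stay in Python, the topic can also be created from the host with the admin client shipped with kafka-python-ng. A sketch, assuming the broker from the previous step is running; the file name create_topic.py is illustrative:

# create_topic.py (hypothetical helper)
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=["localhost:9094"])
# Only create the topic if it does not exist yet (single broker: 1 partition, 1 replica).
if "duck-topic" not in admin.list_topics():
    admin.create_topics([NewTopic(name="duck-topic", num_partitions=1, replication_factor=1)])
print(admin.list_topics())  # should include "duck-topic"
admin.close()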
  • First producer/consumer scripts in Python

We use uv to manage the project and run both scripts in Python.

pyproject.toml

[project]
name = "Kafka"
requires-python = ">=3.12"
dependencies = [
    "kafka-python-ng>=2.2.3",
]

producer.py — uv run .\producer.py

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import time

producer = None  # defined before the try block so the finally clause can always see it
try:
    producer = KafkaProducer(
        bootstrap_servers=["localhost:9094"],
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    for i in range(10):  # Produce 10 messages
        message = {"message_id": i, "data": f"Message {i}"}
        producer.send("duck-topic", message)
        print(f"Produced message: {message}")
        time.sleep(1)  # Send a message every second

    producer.flush()  # Ensure all messages are sent
    print("Finished producing messages.")
except KafkaError as e:
    print(f"Error producing messages: {e}")
finally:
    if producer is not None:
        producer.close()
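
Note that producer.send() is asynchronous: it returns a future, and flush() only guarantees that buffered messages left the client. If you want a per-message acknowledgment from the broker, you can block on that future. A minimal sketch under the same setup:

# Blocking on the future returned by send() gives per-message delivery metadata.
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json

producer = KafkaProducer(
    bootstrap_servers=["localhost:9094"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
future = producer.send("duck-topic", {"message_id": 99, "data": "ack me"})
try:
    metadata = future.get(timeout=10)  # waits for the broker acknowledgment
    print(f"Delivered to partition {metadata.partition} at offset {metadata.offset}")
except KafkaError as e:
    print(f"Delivery failed: {e}")
finally:
    producer.close()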

consumer.py — uv run .\consumer.py

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    "duck-topic",
    bootstrap_servers=["localhost:9094"],
    group_id="my_consumer_group",
    value_deserializer=lambda x: (loads(x.decode("utf-8")) if x else None),
    key_deserializer=lambda x: x.decode("utf-8") if x else None,
    auto_offset_reset="earliest",  # or 'latest' or 'none'
    enable_auto_commit=False,  # Important for manual commits
)

try:
    for message in consumer:
        print(f"Received message: Key={message.key}, Value={message.value}")
        consumer.commit()

except KeyboardInterrupt:
    pass  # Allow Ctrl+C to exit gracefully

finally:
    consumer.close()
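
Committing after every message is safe but chatty. As an alternative, the consumer can poll records in batches and commit once per batch. A sketch with the same topic and group; the batch size of 100 is an arbitrary choice:

# Batch variant: poll() returns a dict {TopicPartition: [ConsumerRecord, ...]}.
from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    "duck-topic",
    bootstrap_servers=["localhost:9094"],
    group_id="my_consumer_group",
    value_deserializer=lambda x: loads(x.decode("utf-8")) if x else None,
    auto_offset_reset="earliest",
    enable_auto_commit=False,
)
try:
    while True:
        batches = consumer.poll(timeout_ms=1000, max_records=100)
        for tp, records in batches.items():
            for record in records:
                print(f"{tp.topic}[{tp.partition}]@{record.offset}: {record.value}")
        if batches:
            consumer.commit()  # one commit per batch instead of per message
except KeyboardInterrupt:
    pass
finally:
    consumer.close()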