Getting started - Apache Kafka¶
Kafka is a central hub for real-time data: a distributed, fault-tolerant, high-throughput streaming platform.
Applications can publish data to Kafka, and other applications can subscribe to that data.
Kafka acts as a buffer and ensures reliable delivery of data,
even if the sending and receiving applications are not running at the same time or process data at different speeds.
It decouples the producers and consumers of data, making systems more flexible and scalable.
First implementation¶
Start a container with the Kafka service
docker run --rm --name kafka-server --hostname kafka-server \
  -e KAFKA_CFG_NODE_ID=0 \
  -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-server:9092,EXTERNAL://localhost:9094 \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093 \
  -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -p 9092:9092 -p 9093:9093 -p 9094:9094 bitnami/kafka:latest
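The three listeners serve different purposes: PLAINTEXT (9092) is advertised as kafka-server:9092 for clients inside the Docker network, CONTROLLER (9093) is used internally by KRaft for cluster metadata, and EXTERNAL (9094) is advertised as localhost:9094 for clients running on the host. To confirm that the EXTERNAL listener is reachable from the host, a few lines of Python are enough (a minimal sketch, assuming the kafka-python-ng client that is set up later in this article):
from kafka import KafkaProducer

# Reachability check against the EXTERNAL listener exposed on the host
producer = KafkaProducer(bootstrap_servers=["localhost:9094"])
print(producer.bootstrap_connected())  # True once the bootstrap broker has answered
producer.close()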
Create a topic
In the /opt/bitnami/kafka/bin directory, there is a set of scripts to interact with Kafka.
Connect to the container and use the kafka-topics.sh script to create a topic.
The topic named duck-topic will be used by both the producer and the consumer.
# Connect to the container
docker exec -it kafka-server /bin/bash
# Create the topic "duck-topic"
kafka-topics.sh --bootstrap-server localhost:9092 --topic duck-topic --create
# Check that the topic was successfully created
kafka-topics.sh --bootstrap-server localhost:9092 --list
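The topic can also be created programmatically instead of through the CLI. A minimal sketch with kafka-python-ng's admin client, run from the host against the EXTERNAL listener (same topic name as above; one partition and replication factor 1 match the single-broker setup):
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=["localhost:9094"])
# Raises TopicAlreadyExistsError if the CLI already created the topic
admin.create_topics([NewTopic(name="duck-topic", num_partitions=1, replication_factor=1)])
print(admin.list_topics())  # should include "duck-topic"
admin.close()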
First producer/consumer scripts in Python
We use uv to develop both scripts in Python; uv run resolves and installs the dependencies declared in pyproject.toml automatically.
pyproject.toml
[project]
name = "Kafka"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "kafka-python-ng>=2.2.3",
]
productor.py — uv run .\productor.py
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import time

producer = None  # So the finally block works even if the constructor fails
try:
    producer = KafkaProducer(
        bootstrap_servers=["localhost:9094"],
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    for i in range(10):  # Produce 10 messages
        message = {"message_id": i, "data": f"Message {i}"}
        producer.send("duck-topic", message)
        print(f"Produced message: {message}")
        time.sleep(1)  # Send a message every second
    producer.flush()  # Ensure all messages are sent
    print("Finished producing messages.")
except KafkaError as e:
    print(f"Error producing messages: {e}")
finally:
    if producer is not None:
        producer.close()
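producer.send() is asynchronous: it returns a future immediately and the message is batched in the background. To confirm where a message landed, block on the future (a small variant of the loop body above; the 10-second timeout is an arbitrary choice):
from kafka.errors import KafkaError

# send() returns a FutureRecordMetadata; get() blocks until the broker acknowledges
future = producer.send("duck-topic", {"message_id": 0, "data": "Message 0"})
try:
    metadata = future.get(timeout=10)  # seconds; raises KafkaError on failure
    print(f"Delivered to partition {metadata.partition} at offset {metadata.offset}")
except KafkaError as e:
    print(f"Delivery failed: {e}")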
consumer.py — uv run .\consumer.py
from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    "duck-topic",
    bootstrap_servers=["localhost:9094"],
    group_id="my_consumer_group",
    value_deserializer=lambda x: loads(x.decode("utf-8")) if x else None,
    key_deserializer=lambda x: x.decode("utf-8") if x else None,
    auto_offset_reset="earliest",  # or 'latest' or 'none'
    enable_auto_commit=False,  # Important for manual commits
)

try:
    for message in consumer:
        print(f"Received message: Key={message.key}, Value={message.value}")
        consumer.commit()  # Commit the offset after processing each message
except KeyboardInterrupt:
    pass  # Allow Ctrl+C to exit gracefully
finally:
    consumer.close()
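Committing after every message, as above, minimizes re-delivery after a crash but costs a round trip per record. An alternative is to read in batches with poll() and commit once per batch; a minimal sketch, assuming the same consumer configuration as above:
try:
    while True:
        # poll() returns a dict of {TopicPartition: [records]}
        batch = consumer.poll(timeout_ms=1000)
        for tp, records in batch.items():
            for record in records:
                print(f"Received from {tp.topic}[{tp.partition}]: {record.value}")
        if batch:
            consumer.commit()  # one commit for the whole batch
except KeyboardInterrupt:
    pass
finally:
    consumer.close()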