⚡ Streaming Pipeline
1. Configure MySQL connection
You need to complete the setup in the Prerequisites section before proceeding.
Set up your .env file with the necessary credentials to connect to your MySQL database:
MSQL_HOST=127.0.0.1
MSQL_USER=root
MSQL_PASSWORD=yourpassword
MSQL_DATABASE=coffee_shop
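For reference, the Python scripts can read these variables and open a connection as in the minimal sketch below. It assumes the python-dotenv and mysql-connector-python packages; the repository's own connection helper may differ.
import os

import mysql.connector
from dotenv import load_dotenv

load_dotenv()  # reads the MSQL_* variables from the .env file

conn = mysql.connector.connect(
    host=os.getenv("MSQL_HOST", "127.0.0.1"),
    user=os.getenv("MSQL_USER"),
    password=os.getenv("MSQL_PASSWORD"),
    database=os.getenv("MSQL_DATABASE"),
)
print("Connected:", conn.is_connected())
conn.close()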
2. Set up Kafka
To set up Kafka Connect, we first need to configure the Kafka Broker and Kafka UI as follows:
Kafka Broker
Two Kafka brokers are configured, with data stored in volumes/kafka-1 and volumes/kafka-2. You can scale up (e.g., to 3 brokers) if more resources are available.
Since Prometheus and Grafana are used for monitoring, a JMX exporter is also configured on each Kafka broker to expose metrics.
kafka-1:
  container_name: kafka-1
  image: 'bitnami/kafka:3.5.1'
  ports:
    - '29092:29092'
  networks:
    - myNetwork
  environment:
    - KAFKA_CFG_NODE_ID=1
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-1:9092,EXTERNAL://localhost:29092
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-1:9093,2@kafka-2:9093
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
    - KAFKA_KRAFT_CLUSTER_ID=k3Fk2HhXQwWL9rPlYxYOoA
    - KAFKA_OPTS=-javaagent:/usr/share/jmx-exporter/jmx_prometheus_javaagent-0.20.0.jar=9300:/usr/share/jmx-exporter/kafka-broker.yml
  volumes:
    - ./volumes/kafka-1:/bitnami/kafka
    - ./volumes/jmx-exporter:/usr/share/jmx-exporter

kafka-2:
  container_name: kafka-2
  image: 'bitnami/kafka:3.5.1'
  ports:
    - "29093:29093"
  environment:
    - KAFKA_CFG_NODE_ID=2
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29093
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-2:9092,EXTERNAL://localhost:29093
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-1:9093,2@kafka-2:9093
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
    - KAFKA_KRAFT_CLUSTER_ID=k3Fk2HhXQwWL9rPlYxYOoA
    - KAFKA_OPTS=-javaagent:/usr/share/jmx-exporter/jmx_prometheus_javaagent-0.20.0.jar=9300:/usr/share/jmx-exporter/kafka-broker.yml
  volumes:
    - ./volumes/kafka-2:/bitnami/kafka
    - ./volumes/jmx-exporter:/usr/share/jmx-exporter
  networks:
    - myNetwork
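As a quick sanity check, you can connect through the advertised EXTERNAL listeners from the host and list the topics the cluster knows about. This is a sketch assuming the kafka-python client, which is not necessarily the client used by the repo's scripts.
from kafka import KafkaConsumer

# Connect via the EXTERNAL listeners advertised as localhost:29092 / localhost:29093
consumer = KafkaConsumer(bootstrap_servers=["localhost:29092", "localhost:29093"])
print(sorted(consumer.topics()))  # topics currently visible to the client
consumer.close()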
Create Required Topics for Kafka Connect
Kafka Connect requires three topics: connect-configs, connect-offsets, and connect-status.
These topics must be created before starting Kafka Connect; otherwise, the connector will fail to start.
Additionally, we pre-create the topic mysql.coffee_shop.order_details, the destination topic where Kafka Connect pushes CDC (Change Data Capture) events from the order_details table in MySQL.
To ensure that the topics are created properly, we set up an init-kafka container, which waits until Kafka is fully ready before creating the required topics.
init-kafka:
  image: 'bitnami/kafka:3.5.1'
  container_name: init-kafka
  depends_on:
    - kafka-1
    - kafka-2
  networks:
    - myNetwork
  entrypoint: ["/bin/bash", "-c"]
  command:
    - |
      echo "Waiting for Kafka to be ready..."
      while ! kafka-topics.sh --bootstrap-server kafka-1:9092 --list; do
        sleep 5
      done
      echo "Kafka is ready. Creating topics..."
      kafka-topics.sh --create --if-not-exists --bootstrap-server kafka-1:9092 --partitions 1 --replication-factor 1 --config cleanup.policy=compact --topic connect-configs
      kafka-topics.sh --create --if-not-exists --bootstrap-server kafka-1:9092 --partitions 10 --replication-factor 2 --config cleanup.policy=compact --topic connect-offsets
      kafka-topics.sh --create --if-not-exists --bootstrap-server kafka-1:9092 --partitions 5 --replication-factor 2 --config cleanup.policy=compact --topic connect-status
      kafka-topics.sh --create --if-not-exists --bootstrap-server kafka-1:9092 --partitions 3 --replication-factor 2 --topic mysql.coffee_shop.order_details
      kafka-topics.sh --create --if-not-exists --bootstrap-server kafka-1:9092 --partitions 2 --replication-factor 2 --topic mysql.coffee_shop.orders
      kafka-topics.sh --create --if-not-exists --bootstrap-server kafka-1:9092 --partitions 2 --replication-factor 2 --topic order_ready_for_checking
      echo "Topics created successfully!"
      kafka-topics.sh --bootstrap-server kafka-1:9092 --list
Info
We chose 2 partitions and a replication factor of 2 for most topics based on our system's current load and hardware limitations.
- Throughput: With an average rate of ~17 messages per second (i.e., 1,000+ messages per minute), even 1 partition could technically handle the load. However, because messages are keyed by order_id and some orders contain multiple products, there's a risk of data skew. Using 3 partitions for mysql.coffee_shop.order_details helps distribute the load more evenly.
- Replication: Best practice recommends:
  - 3 brokers
  - a replication factor of 3
  - min.insync.replicas = 2
  This setup ensures fault tolerance and data durability.
However, due to local machine constraints, we use only 2 brokers and a replication factor of 2, which still offers basic redundancy. A single broker is not ideal for a streaming system, because we need accurate and consistent data to execute real-time business logic; if that single broker goes down, the entire system would fail, potentially causing critical disruptions or financial loss.
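To illustrate the keying described above, here is a small producer sketch (assuming the kafka-python client; the record fields are illustrative, not the repo's exact schema). Keying by order_id sends every row of the same order to the same partition, while different orders spread across the 3 partitions.
import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["localhost:29092"],
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Hypothetical order_details record; all rows sharing an order_id hash to one partition
record = {"order_id": "o-1001", "product_id": 7, "quantity": 2}
producer.send("mysql.coffee_shop.order_details", key=record["order_id"], value=record)
producer.flush()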
Kafka Connect Configuration
Finally, we configure Kafka Connect. The command section installs the required Debezium MySQL connector plugin (via confluent-hub) before starting the Connect worker.
connect:
  image: confluentinc/cp-kafka-connect:latest
  hostname: connect
  container_name: kafka-connect
  depends_on:
    - kafka-1
    - kafka-2
  ports:
    - "8083:8083"
  environment:
    CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"
    CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
    CONNECT_GROUP_ID: "connect-cluster"
    CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
    CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
    CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
    CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_PLUGIN_PATH: "/usr/share/confluent-hub-components"
    KAFKA_JMX_OPTS: -javaagent:/usr/share/jmx-exporter/jmx_prometheus_javaagent-0.20.0.jar=9300:/usr/share/jmx-exporter/kafka-connect.yml
  command:
    - bash
    - -c
    - |
      if [ ! -d "/usr/share/confluent-hub-components/debezium-debezium-connector-mysql" ]; then
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
      fi
      /etc/confluent/docker/run
  volumes:
    - ./volumes/jmx-exporter:/usr/share/jmx-exporter/
    - ./volumes/connector-plugins:/plugins
  restart: always
  networks:
    - myNetwork
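Once the container is up, you can confirm that the Debezium MySQL plugin was installed by calling the standard Kafka Connect REST API. A sketch using the requests package:
import requests

resp = requests.get("http://localhost:8083/connector-plugins", timeout=10)
resp.raise_for_status()
for plugin in resp.json():
    # Expect io.debezium.connector.mysql.MySqlConnector to appear in this list
    print(plugin["class"], plugin.get("version"))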
3. Redis
Since real-time business logic is implemented (visit here for more details), the system needs to access data from the database for processing. To reduce load and improve performance, Redis is used to cache data that is frequently accessed but infrequently updated, minimizing database queries.
redis:
  image: redis/redis-stack:latest
  container_name: redis
  ports:
    - "6379:6379"
    - "8001:8001"
  volumes:
    - ./volumes/redis:/data
  depends_on:
    - kafka-1
    - kafka-2
  networks:
    - myNetwork
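The caching pattern looks roughly like the sketch below, assuming the redis-py client; the key convention and the fetch_from_db helper are illustrative rather than the exact layout used by the repo's lookup_data_cache.py.
import json

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_product(product_id: int, fetch_from_db) -> dict:
    """Return product details from Redis, falling back to MySQL on a cache miss."""
    key = f"product:{product_id}"        # hypothetical key convention
    cached = r.get(key)
    if cached is not None:
        return json.loads(cached)        # fast path: served from cache
    product = fetch_from_db(product_id)  # slow path: query the database
    r.set(key, json.dumps(product))      # no TTL, since this data rarely changes
    return product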
4. Prometheus and Grafana
Prometheus and Grafana are used to monitor Kafka: they gather Kafka metrics and trigger alerts when errors occur.
You must configure the following environment variables in the .env file to enable email alerts in Grafana:
SMTP_USER=your_email@example.com
SMTP_PASSWORD=your_email_app_password
SMTP_FROM_ADDRESS=alert_sender@example.com
Note
This is an app password, not your Gmail account password. Visit this guide for more details.
prometheus:
  image: prom/prometheus:v2.50.0
  container_name: prometheus
  ports:
    - "9090:9090"
  volumes:
    - ./volumes/monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
  networks:
    - myNetwork

grafana:
  image: grafana/grafana:latest
  container_name: grafana
  ports:
    - "3000:3000"
  environment:
    - GF_SMTP_ENABLED=true
    - GF_SMTP_HOST=smtp.gmail.com:587
    - GF_SMTP_USER=${SMTP_USER}
    - GF_SMTP_PASSWORD=${SMTP_PASSWORD}
    - GF_SMTP_FROM_ADDRESS=${SMTP_FROM_ADDRESS}
    - GF_SMTP_FROM_NAME=Grafana Alerts
  volumes:
    - ./volumes/grafana/data:/var/lib/grafana
    - ./volumes/monitoring/grafana/provisioning:/etc/grafana/provisioning
  depends_on:
    - prometheus
    - kafka-1
    - kafka-2
  networks:
    - myNetwork
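To verify that Prometheus is actually scraping the JMX exporters, you can query its built-in up metric over the HTTP API. A sketch with the requests package; the instance labels you see depend on your prometheus.yml scrape configuration.
import requests

resp = requests.get(
    "http://localhost:9090/api/v1/query",
    params={"query": "up"},
    timeout=10,
)
for result in resp.json()["data"]["result"]:
    # value is [timestamp, "1"] when the target is reachable, "0" otherwise
    print(result["metric"].get("instance"), "up =", result["value"][1])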
5. Run the Streaming script
Execution Overview 🎬
Before setting up the environment, here's a quick demo of the streaming pipeline in action:
📽️
Run the Real-time Pipeline
1. Start Required Containers
First, bring up all necessary containers for real-time streaming:
docker-compose -f docker-compose-realtime.yaml up -d
2. Prepare Real-time Prerequisites
Before starting the data pipeline, make sure the following prerequisites are met:
2.1. Enable local_infile on MySQL
This allows loading static data files using LOAD DATA LOCAL INFILE:
SET GLOBAL local_infile = 1;
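Conceptually, the bulk load then looks like the sketch below (assuming mysql-connector-python; the CSV path and table name are illustrative). Note that the client must also opt in with allow_local_infile.
import mysql.connector

conn = mysql.connector.connect(
    host="127.0.0.1", user="root", password="yourpassword",
    database="coffee_shop", allow_local_infile=True,  # client-side counterpart of local_infile
)
cur = conn.cursor()
cur.execute("""
    LOAD DATA LOCAL INFILE 'data/products.csv'
    INTO TABLE products
    FIELDS TERMINATED BY ','
    IGNORE 1 LINES
""")
conn.commit()
conn.close()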
2.2. Enable MySQL Binary Logging (binlog)
Follow the instructions here to create the required user and enable the binlog. The Debezium connector credentials are configured in mysql-src-connector.json.
Note
If you use a different username or password, make sure to update the following configuration in the mysql-src-connector.json file accordingly:
"database.user": "dbz-user",
"database.password": "dbz-user",
Instead of running multiple commands manually, we have a PowerShell script, ./setup-realtime-prerequisites.ps1, that automates the prerequisite setup, including:
✔️ Creating necessary tables in MySQL
✔️ Loading static files into the tables
✔️ Caching lookup data for later use
✔️ Registering the MySQL source connector
- Creating necessary tables in MySQL
  The first step is to create all the tables required for this project.
  # Run the script to create necessary tables in the MySQL database
  python scripts/database/create_table.py
  Write-Host "============================================================"
- Loading static files into the tables
  Load the CSV files located in the data/ folder into the corresponding MySQL tables (e.g., stores, products, ...).
  # Load static reference data (e.g., products, stores) into the database
  python scripts/database/load_static_file.py
  Write-Host "============================================================"
- Caching lookup data
  Cache frequently accessed reference data in Redis for faster responses and fewer database queries.
  # Populate Redis cache with referential data for faster lookups
  python scripts/database/lookup_data_cache.py
  Write-Host "============================================================"
- Registering the MySQL Source Connector to Kafka Connect
  Register the MySQL source connector using the mysql-src-connector.json file, which contains the Kafka source connector configuration. Refer to MySQL Kafka Connector Configuration Properties for more details. (A Python equivalent of this call is sketched right after this list.)
  # Register the MySQL source connector to Kafka Connect for capturing change data (CDC)
  Invoke-WebRequest -Uri "http://localhost:8083/connectors" -Method Post -ContentType "application/json" -InFile "./scripts/real-time/mysql-src-connector.json"
  Write-Host "============================================================"
Note
The number of Kafka consumers should be less than or equal to the number of partitions to ensure proper parallel processing.
You can increase or decrease the number of partitions and update the num_workers value accordingly (e.g., num_workers=3 means 3 parallel consumer processes).
import multiprocessing

def main() -> None:
    num_workers = 3  # keep this <= the number of partitions of the consumed topic
    processes = []
    for i in range(num_workers):
        p = multiprocessing.Process(target=consumer_worker, args=(i,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
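consumer_worker itself is defined elsewhere in the repo's scripts. As a rough sketch of what each worker process might do (assuming kafka-python and JSON-encoded events; the group id is hypothetical), every worker joins the same consumer group, so Kafka assigns each one a share of the partitions:
import json

from kafka import KafkaConsumer

def consumer_worker(worker_id: int) -> None:
    consumer = KafkaConsumer(
        "mysql.coffee_shop.order_details",
        bootstrap_servers=["localhost:29092"],
        group_id="order-details-workers",  # same group => partitions are split across workers
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    for message in consumer:
        # Real logic (validation, Redis lookups, business rules) would go here
        print(f"[worker {worker_id}] partition={message.partition} value={message.value}")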
3. Run the Pipeline
Once all containers are ready and healthy, and the prerequisites are configured, you can:
- Access services:
  - Kafka UI: localhost:8000
  - Kafka Connect UI: localhost:8083
- Launch consumer workers:
  python scripts/real-time/order_consumer.py
  python scripts/real-time/order_details_consumer.py
  python scripts/real-time/check_and_recommendation.py
- Then, start sending data into the system:
  python scripts/database/generate_data.py
Note
Each script includes a comment header that explains what it does and why it's used. Feel free to check them out.
You can view the demo in the Execution Overview.
Monitor & Alert
- Prometheus: localhost:9090
- Grafana: localhost:3000
If an alert condition is triggered (e.g., a Kafka broker goes down or a Kafka Connect task fails), an email notification will be sent as shown below: