⚡ Streaming Pipeline
1. Configure MySQL connection

You need to complete the setup in the Prerequisites section before proceeding.

Set up your `.env` file with the necessary credentials to connect to your MySQL database:

```
MSQL_HOST=127.0.0.1
MSQL_USER=root
MSQL_PASSWORD=yourpassword
MSQL_DATABASE=coffee_shop
```
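If you want to verify the connection outside the project scripts, here is a minimal sketch, assuming the `python-dotenv` and `mysql-connector-python` packages and the variable names shown above:

```python
# Minimal sketch: read the .env credentials and open a MySQL connection.
# Assumes python-dotenv and mysql-connector-python; adjust if your scripts
# use a different driver.
import os

import mysql.connector
from dotenv import load_dotenv

load_dotenv()  # load variables from the .env file into the environment

conn = mysql.connector.connect(
    host=os.getenv("MSQL_HOST", "127.0.0.1"),
    user=os.getenv("MSQL_USER", "root"),
    password=os.getenv("MSQL_PASSWORD", ""),
    database=os.getenv("MSQL_DATABASE", "coffee_shop"),
)

cursor = conn.cursor()
cursor.execute("SELECT VERSION()")
print(cursor.fetchone())
cursor.close()
conn.close()
```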
2. Set up Kafka

To set up Kafka Connect, we first need to configure the Kafka broker and Kafka UI as follows:

Kafka Broker

Two Kafka brokers are configured, with data stored in `volumes/kafka-1` and `volumes/kafka-2`. You can scale up (e.g., to 3 brokers) if more resources are available.

Since Prometheus and Grafana are used for monitoring, JMX is also configured for each Kafka broker to export metrics.
```yaml
kafka-1:
  container_name: kafka-1
  image: 'bitnami/kafka:3.5.1'
  ports:
    - '29092:29092'
  networks:
    - myNetwork
  environment:
    - KAFKA_CFG_NODE_ID=1
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-1:9092,EXTERNAL://localhost:29092
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-1:9093,2@kafka-2:9093
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
    - KAFKA_KRAFT_CLUSTER_ID=k3Fk2HhXQwWL9rPlYxYOoA
    - KAFKA_OPTS=-javaagent:/usr/share/jmx-exporter/jmx_prometheus_javaagent-0.20.0.jar=9300:/usr/share/jmx-exporter/kafka-broker.yml
  volumes:
    - ./volumes/kafka-1:/bitnami/kafka
    - ./volumes/jmx-exporter:/usr/share/jmx-exporter

kafka-2:
  container_name: kafka-2
  image: 'bitnami/kafka:3.5.1'
  ports:
    - "29093:29093"
  environment:
    - KAFKA_CFG_NODE_ID=2
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29093
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-2:9092,EXTERNAL://localhost:29093
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-1:9093,2@kafka-2:9093
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
    - KAFKA_KRAFT_CLUSTER_ID=k3Fk2HhXQwWL9rPlYxYOoA
    - KAFKA_OPTS=-javaagent:/usr/share/jmx-exporter/jmx_prometheus_javaagent-0.20.0.jar=9300:/usr/share/jmx-exporter/kafka-broker.yml
  volumes:
    - ./volumes/kafka-2:/bitnami/kafka
    - ./volumes/jmx-exporter:/usr/share/jmx-exporter
  networks:
    - myNetwork
```
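To confirm that the EXTERNAL listeners are reachable from the host, a quick sketch like the following can be used; it assumes the `kafka-python` package and relies on the auto-topic-creation setting above (the `healthcheck` topic name is just an example):

```python
# Minimal connectivity check against the brokers' EXTERNAL listeners.
# Assumes kafka-python (pip install kafka-python); "healthcheck" is an
# arbitrary example topic that will be auto-created.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["localhost:29092", "localhost:29093"])
producer.send("healthcheck", b"ping")
producer.flush()
producer.close()
print("Brokers are reachable from the host.")
```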
Create Required Topics for Kafka Connect

Kafka Connect requires three topics: `connect-configs`, `connect-offsets`, and `connect-status`. These topics must be created before starting Kafka Connect; otherwise, the connector will fail to start.

Additionally, we pre-create the topic `mysql.coffee_shop.order_details`, which is the destination topic where Kafka Connect pushes CDC (Change Data Capture) events from the `order_details` table in MySQL.

To ensure that the topics are created properly, we set up an `init-kafka` container. This container waits until Kafka is fully ready before creating the required topics.
```yaml
init-kafka:
  image: 'bitnami/kafka:3.5.1'
  container_name: init-kafka
  depends_on:
    - kafka-1
    - kafka-2
  networks:
    - myNetwork
  entrypoint: ["/bin/bash", "-c"]
  command:
    - |
      echo "Waiting for Kafka to be ready..."
      while ! kafka-topics.sh --bootstrap-server kafka-1:9092 --list; do
        sleep 5
      done
      echo "Kafka is ready. Creating topics ..."
      kafka-topics.sh --create --if-not-exists --bootstrap-server kafka-1:9092 --partitions 1 --replication-factor 1 --config cleanup.policy=compact --topic connect-configs
      kafka-topics.sh --create --if-not-exists --bootstrap-server kafka-1:9092 --partitions 10 --replication-factor 2 --config cleanup.policy=compact --topic connect-offsets
      kafka-topics.sh --create --if-not-exists --bootstrap-server kafka-1:9092 --partitions 5 --replication-factor 2 --config cleanup.policy=compact --topic connect-status
      kafka-topics.sh --create --if-not-exists --bootstrap-server kafka-1:9092 --partitions 5 --replication-factor 2 --topic mysql.coffee_shop.order_details
      echo "Topics created successfully!"
      kafka-topics.sh --bootstrap-server kafka-1:9092 --list
```
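To double-check that `init-kafka` finished its job, a short sketch like this can list the topics from the host; it assumes the `kafka-python` package and the EXTERNAL listener on `localhost:29092`:

```python
# Sanity check that the required topics exist, assuming kafka-python.
from kafka import KafkaAdminClient

REQUIRED = {
    "connect-configs",
    "connect-offsets",
    "connect-status",
    "mysql.coffee_shop.order_details",
}

admin = KafkaAdminClient(bootstrap_servers="localhost:29092")
existing = set(admin.list_topics())
missing = REQUIRED - existing
print("Missing topics:", missing or "none")
admin.close()
```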
Kafka Connect Configuration

Finally, we configure Kafka Connect. The `command` section installs the Debezium MySQL connector plugin if it is not already present.
```yaml
connect:
  image: confluentinc/cp-kafka-connect:latest
  hostname: connect
  container_name: kafka-connect
  depends_on:
    - kafka-1
    - kafka-2
  ports:
    - "8083:8083"
  environment:
    CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"
    CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
    CONNECT_GROUP_ID: "connect-cluster"
    CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
    CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
    CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
    CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_PLUGIN_PATH: "/usr/share/confluent-hub-components"
    KAFKA_JMX_OPTS: -javaagent:/usr/share/jmx-exporter/jmx_prometheus_javaagent-0.20.0.jar=9300:/usr/share/jmx-exporter/kafka-connect.yml
  command:
    - bash
    - -c
    - |
      if [ ! -d "/usr/share/confluent-hub-components/debezium-debezium-connector-mysql" ]; then
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
      fi
      /etc/confluent/docker/run
  volumes:
    - ./volumes/jmx-exporter:/usr/share/jmx-exporter/
    - ./volumes/connector-plugins:/plugins
  restart: always
  networks:
    - myNetwork
```
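Once the container is up, you can poll the Kafka Connect REST API (its standard `/connector-plugins` and `/connectors` endpoints) to confirm that the Debezium MySQL plugin was installed; here is a rough sketch assuming the `requests` package:

```python
# Poll Kafka Connect until the Debezium MySQL plugin shows up.
# Uses only the standard Connect REST endpoints; requests is assumed.
import time

import requests

CONNECT_URL = "http://localhost:8083"

for _ in range(60):  # wait up to ~5 minutes
    try:
        plugins = requests.get(f"{CONNECT_URL}/connector-plugins", timeout=5).json()
        if any("MySqlConnector" in p["class"] for p in plugins):
            print("Debezium MySQL connector plugin is available.")
            break
    except requests.ConnectionError:
        pass
    print("Waiting for Kafka Connect ...")
    time.sleep(5)

print("Registered connectors:", requests.get(f"{CONNECT_URL}/connectors", timeout=5).json())
```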
3. Redis

Since real-time business logic is implemented (visit here for more details), the system needs to read data from the database during processing. To reduce load and improve performance, Redis caches data that is frequently accessed but infrequently updated, minimizing database queries.
```yaml
redis:
  image: redis/redis-stack:latest
  container_name: redis
  ports:
    - "6379:6379"
    - "8001:8001"
  volumes:
    - ./volumes/redis:/data
  depends_on:
    - kafka-1
    - kafka-2
  networks:
    - myNetwork
```
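The caching itself is handled by the project's scripts, but as an illustration of the cache-aside pattern described above, here is a minimal sketch assuming the `redis` Python package and a hypothetical `fetch_product_from_db()` helper:

```python
# Minimal cache-aside sketch for lookup data.
# Assumes the redis package (redis-py); fetch_product_from_db() is a
# hypothetical helper standing in for the real MySQL lookup.
import json

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_product(product_id: int) -> dict:
    """Return product info from Redis, falling back to the database on a miss."""
    key = f"product:{product_id}"
    cached = r.get(key)
    if cached is not None:
        return json.loads(cached)          # cache hit: no database query needed
    product = fetch_product_from_db(product_id)  # hypothetical DB lookup
    r.set(key, json.dumps(product), ex=3600)     # cache for 1 hour
    return product
```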
4. Prometheus and Grafana

Prometheus and Grafana are used to monitor Kafka, gather Kafka metrics, and trigger alerts when errors occur.

You must configure the following environment variables in the `.env` file to enable email alerts in Grafana:

```
SMTP_USER=your_email@example.com
SMTP_PASSWORD=your_email_app_password
SMTP_FROM_ADDRESS=alert_sender@example.com
```

Note
This is an app password, not your Gmail account password. Visit this guide for more details.
```yaml
prometheus:
  image: prom/prometheus:v2.50.0
  container_name: prometheus
  ports:
    - "9090:9090"
  volumes:
    - ./volumes/monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
  networks:
    - myNetwork

grafana:
  image: grafana/grafana:latest
  container_name: grafana
  ports:
    - "3000:3000"
  environment:
    - GF_SMTP_ENABLED=true
    - GF_SMTP_HOST=smtp.gmail.com:587
    - GF_SMTP_USER=${SMTP_USER}
    - GF_SMTP_PASSWORD=${SMTP_PASSWORD}
    - GF_SMTP_FROM_ADDRESS=${SMTP_FROM_ADDRESS}
    - GF_SMTP_FROM_NAME=Grafana Alerts
  volumes:
    - ./volumes/grafana/data:/var/lib/grafana
    - ./volumes/monitoring/grafana/provisioning:/etc/grafana/provisioning
  depends_on:
    - prometheus
    - kafka-1
    - kafka-2
  networks:
    - myNetwork
```
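As a quick sanity check that Prometheus is scraping the JMX exporters, you can query its HTTP API for the built-in `up` metric; a small sketch assuming the `requests` package:

```python
# Query the Prometheus HTTP API and print the status of each scrape target.
import requests

resp = requests.get(
    "http://localhost:9090/api/v1/query",
    params={"query": "up"},
    timeout=5,
)
for result in resp.json()["data"]["result"]:
    target = result["metric"].get("instance", "unknown")
    status = "UP" if result["value"][1] == "1" else "DOWN"
    print(f"{target}: {status}")
```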
5. Run the Streaming Script

Execution Overview 🎬

Before setting up the environment, here's a quick demo of the streaming pipeline in action: 📽️

Run the Real-time Pipeline PowerShell Script

We can now start the necessary containers and begin executing our streaming pipeline:

```bash
docker-compose up -d
```
Before proceeding further, you must enable a few required MySQL features:

- Enable `local_infile` on the server side.

  This is required because a script loads static data files into MySQL tables using the `LOAD DATA LOCAL INFILE` command. Without this option, the data loading step will fail (see the sketch after the note below).

  ```sql
  SET GLOBAL local_infile = 1;
  ```

- Enable MySQL binary logging (binlog).

  Follow the instructions here to create the required user and enable the binlog.
Note
If you use a different username or password, make sure to update the following configuration in the `mysql-src-connector.json` file accordingly:

```json
"database.user": "dbz-user",
"database.password": "dbz-user",
```
Instead of running multiple commands manually, we have a PowerShell script, `real-time.ps1`, that automates the entire process, including:

✔️ Creating the necessary tables in MySQL
✔️ Loading static files into the tables
✔️ Caching lookup data for later use
✔️ Registering the MySQL source connector
✔️ Running the Kafka client to handle new events
- Creating necessary tables in MySQL

  The first step is to create all the tables required for this project.

  ```powershell
  # Run the script to create necessary tables in the MySQL database
  python scripts/database/create_table.py
  Write-Host "============================================================"
  ```

- Loading static files into the tables

  Load the CSV files located in the `data/` folder into the corresponding MySQL tables (e.g., stores, products, ...).

  ```powershell
  # Load static reference data (e.g., products, stores) into the database
  python scripts/database/load_static_file.py
  Write-Host "============================================================"
  ```

- Caching lookup data

  Cache frequently accessed reference data in Redis for faster responses and fewer database queries.

  ```powershell
  # Populate Redis cache with referential data for faster lookups
  python scripts/database/lookup_data_cache.py
  Write-Host "============================================================"
  ```

- Registering the MySQL Source Connector to Kafka Connect

  Register the MySQL source connector using the `mysql-src-connector.json` file, which contains the Kafka Source Connector configuration. Refer to MySQL Kafka Connector Configuration Properties for more details.

  ```powershell
  # Register the MySQL source connector to Kafka Connect for capturing change data (CDC)
  Invoke-WebRequest -Uri "http://localhost:8083/connectors" -Method Post -ContentType "application/json" -InFile "./scripts/real-time/mysql-src-connector.json"
  Write-Host "============================================================"
  ```

- Executing the Kafka Client

  Start the Kafka client to consume new events and execute real-time business logic (e.g., product suggestions).

  ```powershell
  # Start the Kafka client to listen for new order events and handle product suggestions logic
  python scripts/real-time/kafka_client.py
  ```
Note
The number of Kafka consumers should be less than or equal to the number of partitions to ensure proper parallel processing. You can increase or decrease the number of partitions and update the `num_workers` value accordingly (e.g., `num_workers=3` means 3 parallel consumer processes).
```python
import multiprocessing

# consumer_worker is defined elsewhere in kafka_client.py
def main() -> None:
    num_workers = 3
    processes = []

    # Spawn one consumer process per worker
    for i in range(num_workers):
        p = multiprocessing.Process(target=consumer_worker, args=(i,))
        p.start()
        processes.append(p)

    # Wait for all consumer processes to finish
    for p in processes:
        p.join()
```
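`consumer_worker` itself lives in `kafka_client.py`; purely as an illustration of how each worker process could consume from the CDC topic within one consumer group, here is a hedged sketch using the `kafka-python` package (the real implementation may differ):

```python
# Hedged sketch of a consumer worker: every process joins the same consumer
# group, so Kafka assigns each one a subset of the topic's partitions.
# Assumes kafka-python; the group id and handler below are illustrative only.
import json

from kafka import KafkaConsumer

def consumer_worker(worker_id: int) -> None:
    consumer = KafkaConsumer(
        "mysql.coffee_shop.order_details",
        bootstrap_servers=["localhost:29092", "localhost:29093"],
        group_id="order-suggestions",  # same group => partitions are shared
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        auto_offset_reset="earliest",
    )
    for message in consumer:
        # Real-time business logic (e.g., product suggestions) would go here.
        print(f"[worker {worker_id}] partition={message.partition} value={message.value}")
```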
The full script is in the `real-time.ps1` file.
Once all containers are ready and healthy, you can access the following services:
- Kafka UI: localhost:8000
- Kafka Connect UI: localhost:8083
- Prometheus: localhost:9090
- Grafana: localhost:3000
Execute the following command to run the entire pipeline setup:

```powershell
.\real-time.ps1
```

Then, run the script below to update the order status:

```bash
python scripts/database/update_order_status.py
```

Next, start streaming data into the system:

```bash
python scripts/database/generate_data.py
```
You can view the demo in the Execution Overview.
If an alert condition is triggered (e.g., a Kafka broker goes down or a Kafka Connect task fails), an email notification will be sent as shown below: