⚑ Streaming Pipeline

1. Configure MySQL connection

You need to complete the setup in the Prerequisites section before proceeding.

Set up your .env file with the necessary credentials to connect to your MySQL database:

MSQL_HOST=127.0.0.1
MSQL_USER=root
MSQL_PASSWORD=yourpassword
MSQL_DATABASE=coffee_shop
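
As a rough illustration of how a script might read these settings, the sketch below parses KEY=VALUE lines using only the standard library. The helper names `parse_env` and `load_env` are hypothetical and not taken from the project, whose scripts may load the file differently (for example with a dotenv package).

```python
from pathlib import Path


def parse_env(text: str) -> dict[str, str]:
    """Parse KEY=VALUE lines, skipping blank lines and # comments."""
    values: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first "=" only, so values may contain "=".
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip().strip("'\"")
    return values


def load_env(path: str = ".env") -> dict[str, str]:
    """Read an env file from disk and return its settings."""
    return parse_env(Path(path).read_text(encoding="utf-8"))


# Example usage (assumes a .env file in the working directory):
#   settings = load_env()
#   print(settings["MSQL_HOST"], settings["MSQL_DATABASE"])
```

Keeping the parsing separate from the file access makes the logic easy to check without touching a real credentials file.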

2. Setup Kafka

To set up Kafka Connect, we first need to configure the Kafka Broker and Kafka UI as follows:

Kafka Broker

Two Kafka brokers are configured with data stored in volumes/kafka-1 and volumes/kafka-2. You can scale up (e.g., 3 brokers) if more resources are available.

Since Prometheus and Grafana are used for monitoring, JMX is also configured for each Kafka broker to export metrics.

  kafka-1:
    container_name: kafka-1
    image: 'bitnami/kafka:3.5.1'
    ports:
      - '29092:29092'
    networks:
      - myNetwork
    environment:
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-1:9092,EXTERNAL://localhost:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-1:9093,2@kafka-2:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_KRAFT_CLUSTER_ID=k3Fk2HhXQwWL9rPlYxYOoA
      - KAFKA_OPTS=-javaagent:/usr/share/jmx-exporter/jmx_prometheus_javaagent-0.20.0.jar=9300:/usr/share/jmx-exporter/kafka-broker.yml
    volumes:
      - ./volumes/kafka-1:/bitnami/kafka
      - ./volumes/jmx-exporter:/usr/share/jmx-exporter

  kafka-2:
    container_name: kafka-2
    image: 'bitnami/kafka:3.5.1'
    ports:
      - "29093:29093"
    environment:
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-2:9092,EXTERNAL://localhost:29093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-1:9093,2@kafka-2:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_KRAFT_CLUSTER_ID=k3Fk2HhXQwWL9rPlYxYOoA
      - KAFKA_OPTS=-javaagent:/usr/share/jmx-exporter/jmx_prometheus_javaagent-0.20.0.jar=9300:/usr/share/jmx-exporter/kafka-broker.yml
    volumes:
      - ./volumes/kafka-2:/bitnami/kafka
      - ./volumes/jmx-exporter:/usr/share/jmx-exporter
    networks:
      - myNetwork
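
Each broker advertises two addresses: the PLAINTEXT one for containers on myNetwork and the EXTERNAL one for clients running on the host. The sketch below splits an advertised-listeners string into its parts to make that distinction explicit. `parse_listeners` is a hypothetical helper written for illustration and is not part of Kafka or of this project.

```python
def parse_listeners(value: str) -> dict[str, tuple[str, int]]:
    """Map each listener name to its (host, port) pair."""
    listeners: dict[str, tuple[str, int]] = {}
    for entry in value.split(","):
        name, _, address = entry.partition("://")
        # rpartition keeps an empty host (":9092") intact.
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


# Value copied from the kafka-1 service definition above.
advertised = parse_listeners("PLAINTEXT://kafka-1:9092,EXTERNAL://localhost:29092")
print(advertised["PLAINTEXT"])  # address for other containers on myNetwork
print(advertised["EXTERNAL"])   # address for clients on the host machine
```

Under this configuration, a script running on the host would bootstrap against localhost:29092 or localhost:29093, while other containers use kafka-1:9092 or kafka-2:9092.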

Create Required Topics for Kafka Connect

Kafka Connect requires three topics: connect-configs, connect-offsets, and connect-status. These topics must be created before starting Kafka Connect; otherwise, the connector will fail to start.

We also pre-create the topic mysql.coffee_shop.order_details, the destination topic to which Kafka Connect pushes CDC (Change Data Capture) events from the order_details table in MySQL.

To ensure that the topics are created properly, we set up an init-kafka container. This container will wait until Kafka is fully ready before creating the required topics.

  init-kafka:
    image: 'bitnami/kafka:3.5.1'
    container_name: init-kafka
    depends_on:
      - kafka-1
      - kafka-2
    networks:
      - myNetwork
    entrypoint: ["/bin/bash", "-c"]
    command: 
      - |
        echo "Waiting for Kafka to be ready..."
        while ! kafka-topics.sh --bootstrap-server kafka-1:9092 --list; do
          sleep 5
        done

        echo "🚀 Kafka is ready. Creating topics ...... 🚀"
        kafka-topics.sh --create --if-not-exists --bootstrap-server kafka-1:9092 --partitions 1 --replication-factor 1 --config cleanup.policy=compact  --topic connect-configs
        kafka-topics.sh --create --if-not-exists --bootstrap-server kafka-1:9092 --partitions 10 --replication-factor 2 --config cleanup.policy=compact  --topic connect-offsets
        kafka-topics.sh --create --if-not-exists --bootstrap-server kafka-1:9092 --partitions 5 --replication-factor 2 --config cleanup.policy=compact  --topic connect-status
        kafka-topics.sh --create --if-not-exists --bootstrap-server kafka-1:9092 --partitions 5 --replication-factor 2 --topic mysql.coffee_shop.order_details

        echo '🚀 Topic created successfully! 🚀'
        kafka-topics.sh --bootstrap-server kafka-1:9092 --list
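
The wait-then-create pattern used by init-kafka can be sketched in Python as a generic polling helper. This is an illustration only: `wait_until` is a hypothetical name, and unlike the unbounded bash loop above it gives up after `max_attempts` tries, which is an added assumption.

```python
import time
from typing import Callable


def wait_until(check: Callable[[], bool], interval: float = 5.0,
               max_attempts: int = 60,
               sleep: Callable[[float], None] = time.sleep) -> int:
    """Call check() until it returns True; return the number of attempts."""
    for attempt in range(1, max_attempts + 1):
        if check():
            return attempt
        # Not ready yet: pause before polling again.
        sleep(interval)
    raise TimeoutError(f"not ready after {max_attempts} attempts")
```

In practice, `check` would wrap whatever readiness probe is available, such as running the `kafka-topics.sh --list` command and testing its exit code. Capping the attempts means a broker that never starts produces a clear error instead of a container that waits forever.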

Kafka Connect Configuration

Finally, we configure Kafka Connect. The command section installs the Debezium MySQL connector plugin if it is not already present, then starts the Connect worker.

  connect:
    image: confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: kafka-connect
    depends_on:
      - kafka-1
      - kafka-2
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_PLUGIN_PATH: "/usr/share/confluent-hub-components"
      KAFKA_JMX_OPTS: -javaagent:/usr/share/jmx-exporter/jmx_prometheus_javaagent-0.20.0.jar=9300:/usr/share/jmx-exporter/kafka-connect.yml
    command:
      - bash
      - -c
      - |
          if [ ! -d "/usr/share/confluent-hub-components/debezium-debezium-connector-mysql" ]; then
            confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
          fi
          /etc/confluent/docker/run
    volumes:
      - ./volumes/jmx-exporter:/usr/share/jmx-exporter/
      - ./volumes/connector-plugins:/plugins
    restart: always
    networks:
      - myNetwork
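
Once the container is up, one way to confirm that the plugin was installed is to query the Kafka Connect REST API, which lists installed plugins at `/connector-plugins`. The following is a minimal sketch, assuming the worker is reachable at http://localhost:8083 as published above; the helper names are hypothetical.

```python
import json
from urllib.request import urlopen

# Connector class provided by the Debezium MySQL plugin.
DEBEZIUM_MYSQL = "io.debezium.connector.mysql.MySqlConnector"


def has_plugin(plugins: list[dict], plugin_class: str = DEBEZIUM_MYSQL) -> bool:
    """Return True if the plugin list contains the given connector class."""
    return any(p.get("class") == plugin_class for p in plugins)


def fetch_plugins(base_url: str = "http://localhost:8083") -> list[dict]:
    """Fetch the installed plugins from the Kafka Connect REST API."""
    with urlopen(f"{base_url}/connector-plugins", timeout=10) as resp:
        return json.load(resp)


# Example usage (requires the kafka-connect container to be running):
#   print(has_plugin(fetch_plugins()))
```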

3. Redis

Since real-time business logic is implemented (visit here for more details), the system needs to access data from the database for processing. To reduce load and improve performance, Redis is used to cache data that is frequently accessed but infrequently updated, minimizing database queries.

  redis:
    image: redis/redis-stack:latest
    container_name: redis
    ports:
      - "6379:6379"
      - "8001:8001"
    volumes:
      - ./volumes/redis:/data
    depends_on:
      - kafka-1
      - kafka-2 
    networks:
      - myNetwork
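
The caching idea described above is commonly implemented as a cache-aside lookup: check the cache first and fall back to the database only on a miss. The sketch below uses a plain dict in place of Redis so that it runs without a server. With the redis-py client, the same logic would use `get` and `set` on the client object. The function name and the `product:1` key format are assumptions made for illustration, not the project's actual code.

```python
from typing import Callable, MutableMapping


def get_or_load(cache: MutableMapping[str, str], key: str,
                load_from_db: Callable[[str], str]) -> str:
    """Cache-aside lookup: return the cached value, else load and store it."""
    value = cache.get(key)
    if value is None:
        # Cache miss: query the database once and remember the result.
        value = load_from_db(key)
        cache[key] = value
    return value


# Example usage with an in-memory stand-in for Redis:
cache: dict[str, str] = {}
name = get_or_load(cache, "product:1", lambda key: "Latte")
```

This pattern suits data that is read often but rarely updated, since a stale entry stays in the cache until it is overwritten or evicted.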

4. Prometheus and Grafana

Prometheus and Grafana are used to monitor Kafka, gather Kafka metrics, and trigger alerts when errors occur.

You must configure the following environment variables in the .env file to enable email alerts in Grafana:

SMTP_USER=your_email@example.com
SMTP_PASSWORD=your_email_app_password
SMTP_FROM_ADDRESS=alert_sender@example.com

Note

SMTP_PASSWORD must be an app password, not your Gmail account password. Visit this guide for more details.

  prometheus:
    image: prom/prometheus:v2.50.0
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./volumes/monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
    networks:
      - myNetwork

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SMTP_ENABLED=true
      - GF_SMTP_HOST=smtp.gmail.com:587
      - GF_SMTP_USER=${SMTP_USER}
      - GF_SMTP_PASSWORD=${SMTP_PASSWORD}
      - GF_SMTP_FROM_ADDRESS=${SMTP_FROM_ADDRESS}
      - GF_SMTP_FROM_NAME=Grafana Alerts
    volumes:
      - ./volumes/grafana/data:/var/lib/grafana
      - ./volumes/monitoring/grafana/provisioning:/etc/grafana/provisioning
    depends_on:
      - prometheus
      - kafka-1
      - kafka-2
    networks:
      - myNetwork
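
Docker Compose substitutes an empty string for any `${...}` variable that is not set, so a missing SMTP value can go unnoticed until an alert fails to send. A small pre-flight check along the following lines could catch that earlier. The function name is hypothetical, and the check only sees variables passed to it (by default the current process environment, not the .env file itself).

```python
import os
from typing import Mapping

REQUIRED_SMTP_VARS = ("SMTP_USER", "SMTP_PASSWORD", "SMTP_FROM_ADDRESS")


def missing_smtp_vars(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required SMTP variables that are unset or empty."""
    return [name for name in REQUIRED_SMTP_VARS if not env.get(name)]


# Example usage: pass a dict of parsed .env values, or rely on os.environ.
#   problems = missing_smtp_vars()
#   if problems:
#       raise SystemExit(f"Missing SMTP settings: {', '.join(problems)}")
```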

5. Run the Streaming script

Execution Overview 🎬

Before setting up the environment, here's a quick demo of the streaming pipeline in action:
📽️

Run the Real-time Pipeline PowerShell Script

We can start the necessary containers and begin to execute our streaming pipeline.

docker-compose up -d 

Before proceeding further, you must enable the following features:

  • Enable local_infile on the server side.
    This is required because we use a script to load static data files into MySQL tables using the LOAD DATA LOCAL INFILE command. Without enabling this option, the script will fail to execute the data loading step.

    SET GLOBAL local_infile = 1;
    
  • Enable MySQL binary logging (binlog).
    Follow the instructions here to create the required user and enable the binlog. The connector reads this user's credentials from mysql-src-connector.json.

Note

If you use a different username or password, make sure to update the following configuration in the mysql-src-connector.json file accordingly:

"database.user": "dbz-user",
"database.password": "dbz-user",

Instead of running multiple commands manually, we have a PowerShell script real-time.ps1 that automates the entire process, including:

✔️ Creating necessary tables in MySQL

✔️ Loading static files into the tables

✔️ Caching lookup data for later use

✔️ Registering the MySQL source connector

✔️ Running the Kafka client to handle new events

  1. Creating necessary tables in MySQL
    The first step is to create all the tables required for this project.

    # Run the script to create necessary tables in the MySQL database
    python scripts/database/create_table.py
    Write-Host "============================================================"
    
  2. Loading static files into the tables
    Load CSV files located in the data/ folder into corresponding MySQL tables (e.g., stores, products, ...).

    # Load static reference data (e.g., products, stores) into the database
    python scripts/database/load_static_file.py
    Write-Host "============================================================"
    
  3. Caching lookup data
    Cache frequently accessed reference data to Redis for faster response and reduced database queries.

    # Populate Redis cache with referential data for faster lookups
    python scripts/database/lookup_data_cache.py
    Write-Host "============================================================"
    
  4. Registering MySQL Source Connector to Kafka Connect
    Register the MySQL source connector using the mysql-src-connector.json file. This file contains the Kafka Source Connector configuration.

    # Register the MySQL source connector to Kafka Connect for capturing change data (CDC)
    Invoke-WebRequest -Uri "http://localhost:8083/connectors" -Method Post -ContentType "application/json" -InFile "./scripts/real-time/mysql-src-connector.json"
    Write-Host "============================================================"
    
    Refer to MySQL Kafka Connector Configuration Properties for more details.
  5. Executing the Kafka Client
    Start the Kafka client to consume new events and execute real-time business logic (e.g., product suggestions).

    # Start the Kafka client to listen for new order events and handle product suggestions logic
    python scripts/real-time/kafka_client.py
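
Step 4 above registers the connector with PowerShell's Invoke-WebRequest. On systems without PowerShell, the same POST to the Kafka Connect REST API could be made from Python. The sketch below is one possible equivalent using only the standard library. `build_register_request` is a hypothetical helper, and the URL assumes the port mapping shown earlier.

```python
import json
from urllib.request import Request, urlopen


def build_register_request(body: bytes,
                           base_url: str = "http://localhost:8083") -> Request:
    """Build the POST request that registers a connector definition."""
    json.loads(body)  # fail early if the payload is not valid JSON
    return Request(
        f"{base_url}/connectors",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


# Example usage (requires Kafka Connect to be running):
#   with open("./scripts/real-time/mysql-src-connector.json", "rb") as fh:
#       request = build_register_request(fh.read())
#   with urlopen(request, timeout=10) as resp:
#       print(resp.status)
```

Building the request separately from sending it lets you inspect the method, URL, and headers before anything reaches the Connect worker.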
    

Note

The number of Kafka consumers should be less than or equal to the number of partitions: within a consumer group, each partition is read by only one consumer, so any extra consumers sit idle.
If you change the number of partitions, update the num_workers value accordingly (e.g., num_workers=3 means 3 parallel consumer processes).

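import multiprocessing


def main() -> None:
    # One consumer process per worker; keep num_workers <= partition count.
    num_workers = 3
    processes = []

    for i in range(num_workers):
        # consumer_worker is defined elsewhere in the script.
        p = multiprocessing.Process(target=consumer_worker, args=(i,))
        p.start()
        processes.append(p)

    # Wait for all consumer processes to exit.
    for p in processes:
        p.join()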
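
To see why workers beyond the partition count add nothing, the sketch below spreads partitions over workers in a simple round-robin fashion. This is a simplified model for illustration only: Kafka's actual partition assignors may distribute partitions differently, but they share the property that each partition goes to exactly one consumer in a group. `assign_round_robin` is a hypothetical helper.

```python
def assign_round_robin(num_partitions: int, num_workers: int) -> dict[int, list[int]]:
    """Assign partition ids to worker ids in round-robin order."""
    assignment: dict[int, list[int]] = {w: [] for w in range(num_workers)}
    for partition in range(num_partitions):
        assignment[partition % num_workers].append(partition)
    return assignment


# 5 partitions, 3 workers: every worker gets at least one partition.
print(assign_round_robin(5, 3))  # {0: [0, 3], 1: [1, 4], 2: [2]}

# 5 partitions, 6 workers: worker 5 gets nothing and stays idle.
print(assign_round_robin(5, 6))  # {0: [0], 1: [1], 2: [2], 3: [3], 4: [4], 5: []}
```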

Full script in the real-time.ps1 file.

Once all containers are ready and healthy, you can access the services on the ports published above, for example Kafka Connect on 8083, Prometheus on 9090, and Grafana on 3000.

Execute the following command to run the entire pipeline setup:

.\real-time.ps1

Then, run the script below to update the order status:

python scripts/database/update_order_status.py

Next, start streaming data into the system:

python scripts/database/generate_data.py

You can view the demo in the Execution Overview.


If an alert condition is triggered (e.g., a Kafka broker goes down or a Kafka Connect task fails), an email notification will be sent as shown below:

Image