
⏳ Batch Pipeline

Start the necessary containers. The --build flag is included because the Airflow image is customized via the Dockerfile located in the airflow/ folder.

docker compose -f docker-compose-batch.yaml up --build -d

1. Setup Spark

You can change the SPARK_WORKER_CORES and SPARK_WORKER_MEMORY values in the spark-worker service depending on your available resources.

  x-spark-common: &spark-common
    image: bitnami/spark:3.5.1
    volumes:
      - ./scripts/batch:/opt/bitnami/spark/scripts
    networks:
      - myNetwork

  spark-master:
    <<: *spark-common
    command: bin/spark-class org.apache.spark.deploy.master.Master
    ports:
      - "9090:8080"
      - "7077:7077"
    networks:
      - myNetwork

  spark-worker:
    <<: *spark-common
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 4
      SPARK_WORKER_MEMORY: 4g
      SPARK_MASTER_URL: spark://spark-master:7077
    networks:
      - myNetwork
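
To confirm that the worker has registered with the master and that the core/memory settings above are in effect, you can submit a trivial PySpark job against spark://spark-master:7077. This is only a sanity-check sketch; the file name and app name are assumptions and nothing here comes from the project's own scripts.

# sanity_check.py -- hypothetical helper, not part of the repo.
# Submits a tiny job to the standalone master so you can verify the cluster is healthy.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("cluster-sanity-check")          # arbitrary name
    .master("spark://spark-master:7077")      # same URL the worker registers with
    .getOrCreate()
)

# A trivial distributed computation: if this prints a number, the worker picked up the job.
print(spark.sparkContext.parallelize(range(1000)).sum())
spark.stop()

Note that spark-master only resolves from containers attached to myNetwork, so run this via spark-submit inside the master or worker container rather than directly from the host.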

2. Setup Airflow

We will mount all the necessary components to run the Spark job inside the Airflow container.

x-airflow-common:
  &airflow-common
  build:
    context: .
    dockerfile: ./airflow/Dockerfile
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    MYSQL_HOST: "host.docker.internal"
    MYSQL_USER: ${MYSQL_USER}
    MYSQL_PASSWORD: ${MYSQL_PASSWORD}
    MYSQL_DATABASE: ${MYSQL_DATABASE}
  volumes:
    - ./airflow/dags:/opt/airflow/dags
    - ./airflow/logs:/opt/airflow/logs
    - ./scripts/batch:/opt/airflow/scripts
    - ./mysql-connector-j-8.0.33.jar:/opt/airflow/mysql-connector-j-8.0.33.jar
    - ./logs:/opt/airflow/logger  # contains all the logs generated by the scripts. 
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    - postgres
  networks:
    - myNetwork
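
For reference, here is a minimal sketch of how a script under /opt/airflow/scripts could use the mounted MySQL connector jar and the MYSQL_* environment variables above to pull source data into Spark. The app name, table name, and MySQL port are illustrative assumptions, not taken from the project.

# extract_from_mysql.py -- hypothetical sketch, assuming MySQL listens on the default port 3306.
import os

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("mysql-extract")                                        # hypothetical name
    .config("spark.jars", "/opt/airflow/mysql-connector-j-8.0.33.jar")  # jar mounted in the compose file
    .getOrCreate()
)

df = (
    spark.read.format("jdbc")
    .option("url", f"jdbc:mysql://{os.environ['MYSQL_HOST']}:3306/{os.environ['MYSQL_DATABASE']}")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "orders")                                     # hypothetical table
    .option("user", os.environ["MYSQL_USER"])
    .option("password", os.environ["MYSQL_PASSWORD"])
    .load()
)
df.show(5)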
After the Airflow webserver is up, you can access the UI by navigating to localhost:8080. Log in using the default credentials: both the username and password are airflow.

Once logged in, you should see the DAG defined in spark_job_airflow.py listed in the dashboard.

Before running the DAG, make sure to create a Spark connection in Airflow.
Set the host field to spark://spark-master, which corresponds to the name of the Spark master service defined in the Docker Compose setup. Set the port to 7077, the default port the Spark master listens on for cluster communication.

All the tasks are defined in the DAG and can be inspected in the graph view of the Airflow UI.
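
As a hedged illustration only (the real DAG lives in airflow/dags/; the DAG schedule, task id, script path, and connection id below are assumptions), a Spark task in such a DAG is typically wired up with SparkSubmitOperator and the connection created above:

from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="spark_job_airflow",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,       # trigger manually from the UI
    catchup=False,
) as dag:
    # Hypothetical task: submit one of the mounted batch scripts to the cluster.
    ingest_to_bronze = SparkSubmitOperator(
        task_id="ingest_to_bronze",                                  # assumed task name
        conn_id="spark_default",                                     # assumed id of the Spark connection created above
        application="/opt/airflow/scripts/bronze_ingestion.py",      # hypothetical script path
        jars="/opt/airflow/mysql-connector-j-8.0.33.jar",            # jar mounted in the compose file
        verbose=True,
    )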

Info

Here are some additional points worth mentioning:

We have implemented custom data quality checks located in the batch/data_quality/ folder. These scripts validate the data at both the bronze and silver layers. For example:

  • bronze_validation.py: This script checks for schema conformity, non-null constraints, and basic data integrity. Schema validation is performed at this stage to catch issues early, such as unexpected schema changes, before the data flows into downstream layers; catching them here avoids the time and effort of backfilling later.

  • silver_validation.py: This script uses Deequ to enforce additional data quality constraints, such as non-null checks and domain-specific validations on certain columns (see the sketch below).
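
A minimal sketch of what such a Deequ-based check can look like, written with PyDeequ. The dataset path, column names, and constraints below are purely illustrative assumptions; the project's own script may use different ones.

import pydeequ
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationResult, VerificationSuite
from pyspark.sql import SparkSession

# PyDeequ reads the SPARK_VERSION env var to pick the matching Deequ artifact.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)

df = spark.read.parquet("s3a://silver-layer/example_table/")    # hypothetical path

check = (
    Check(spark, CheckLevel.Error, "silver layer checks")
    .isComplete("id")                                            # non-null constraint (illustrative column)
    .isNonNegative("amount")                                     # domain-specific rule (illustrative)
)

result = VerificationSuite(spark).onData(df).addCheck(check).run()
VerificationResult.checkResultsAsDataFrame(spark, result).show(truncate=False)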

3. Setup MinIO

Go to the MinIO console at localhost:9001 to create the necessary buckets. In this case, create the following buckets: bronze-layer, silver-layer, and gold-layer.
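
If you prefer to script this instead of clicking through the console, a small sketch with the MinIO Python client could look like the following. The endpoint and credentials are assumptions; use whatever is configured in your compose file and .env.

from minio import Minio

# The console runs on 9001; the S3 API itself is typically exposed on 9000.
client = Minio(
    "localhost:9000",
    access_key="minioadmin",      # assumed default credentials
    secret_key="minioadmin",
    secure=False,
)

for bucket in ("bronze-layer", "silver-layer", "gold-layer"):
    if not client.bucket_exists(bucket):
        client.make_bucket(bucket)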

4. Run the Batch Pipeline

Now we can trigger the DAG from the Airflow UI.

You can check the execution logs in the logs/ folder.

Go to the MinIO UI to view the data in each bucket.