DOCUMENTATION
Overview
This pipeline is designed to automate the extraction, processing, and storage of job postings from the TOPCV website. It ensures data is cleaned, transformed, and structured for efficient querying and analysis. The processed data can be further utilized for visualization and reporting.
For a step-by-step deployment guide, see the Deployment guide. Happy coding!
System Architecture
- Data Ingestion: The pipeline collects job postings from the TOPCV website, extracting relevant information for further processing.
- Data Processing: Extracted data is cleaned, transformed, and formatted to ensure consistency before being stored in a PostgreSQL database.
- Workflow Orchestration: Airflow manages and schedules the entire workflow, automating data extraction, processing, and storage while maintaining task dependencies and execution order (see the sketch below).
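For orientation, here is a minimal sketch of how the tasks could be chained in the DAG. The DAG id, schedule, and the scrape_jobs/execute_sql placeholders are assumptions of this sketch; clean_data, transform_data, write_sql_query, and check_sql_file mirror the tasks described in the notes below.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# Placeholders standing in for the real callables documented in the notes below.
def scrape_jobs(**kwargs): ...
def clean_data(**kwargs): ...
def transform_data(**kwargs): ...
def write_sql_query(**kwargs): ...
def check_sql_file(**kwargs): ...
def execute_sql(**kwargs): ...

with DAG(
    dag_id="topcv_jobs_pipeline",      # assumed DAG id
    start_date=datetime(2024, 1, 1),
    schedule="@daily",                 # assumed daily batch schedule (Airflow 2.4+ syntax)
    catchup=False,
) as dag:
    scrape = PythonOperator(task_id="scrape_jobs", python_callable=scrape_jobs)
    clean = PythonOperator(task_id="clean_data", python_callable=clean_data)
    transform = PythonOperator(task_id="transform_data", python_callable=transform_data)
    write_sql = PythonOperator(task_id="write_sql_query", python_callable=write_sql_query)
    check_sql = PythonOperator(task_id="check_sql_file", python_callable=check_sql_file)
    load = PythonOperator(task_id="execute_sql", python_callable=execute_sql)

    # With the default trigger rule, a skip raised in check_sql_file also skips execute_sql.
    scrape >> clean >> transform >> write_sql >> check_sql >> load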
Technical Notes
1. Error Handling
To ensure data integrity and prevent unnecessary execution of downstream tasks, the pipeline includes error-handling mechanisms.
SQL File Validation: Before executing database operations, the check_sql_file task verifies whether the postgres_query.sql file contains any INSERT statements. If the file is empty, the pipeline raises an AirflowSkipException, preventing unnecessary execution.
import logging
import os
from airflow.exceptions import AirflowSkipException

def check_sql_file(**kwargs):
    # Path of the generated SQL file, relative to this DAG file.
    postgres_sql_file = os.path.join(os.path.dirname(__file__), '..', 'tmp', 'postgres_query.sql')
    if os.path.exists(postgres_sql_file) and os.path.getsize(postgres_sql_file) > 0:
        logging.info("SQL file is not empty. Proceeding with execution.")
    else:
        logging.info("No SQL queries to execute.")
        # Mark this task as skipped so downstream tasks do not run needlessly.
        raise AirflowSkipException("Skipping task because SQL file is empty")
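The skip behaviour can also be checked locally outside Airflow by calling the function directly; this small snippet assumes check_sql_file is importable and simply prints the skip reason.
from airflow.exceptions import AirflowSkipException

try:
    check_sql_file()
except AirflowSkipException as exc:
    # In a real DAG run, Airflow would mark the task (and its downstream tasks) as skipped.
    print(f"Task would be skipped: {exc}")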
2. Web Scraping with Playwright
Initially, BeautifulSoup was considered for web scraping; however, due to website restrictions, Playwright was used instead.
- Headless Mode Issues: Running Playwright in headless=True mode caused failures, so headless=False was used (see the sketch below).
- Virtual Display Setup: To avoid UI rendering issues in a containerized environment, pyvirtualdisplay is installed.
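A minimal sketch of that combination, assuming Playwright's sync API; the URL and selector below are placeholders, not the pipeline's actual scraping logic.
from playwright.sync_api import sync_playwright
from pyvirtualdisplay import Display

def scrape_jobs():
    # Start a virtual X display so a headed (non-headless) browser can run in the container.
    display = Display(visible=0, size=(1920, 1080))
    display.start()
    try:
        with sync_playwright() as p:
            # headless=True was blocked by the site, so launch a headed browser instead.
            browser = p.chromium.launch(headless=False)
            page = browser.new_page()
            page.goto("https://www.topcv.vn")                     # placeholder URL
            titles = page.locator("h3.title").all_inner_texts()   # placeholder selector
            browser.close()
            return titles
    finally:
        display.stop()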
3. Data Processing & Storage
The pipeline follows a structured data processing workflow:
- Staging Table: Data is first written to a staging table before transformation.
- Cleaning & Transformation: Data is cleaned and formatted using DictCursor to access table columns as key-value pairs. Using DictCursor enables direct access to values by column name, for example job['job_name'], which enhances code readability and maintainability.
- Incremental Processing: Only new job postings (posted_date > last_processed_time) are processed to avoid duplication (a sketch of the watermark helpers follows the code below).
import pendulum
from psycopg2.extras import DictCursor

def clean_data(**kwargs):
    # DictCursor lets rows be accessed by column name, e.g. job['job_name'].
    conn = get_connection()
    cur = conn.cursor(cursor_factory=DictCursor)
    query = "SELECT * FROM staging_table"
    # Incremental load: only fetch rows newer than the last processed timestamp.
    last_processed_time = read_last_processed_time()
    if last_processed_time:
        query += " WHERE posted_date > %s"
        cur.execute(query, (last_processed_time,))
    else:
        cur.execute(query)
    scraped_jobs = cur.fetchall()
    cleaned_jobs = []
    for job in scraped_jobs:
        cleaned_jobs.append({
            'title': clean_title(job['job_name']),
            'salary': clean_salary(job['salary']),
            'company': job['job_company'],
            # Convert the UTC timestamp to the local timezone before pushing to XCom.
            'update': pendulum.instance(job['posted_date']).in_timezone('Asia/Ho_Chi_Minh'),
        })
    cur.close()
    conn.close()
    # Share the cleaned records with downstream tasks via XCom.
    kwargs['ti'].xcom_push(key='cleaned_data', value=cleaned_jobs)
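The read_last_processed_time helper is referenced above but not shown. Here is a minimal sketch of how the watermark might be kept, assuming a small text file under tmp/; the path, file format, and the write counterpart are assumptions, not the pipeline's actual implementation.
import os
import pendulum

# Assumed location and format of the watermark; the real helpers may differ.
LAST_PROCESSED_FILE = os.path.join(os.path.dirname(__file__), '..', 'tmp', 'last_processed_time.txt')

def read_last_processed_time():
    # Return the timestamp of the newest posting already processed, or None on the first run.
    if not os.path.exists(LAST_PROCESSED_FILE):
        return None
    with open(LAST_PROCESSED_FILE) as f:
        return pendulum.parse(f.read().strip())

def write_last_processed_time(ts):
    # Persist the watermark so the next run only picks up newer postings.
    with open(LAST_PROCESSED_FILE, 'w') as f:
        f.write(ts.isoformat())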
4. Timezone Management with Pendulum
Airflow runs all timestamps in UTC by default, which can cause inconsistencies.
To avoid this, the pendulum library ensures all timestamps are converted to Asia/Ho_Chi_Minh
(my local timezone) before pushing data to XCom.
# Inside the transformation loop: convert timestamps to the local timezone before pushing to XCom.
transformed_jobs.append({
    'update': pendulum.instance(job['posted_date']).in_timezone('Asia/Ho_Chi_Minh'),
    'due_date': pendulum.instance(job['due_date']).in_timezone('Asia/Ho_Chi_Minh'),
})
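For illustration, a quick check of what the conversion does (the timestamp here is made up):
from datetime import datetime, timezone
import pendulum

# A made-up UTC timestamp (as it would come out of the database) rendered in the local timezone.
posted_date = datetime(2024, 5, 1, 9, 30, tzinfo=timezone.utc)
local_ts = pendulum.instance(posted_date).in_timezone('Asia/Ho_Chi_Minh')
print(local_ts.to_datetime_string())  # 2024-05-01 16:30:00 (UTC+7)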
5. Data Transfer Using XCom
To maintain a structured data flow, Airflow’s XCom is used for inter-task communication.
- The clean_data task pushes processed job postings to XCom.
- The transform_data task pulls this data for further processing.
- Finally, write_sql_query generates SQL commands based on the transformed data (a sketch of this hand-off follows the list).
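A minimal sketch of how this hand-off might look. The task ids and XCom keys mirror the names above, but the transformation applied and the target table/columns in the generated SQL are illustrative assumptions; the tmp/postgres_query.sql path matches the check_sql_file task.
import os

def transform_data(**kwargs):
    ti = kwargs['ti']
    # Pull the records that clean_data pushed under key='cleaned_data'.
    cleaned_jobs = ti.xcom_pull(task_ids='clean_data', key='cleaned_data') or []
    # Placeholder transformation; the real logic lives in the pipeline code.
    transformed_jobs = [{**job, 'source': 'topcv'} for job in cleaned_jobs]
    ti.xcom_push(key='transformed_data', value=transformed_jobs)

def write_sql_query(**kwargs):
    ti = kwargs['ti']
    jobs = ti.xcom_pull(task_ids='transform_data', key='transformed_data') or []
    # Same file that check_sql_file inspects before the load step.
    sql_file = os.path.join(os.path.dirname(__file__), '..', 'tmp', 'postgres_query.sql')
    with open(sql_file, 'w') as f:
        for job in jobs:
            # Assumed target table/columns; real query building should escape or parameterize values.
            f.write(
                "INSERT INTO jobs (title, salary, company) VALUES "
                f"('{job['title']}', '{job['salary']}', '{job['company']}');\n"
            )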
Future Enhancements & Limitations
Limitations
XCom Data Size Limit
Airflow's XCom is designed for small data exchanges between tasks.
- Storing large datasets (e.g., full job listings) can exceed these limits or cause memory errors.
- Airflow XComs have a size limit that depends on the metadata database in use:
  - The default size limit for an XCom value stored in the Airflow metadata database is 48KB.
  - SQLite: 2GB
  - Postgres: 1GB
  - MySQL: 64KB
Ref: https://marclamberti.com/blog/airflow-xcom/#XCom_limitations
Future Enhancements
1. Automated Reporting & Visualization
After storing job data, the next step could be to generate reports or integrate with BI tools.
🔹 Possible options:
- Exporting reports to Google Sheets, Excel, or PDFs.
- Connecting PostgreSQL with Power BI, Metabase, or Superset for visualization.
2. Stream Processing for Real-Time Job Scraping
Currently, the pipeline operates in batch mode, meaning job postings are scraped and processed at scheduled intervals. This may cause delays in updating the latest job listings.
🔹 Enhancement: Introduce stream processing to handle job postings in real time.
Approach
- Continuously monitor job listings for updates.
- Trigger data ingestion as soon as a new job posting appears.
- Store real-time data in a temporary location (e.g., Kafka, Redis, or a staging table).
Hybrid Processing
- Streaming Mode: Ingest and process new postings in real time (a rough sketch follows this list).
- Batch Mode: Run scheduled jobs for historical data analysis and reporting.
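A very rough sketch of the streaming idea, under heavy assumptions: scrape_listing and insert_into_staging are hypothetical helpers, the polling interval is arbitrary, and a production version would more likely push events to Kafka or Redis as described above rather than use an in-process loop.
import time

# Hypothetical helpers: scrape_listing() returns the current postings as dicts with a
# unique 'id'; insert_into_staging() writes one posting to the existing staging table.
def stream_new_postings(poll_seconds=60):
    seen_ids = set()
    while True:
        for job in scrape_listing():
            if job['id'] not in seen_ids:
                # Ingest the posting as soon as it appears on the listing page.
                seen_ids.add(job['id'])
                insert_into_staging(job)
        time.sleep(poll_seconds)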
Conclusion
Although this pipeline still has limitations and room for improvement, it has been a great learning experience. It pushed me to think about error handling, data flow between tasks, and different ways to utilize Airflow more effectively. Most importantly, this pipeline helped me develop problem-solving skills—thinking critically and exploring alternative approaches when tackling challenges.
There is no best, only better, and the same applies to this pipeline. Every challenge has been a step toward improvement, and each iteration brings new insights, reinforcing the mindset that continuous learning and refinement are key to building robust data workflows.