Stop creating bad DAGs!
Use this tool to measure and compare the parse time of your DAGs, identify bottlenecks, and optimize your Airflow environment for better performance.
Retrieving parse metrics from an Airflow cluster is straightforward, but measuring the effectiveness of code optimizations can be tedious: each change requires redeploying the Python file to your cloud provider, waiting for the DAG to be parsed, and then extracting a new report, which makes iteration slow.
This tool simplifies the process of measuring and comparing DAG parse times. It uses the same parse method as Airflow (from the Airflow repository) to measure the time taken to parse your DAGs locally, storing results for future comparisons.
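At its core, "parsing" a DAG file means executing the Python module and timing how long that takes. The real tool reuses Airflow's own parse method; the snippet below is only a rough, Airflow-free sketch of the idea:

```python
import time


def measure_parse_time(source: str) -> float:
    """Roughly approximate a DAG parse: compile and execute the module
    source, returning the elapsed time in seconds. Illustrative only;
    the actual tool calls Airflow's own parsing code instead."""
    start = time.perf_counter()
    exec(compile(source, "<dag_file>", "exec"), {"__name__": "dag_file"})
    return time.perf_counter() - start


# A trivial stand-in for a DAG file's contents:
dag_source = "import math\nTASKS = [math.sqrt(i) for i in range(1000)]"
elapsed = measure_parse_time(dag_source)
print(f"parsed in {elapsed:.4f}s")
```

This also explains why parse time matters: any top-level code in a DAG file (API calls, heavy imports) runs on every parse.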
To know more about how the tool works, check out the Medium article.
It's recommended to use a virtualenv to avoid library conflicts. Once set up, you can install the package by running the following command:
pip install airflow-parse-bench
The command above installs only the essential library dependencies (Airflow and Airflow providers). You’ll need to manually install any additional libraries that your DAGs depend on.
For example, if a DAG uses boto3 to interact with AWS, ensure that boto3 is installed in your environment. Otherwise, you'll encounter parse errors.
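Because parsing executes the module, a missing dependency fails immediately. The snippet below demonstrates that failure mode with a made-up module name (`some_uninstalled_lib` stands in for a real dependency such as boto3):

```python
# A DAG file that imports a library not installed locally fails at
# parse time, before any DAG object is even created.
dag_source = "import some_uninstalled_lib\n"

try:
    exec(compile(dag_source, "<dag_file>", "exec"), {})
except ModuleNotFoundError as err:
    print(f"parse error: {err}")
```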
Before parsing your DAGs, you also need to initialize an Airflow database on your local machine. To do this, run the command below:
airflow db init
If your DAGs use Airflow Variables, you must define them locally as well. Use placeholder values, as the actual values aren't required for parsing purposes.
To set up Airflow Variables locally, you can use the following command:
airflow variables set MY_VARIABLE 'ANY TEST VALUE'
Without this, you'll encounter an error like:
error: 'Variable MY_VARIABLE does not exist'
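Alternatively, Airflow can read Variables from environment variables prefixed with `AIRFLOW_VAR_`, which is convenient for throwaway local values:

```shell
# Equivalent to `airflow variables set`, but scoped to the shell session:
# Airflow resolves Variable "MY_VARIABLE" from AIRFLOW_VAR_MY_VARIABLE.
export AIRFLOW_VAR_MY_VARIABLE='ANY TEST VALUE'
```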
To measure the parse time of a single Python file, just run:
airflow-parse-bench --path your_path/dag_test.py
The output will look like this:
The result table includes the following columns:
- Filename: The name of the Python module containing the DAG. This unique name is the key used to store DAG information.
- Current Parse Time: The time (in seconds) taken to parse the DAG.
- Previous Parse Time: The parse time from the previous run.
- Difference: The difference between the current and previous parse times.
- Best Parse Time: The best parse time recorded for the DAG.
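The comparison columns above imply a small local store keyed by filename. Here is a hypothetical sketch of that bookkeeping (not the tool's actual storage schema):

```python
# Hypothetical per-file parse results, keyed by filename
# (illustrative only; not the tool's real database layout).
results: dict[str, dict[str, float]] = {}


def record(filename: str, parse_time: float) -> dict[str, float]:
    """Update the stored times for a file: the current time becomes the
    previous one, and the best time is the minimum seen so far."""
    entry = results.get(filename)
    if entry is None:
        entry = {"current": parse_time, "best": parse_time}
    else:
        entry["previous"] = entry["current"]
        entry["current"] = parse_time
        entry["best"] = min(entry["best"], parse_time)
    results[filename] = entry
    return entry


record("dag_test.py", 0.42)        # first run: no previous time yet
entry = record("dag_test.py", 0.30)
print(entry)  # current=0.30, previous=0.42, best=0.30
```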
You can also measure the parse time for all Python files in a directory by running:
airflow-parse-bench --path your_path/your_dag_folder
This time, the output table will display parse times for all Python files in the folder:
The library supports some additional arguments to customize the results. To see all available options, run:
airflow-parse-bench --help
It will display the following options:
- --path: The path to the Python file or directory containing the DAGs.
- --order: The order in which the results are displayed. You can choose between 'asc' (ascending) or 'desc' (descending).
- --num-iterations: The number of times to parse each DAG. The parse time will be averaged across iterations.
- --skip-unchanged: Skip DAGs that haven't changed since the last run.
- --reset-db: Clear all stored data in the local database, starting a fresh execution.
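For instance, `--skip-unchanged` only needs a cheap way to detect that a file's content hasn't changed. One common approach (illustrative, not necessarily how the tool implements it) is hashing file contents:

```python
import hashlib


def content_hash(source: bytes) -> str:
    """Fingerprint a DAG file's contents: identical bytes yield an
    identical hash, so an unchanged file can be skipped without
    re-parsing it."""
    return hashlib.sha256(source).hexdigest()


previous = content_hash(b"dag v1")
assert content_hash(b"dag v1") == previous   # unchanged: can skip
assert content_hash(b"dag v2") != previous   # changed: re-parse
print("hash check ok")
```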
Note: If a Python file has parsing errors or contains no valid DAGs, it will be excluded from the results table, and an error message will be displayed.
This project is still in its early stages, and there are many improvements planned for the future. Some of the features we're considering include:
- Cloud DAG Parsing: Automatically download and parse DAGs from cloud providers like AWS S3 or Google Cloud Storage.
- CI/CD Integration: Adapt the tool to work with CI/CD pipelines.
- Parallel Parsing: Speed up processing by parsing multiple DAGs simultaneously.
- Support .airflowignore: Ignore files and directories specified in the .airflowignore file.
If you’d like to suggest a feature or report a bug, please open a new issue!
This project is open to contributions! If you want to collaborate to improve the tool, please follow these steps:
- Open a new issue to discuss the feature or bug you want to address.
- Once approved, fork the repository and create a new branch.
- Implement the changes.
- Create a pull request with a detailed description of the changes.