Apache Airflow™ is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. It can be deployed in many ways, from a single process on your laptop to a distributed setup that supports even the biggest workflows.
Installation
Local Deployment
- Set Airflow Home (optional):

  ```bash
  export AIRFLOW_HOME=~/airflow
  ```

- Install Airflow using the constraints file, which is determined based on the URL we pass:

  ```bash
  AIRFLOW_VERSION=2.6.3

  # Extract the version of Python you have installed. If you're currently using
  # Python 3.11 you may want to set this manually as noted above; Python 3.11 is
  # not yet supported.
  PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

  CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
  # For example this would install 2.6.3 with python 3.7:
  # https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.7.txt

  pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
  ```

- Run Airflow Standalone:

  The `airflow standalone` command initializes the database, creates a user, and starts all components.

  ```bash
  airflow standalone
  ```

- Access the Airflow UI:

  Visit `localhost:8080` in your browser and log in with the admin account details shown in the terminal. Enable the `example_bash_operator` DAG on the home page.
Upon running these commands, Airflow will create the $AIRFLOW_HOME folder and an “airflow.cfg” file with defaults that will get you going fast. You can override defaults using environment variables; see the Configuration Reference. You can inspect the file either in $AIRFLOW_HOME/airflow.cfg or through the UI in the Admin->Configuration menu. The PID file for the webserver will be stored in $AIRFLOW_HOME/airflow-webserver.pid, or in /run/airflow/webserver.pid if started by systemd.
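Configuration options can be overridden with environment variables that follow the AIRFLOW__{SECTION}__{KEY} pattern. A minimal sketch (the specific option below is only an illustration, not something the steps above require):

```bash
# Override [core] load_examples without editing airflow.cfg
# Pattern: AIRFLOW__{SECTION}__{KEY}; this particular option is just an example
export AIRFLOW__CORE__LOAD_EXAMPLES=False
```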
Production Deployment
Tutorials
Fundamental Concepts
DAGs
A DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting Tasks together, organized with dependencies and relationships to say how they should run.
Operators
An operator defines a unit of work for Airflow to complete. Using operators is the classic approach to defining work in Airflow.
- BaseOperator
- PythonOperator
- BashOperator
- KubernetesPodOperator
Tasks
To use an operator in a DAG, you have to instantiate it as a task. Tasks determine how to execute your operator’s work within the context of a DAG.
The precedence rules for a task are as follows:
- Explicitly passed arguments
- Values that exist in the default_args dictionary
- The operator's default value, if one exists
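As a rough sketch of how these rules play out (the DAG ID, task IDs, and argument values below are illustrative, not taken from the original), a task inherits retries from default_args unless it passes its own value:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# default_args are applied to every task in the DAG unless a task overrides them
with DAG(
    dag_id="example_precedence",          # illustrative DAG ID
    start_date=datetime(2022, 1, 1),
    schedule=None,
    default_args={"retries": 2},
) as dag:
    # Inherits retries=2 from default_args
    t1 = BashOperator(task_id="print_date", bash_command="date")

    # An explicitly passed retries=3 takes precedence over default_args
    t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3)
```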
Templating with Jinja
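The original snippet did not survive extraction, so here is a minimal sketch of templating, assuming the task sits inside a DAG context like the one above (the task ID and command are illustrative). Airflow passes the bash_command string through Jinja, so macros such as {{ ds }} are rendered at runtime:

```python
import textwrap

from airflow.operators.bash import BashOperator

# Jinja-templated command; {{ ds }} is the run's logical date, rendered at runtime
templated_command = textwrap.dedent(
    """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
    {% endfor %}
    """
)

# Assumes this sits inside a `with DAG(...)` block like the one above
t3 = BashOperator(
    task_id="templated",          # illustrative task ID
    bash_command=templated_command,
)
```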
Files can also be passed to the bash_command argument, like bash_command='templated_command.sh', where the file location is relative to the directory containing the pipeline file. It is also possible to define your template_searchpath as pointing to any folder locations in the DAG constructor call.
Adding documentation
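The original example here was lost, but as a hedged sketch: tasks and DAGs can carry documentation through attributes such as doc_md (Markdown) or doc (plain text), which the UI renders on their detail views. The content below is illustrative and assumes the t1 task and dag object from the earlier sketches:

```python
import textwrap

# Task-level documentation, rendered in the task's detail view in the UI
t1.doc_md = textwrap.dedent(
    """\
    #### Task Documentation
    You can document your task using the `doc_md` (Markdown) or `doc` (plain text) attributes.
    """
)

# DAG-level documentation, shown in the DAG views
dag.doc_md = """
### Tutorial DAG
This text appears at the top of the DAG's detail pages.
"""
```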
Setting up Dependencies
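The snippet for this section is missing as well, so here is a short sketch of the dependency syntax (task names are illustrative). The bit-shift operators and the set_upstream/set_downstream methods are equivalent ways of wiring tasks:

```python
# Assumes t1, t2, t3 are tasks defined in the same DAG (illustrative names)

# t1 runs before t2; these three forms are equivalent
t1 >> t2
t2 << t1
t1.set_downstream(t2)

# A list fans out: t1 runs first, then t2 and t3 can run in parallel
t1 >> [t2, t3]
```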
Pipeline definition example:
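The original code block was lost in extraction; the following is a reconstruction based on the description below, so treat the details as approximate:

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:
    # A task defined with a classic operator
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    # A task defined with the TaskFlow @task decorator
    @task()
    def airflow():
        print("airflow")

    # Set the dependency: hello runs before airflow()
    hello >> airflow()
```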
Here you see:
- A DAG named “demo”, starting on Jan 1st 2022 and running once a day. A DAG is Airflow’s representation of a workflow.
- Two tasks, a BashOperator running a Bash script and a Python function defined using the @task decorator
- >> between the tasks defines a dependency and controls in which order the tasks will be executed
Testing
Testing pipeline parsing
Let’s assume we are saving the code from the previous step in tutorial.py in the DAGs folder referenced in your airflow.cfg. The default location for your DAGs is ~/airflow/dags.
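If the script can be imported without raising an exception, the DAG contains no obvious errors and Airflow should be able to parse it. Assuming the default DAGs folder, a quick check is simply to run the file with Python:

```bash
# If this exits without an exception, the DAG file parses cleanly
python ~/airflow/dags/tutorial.py
```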
Metadata validation
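A sketch of the CLI commands typically used to validate the metadata (the DAG ID tutorial is carried over from the previous step):

```bash
# List the DAGs Airflow has parsed
airflow dags list

# List the tasks in the "tutorial" DAG
airflow tasks list tutorial

# Same, but shown as a tree of task dependencies
airflow tasks list tutorial --tree
```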
Testing single task
Let’s test by running the actual task instances. A DAG run’s logical date is the start of its data interval.
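For example (the task IDs below assume the tutorial DAG sketched earlier), airflow tasks test runs one task instance for a given logical date:

```bash
# Test individual task instances of the "tutorial" DAG for a chosen logical date
airflow tasks test tutorial print_date 2015-06-01
airflow tasks test tutorial sleep 2015-06-01
airflow tasks test tutorial templated 2015-06-01
```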
Note that the airflow tasks test command runs task instances locally, outputs their log to stdout (on screen), does not bother with dependencies, and does not communicate state (running, success, failed, …) to the database. It simply allows testing a single task instance.
Testing single DAG
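Similarly, the whole DAG can be exercised with one command; this sketch assumes the same tutorial DAG:

```bash
# Run all tasks of the "tutorial" DAG locally for a single logical date
airflow dags test tutorial 2015-06-01
```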
Backfill
Note that if you use depends_on_past=True, individual task instances will depend on the success of their previous task instance (that is, previous according to the logical date).
While depends_on_past=True causes a task instance to depend on the success of its previous task instance, wait_for_downstream=True will cause a task instance to also wait for all task instances immediately downstream of the previous task instance to succeed.
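A sketch of the backfill invocation (the DAG ID and date range are illustrative):

```bash
# Re-run the "tutorial" DAG for every schedule interval in the given date range
airflow dags backfill tutorial \
    --start-date 2015-06-01 \
    --end-date 2015-06-07
```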