Apache Airflow™ is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. Airflow is deployable in many ways, varying from a single process on your laptop to a distributed setup that supports even the biggest workflows.

Installation

Local Deployment

  1. Set Airflow Home (optional):

    export AIRFLOW_HOME=~/airflow
    
  2. Install Airflow using a constraints file; the correct file is selected by the Airflow and Python versions encoded in the URL we pass:

    AIRFLOW_VERSION=2.6.3
    
    # Extract the version of Python you have installed. If you're currently using Python 3.11,
    # set this manually to a supported version, since Python 3.11 is not yet supported by this Airflow release.
    PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
    
    CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
    # For example, this would install 2.6.3 with Python 3.7: https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.7.txt
    
    pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
    
  3. Run Airflow Standalone:

    The airflow standalone command initializes the database, creates a user, and starts all components.

    airflow standalone
    
  4. Access the Airflow UI:

    Visit localhost:8080 in your browser and log in with the admin account details shown in the terminal. Enable the example_bash_operator DAG on the home page.

Upon running these commands, Airflow will create the $AIRFLOW_HOME folder and an "airflow.cfg" file with defaults that will get you going quickly. You can override defaults using environment variables; see the Configuration Reference. You can inspect the file in $AIRFLOW_HOME/airflow.cfg, or through the UI in the Admin->Configuration menu. The PID file for the webserver is stored in $AIRFLOW_HOME/airflow-webserver.pid, or in /run/airflow/webserver.pid if started by systemd.
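
As a quick sanity check, you can also read the effective configuration from Python. The following is a minimal sketch, assuming Airflow 2.x, where airflow.configuration.conf exposes the merged settings from airflow.cfg and any AIRFLOW__{SECTION}__{KEY} environment-variable overrides (for example AIRFLOW__CORE__LOAD_EXAMPLES):

# A minimal sketch, assuming Airflow 2.x: read the effective configuration from Python.
from airflow.configuration import conf

# Where Airflow looks for DAG files ([core] dags_folder in airflow.cfg).
print(conf.get("core", "dags_folder"))

# Whether the bundled example DAGs are loaded; can be overridden with
# the environment variable AIRFLOW__CORE__LOAD_EXAMPLES=False.
print(conf.getboolean("core", "load_examples"))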

Production Deployment

Tutorials

Fundamental Concepts

DAGs

A DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting Tasks together, organized with dependencies and relationships to say how they should run.

Operators

An operator defines a unit of work for Airflow to complete. Using operators is the classic approach to defining work in Airflow.

  • BaseOperator
    • PythonOperator
    • BashOperator
    • KubernetesPodOperator
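
For instance, here is a minimal sketch of the classic operator style using PythonOperator from the list above; the dag_id and the callable are illustrative names, assuming Airflow 2.x import paths:

# A minimal sketch of the classic operator style; names are illustrative.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def print_hello():
    print("hello from PythonOperator")


with DAG(dag_id="operator_sketch", start_date=datetime(2022, 1, 1), schedule=None) as dag:
    hello = PythonOperator(task_id="print_hello", python_callable=print_hello)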

Tasks

To use an operator in a DAG, you have to instantiate it as a task. Tasks determine how to execute your operator’s work within the context of a DAG.

t1 = BashOperator(
    task_id="print_date",
    bash_command="date",
)

t2 = BashOperator(
    task_id="sleep",
    depends_on_past=False,
    bash_command="sleep 5",
    retries=3,
)

The precedence rules for a task are as follows:

  1. Explicitly passed arguments
  2. Values that exist in the default_args dictionary
  3. The operator’s default value, if one exists
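
To illustrate, here is a hedged sketch of how an explicitly passed argument beats a default_args entry; the dag_id and task names are made up for illustration:

# A minimal sketch of argument precedence; names are illustrative.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="precedence_sketch",
    start_date=datetime(2022, 1, 1),
    schedule=None,
    default_args={"retries": 1},  # 2. applies to every task that does not say otherwise
) as dag:
    uses_default = BashOperator(task_id="uses_default", bash_command="date")  # retries == 1
    explicit_wins = BashOperator(task_id="explicit_wins", bash_command="date", retries=3)  # 1. explicit argument
    # With neither, the operator's own default applies (3.); retries then falls back to
    # the configured default_task_retries, which is 0 out of the box.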

Templating with Jinja

templated_command = dedent(
    """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)

t3 = BashOperator(
    task_id="templated",
    depends_on_past=False,
    bash_command=templated_command,
)

Files can also be passed to the bash_command argument, like bash_command='templated_command.sh', where the file location is relative to the directory containing the pipeline file. You can also set template_searchpath in the DAG constructor call to point at any additional folder locations.
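
For example, a hedged sketch of pointing template_searchpath at a folder holding templated_command.sh; the path /opt/airflow/scripts and the dag_id are assumed for illustration:

# A minimal sketch of template_searchpath; /opt/airflow/scripts is an assumed,
# illustrative location containing templated_command.sh.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="templated_file_sketch",
    start_date=datetime(2022, 1, 1),
    schedule=None,
    template_searchpath=["/opt/airflow/scripts"],
) as dag:
    t = BashOperator(
        task_id="templated_from_file",
        # The file is located via template_searchpath (or the DAG file's folder by
        # default) and rendered with Jinja before the command runs.
        bash_command="templated_command.sh",
    )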

Adding documentation

t1.doc_md = dedent(
    """\
#### Task Documentation
...
"""
)
dag.doc_md = __doc__  # provided that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
"""  # otherwise, type it like this

Setting up Dependencies

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

Pipeline definition

example:

from datetime import datetime, timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

from airflow.decorators import task

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(
    dag_id="demo",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    description="A simple tutorial DAG",
    start_date=datetime(2022, 1, 1),
    schedule="0 0 * * *",
    catchup=False,
    tags=["example"],
) as dag:

    # Tasks are represented as operators
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    # Set dependencies between tasks
    hello >> airflow()

Here you see:

  • A DAG named “demo”, starting on Jan 1st 2022 and running once a day. A DAG is Airflow’s representation of a workflow.
  • Two tasks, a BashOperator running a Bash script and a Python function defined using the @task decorator
  • >> between the tasks defines a dependency and controls in which order the tasks will be executed

Testing

Testing pipeline parsing

Let’s assume we are saving the code from the previous step in tutorial.py in the DAGs folder referenced in your airflow.cfg. The default location for your DAGs is ~/airflow/dags.

python ~/airflow/dags/tutorial.py

Metadata validation

# initialize the database tables
airflow db init

# print the list of active DAGs
airflow dags list

# prints the list of tasks in the "tutorial" DAG
airflow tasks list tutorial

# prints the hierarchy of tasks in the "tutorial" DAG
airflow tasks list tutorial --tree

Testing single task

Let’s test by running the actual task instances. A DAG run’s logical date is the start of its data interval.

# command layout: command subcommand [dag_id] [task_id] [(optional) date]

# testing print_date
airflow tasks test tutorial print_date 2015-06-01

# testing sleep
airflow tasks test tutorial sleep 2015-06-01

# testing templated
airflow tasks test tutorial templated 2015-06-01

Note that the airflow tasks test command runs task instances locally, outputs their log to stdout (on screen), does not bother with dependencies, and does not communicate state (running, success, failed, …) to the database. It simply allows testing a single task instance.

Testing single DAG

# command layout: command subcommand [dag_id] [(optional) date]

# testing the "tutorial" DAG end to end
airflow dags test tutorial 2015-06-01

Backfill

Note that if you use depends_on_past=True, individual task instances will depend on the success of their previous task instance (that is, previous according to the logical date).

While depends_on_past=True causes a task instance to depend on the success of its previous task_instance, wait_for_downstream=True will cause a task instance to also wait for all task instances immediately downstream of the previous task instance to succeed.
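
As a hedged sketch (the dag_id and task_id are illustrative), both flags are plain task arguments:

# A minimal sketch of the two flags; names are illustrative.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="backfill_sketch", start_date=datetime(2015, 6, 1), schedule="@daily") as dag:
    careful = BashOperator(
        task_id="careful",
        bash_command="date",
        depends_on_past=True,      # wait for this task's previous run to have succeeded
        wait_for_downstream=True,  # ...and for that run's immediate downstream tasks
    )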

# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow dags backfill tutorial \
    --start-date 2015-06-01 \
    --end-date 2015-06-07

Working with TaskFlow

Building a Running Pipeline

Reference

Airflow