iskandarblue

Reputation: 7526

Executing sequential and concurrent tasks within one DAG

I am new to Airflow and have a few basic questions about how to properly run some tasks concurrently and others sequentially within one DAG.

In my DAG, the basic steps are: refresh data, run 3 separate scripts, deploy. Each of these applications is run in a separate Docker container.

In the example below, everything is done in sequence; however, my objective is to refresh the data, then run this, that, and the_other_thing in parallel, and then deploy.

refresh >> [this, that, the_other_thing] >> deploy

I would only like to deploy after [this, that, the_other_thing] have finished, but it is unclear which of the three will finish last. What is the best practice for executing this sequence within one DAG? Is it enough to set concurrency=3 and to execute [this, that, the_other_thing] in a for loop? Any suggestions are appreciated.

from builtins import range
from datetime import timedelta, datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook

image = 'myserver.com:8080/my_project:latest'

args = {
    'owner': 'Airflow',
    'start_date': datetime(2020, 1, 1),
    'depends_on_past': False,
    "retries": 2,
    'retry_delay': timedelta(minutes=5)
}

conn_foo_db = BaseHook.get_connection('foobar_db')
conn_docker = BaseHook.get_connection('my_registry')

dag = DAG(
    dag_id='analysis',
    default_args=args,
    schedule_interval='0 3 * * *',
    dagrun_timeout=timedelta(minutes=180),
    max_active_runs=1,
    concurrency=1,
    tags=['daily']
)

refresh_data = BashOperator(
    task_id='refresh_data',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=refresh',
    dag=dag,
)

this = BashOperator(
    task_id='run_app_this',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=do_this ',
    dag=dag,
)

that = BashOperator(
    task_id='run_app_that',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=do_that',
    dag=dag,
)

the_other_thing = BashOperator(
    task_id='run_app_the_other_thing',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=do_the_other_thing ',
    dag=dag,
)

deploy = BashOperator(
    task_id='deploy',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=deploy ',
    dag=dag,
)

refresh_data >> this >> that >> the_other_thing >> deploy

if __name__ == "__main__":
    dag.cli()

Upvotes: 2

Views: 1493

Answers (1)

Elad Kalif

Reputation: 16099

Yes, your assumption is correct. A possible implementation could be:

tasks_list = ["this", "that", "the_other_thing"]

refresh_data = BashOperator(
    task_id='refresh_data_task',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=refresh',
    dag=dag,
)

deploy = BashOperator(
    task_id='deploy_task',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=deploy ',
    dag=dag,
)

for task in tasks_list:
    task_op = BashOperator(
        task_id=f'run_{task}_task',
        bash_command='docker run '
                     '-i --rm '
                     f"-e DB_PASSWORD='{conn_foo_db.password}' "
                     f' {image}  '
                     f'app=do_{task}',
        dag=dag,
    )
    refresh_data >> task_op >> deploy
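
Equivalently, you can collect the three operators into a list and declare the fan-out and fan-in in a single line, since operators accept a list on either side of >> (and, as you suggested, the DAG-level concurrency needs to allow at least 3 for the three tasks to actually run in parallel):

run_tasks = [
    BashOperator(
        task_id=f'run_{task}_task',
        bash_command='docker run '
                     '-i --rm '
                     f"-e DB_PASSWORD='{conn_foo_db.password}' "
                     f' {image}  '
                     f'app=do_{task}',
        dag=dag,
    )
    for task in tasks_list
]

# refresh fans out to the three apps, which all fan back in to deploy
refresh_data >> run_tasks >> deploy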

Since the default trigger rule is ALL_SUCCESS, deploy will only start running after all of the tasks in tasks_list have finished successfully.
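
If you ever want to make that behaviour explicit (or relax it), the trigger rule can be set on the operator itself; a minimal sketch using Airflow's TriggerRule enum:

from airflow.utils.trigger_rule import TriggerRule

deploy = BashOperator(
    task_id='deploy_task',
    bash_command='...',  # same docker run command as above
    # ALL_SUCCESS is the default: run only once every upstream task has succeeded
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag,
)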

A few notes:

  1. You use the same code multiple times. You might want to consider creating some kind of configuration file that contains the dependencies and all the information needed to set up your operators, and then use a factory method that constructs each operator from that file, thus avoiding duplicated code in your DAG file (see the sketch after this list).
  2. Avoid accessing connections that are stored in the Airflow metastore outside of an operator's scope. This is bad practice: Airflow parses your DAG file periodically (according to min_file_process_interval), so top-level calls like BaseHook.get_connection result in a high volume of queries against the metadata DB.
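
A minimal sketch of that factory idea, reusing the image variable and BashOperator import from the question. The APP_CONFIG dict and make_app_task helper are illustrative names, and the {{ conn.foobar_db.password }} Jinja variable (which resolves the connection at run time, addressing note 2) is only available on recent Airflow versions; on older releases you would need to resolve the password inside the task another way:

# Illustrative sketch only: APP_CONFIG and make_app_task are hypothetical names,
# not part of the original DAG. In practice this mapping could live in a YAML/JSON file.
APP_CONFIG = {
    'refresh': 'refresh',
    'this': 'do_this',
    'that': 'do_that',
    'the_other_thing': 'do_the_other_thing',
    'deploy': 'deploy',
}

def make_app_task(name, app, dag):
    """Build one BashOperator that runs a single app inside the project image.

    The DB password is resolved through the templated bash_command at run time,
    which keeps BaseHook.get_connection out of the top level of the DAG file.
    """
    return BashOperator(
        task_id=f'run_{name}_task',
        bash_command='docker run '
                     '-i --rm '
                     "-e DB_PASSWORD='{{ conn.foobar_db.password }}' "
                     f' {image}  '
                     f'app={app}',
        dag=dag,
    )

refresh = make_app_task('refresh', APP_CONFIG['refresh'], dag)
deploy = make_app_task('deploy', APP_CONFIG['deploy'], dag)

# refresh fans out to the three apps, which all fan back in to deploy
for name in ('this', 'that', 'the_other_thing'):
    refresh >> make_app_task(name, APP_CONFIG[name], dag) >> deploy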

Upvotes: 2
