Reputation: 7526
I am new to Airflow and have a few basic questions about how to properly run some tasks concurrently and others sequentially within one DAG.
In my DAG, the basic steps are: refresh data, run 3 separate scripts, deploy. Each of these applications is run in a separate Docker container.
In the example below, everything is done in sequence; however, my objective is to refresh the data, then run this, that, and the_other_thing in parallel, and then deploy.
refresh >> [this, that, the_other_thing] >> deploy
I would only like to deploy after [this, that, the_other_thing] are finished, but it is unclear which of the three will finish last. What is the best practice for executing this sequence within one DAG? Is it enough to set concurrency=3 and to create [this, that, the_other_thing] in a for loop? Any suggestions are appreciated.
from builtins import range
from datetime import timedelta, datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook
image = 'myserver.com:8080/my_project:latest'
args = {
    'owner': 'Airflow',
    'start_date': datetime(2020, 1, 1),
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}
conn_foo_db = BaseHook.get_connection('foobar_db')
conn_docker = BaseHook.get_connection('my_registry')
dag = DAG(
    dag_id='analysis',
    default_args=args,
    schedule_interval='0 3 * * *',
    dagrun_timeout=timedelta(minutes=180),
    max_active_runs=1,
    concurrency=1,
    tags=['daily']
)
refresh_data = BashOperator(
    task_id='refresh_data',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image } '
                 'app=refresh',
    dag=dag,
)
this = BashOperator(
    task_id='run_app_this',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image } '
                 'app=do_this ',
    dag=dag,
)
that = BashOperator(
    task_id='run_app_that',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image } '
                 'app=do_that',
    dag=dag,
)
the_other_thing = BashOperator(
    task_id='run_app_the_other_thing',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image } '
                 'app=do_the_other_thing ',
    dag=dag,
)
deploy = BashOperator(
    task_id='deploy',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image } '
                 'app=deploy ',
    dag=dag,
)
refresh_data >> this >> that >> the_other_thing >> deploy
if __name__ == "__main__":
    dag.cli()
Upvotes: 2
Views: 1493
Reputation: 16099
Yes, your assumption is correct. A possible implementation would be:
tasks_list = ["this", "that", "the_other_thing"]
refresh_data = BashOperator(
    task_id='refresh_data_task',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image } '
                 'app=refresh',
    dag=dag,
)
deploy = BashOperator(
    task_id='deploy_task',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image } '
                 'app=deploy ',
    dag=dag,
)
for task in tasks_list:
    task_op = BashOperator(
        task_id=f'run_{task}_task',
        bash_command='docker run '
                     '-i --rm '
                     f"-e DB_PASSWORD='{conn_foo_db.password}' "
                     f' {image} '
                     f'app=do_{task}',
        dag=dag,
    )
    refresh_data >> task_op >> deploy
Since the default trigger rule is ALL_SUCCESS, the deploy task will start running only after all the tasks in tasks_list have finished successfully.
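If you prefer the single-expression form from your question, an equivalent sketch is to build the operators in the loop and declare the dependencies once (assuming the same dag, image, conn_foo_db, refresh_data and deploy objects as above). One more thing to check: with concurrency=1 on the DAG, the three tasks would still be scheduled one at a time, so raise it (e.g. to 3) if you want them to actually run in parallel.
# Sketch: only build the operators in the list, then fan out/in in one expression.
parallel_ops = [
    BashOperator(
        task_id=f'run_{task}_task',
        bash_command='docker run '
                     '-i --rm '
                     f"-e DB_PASSWORD='{conn_foo_db.password}' "
                     f' {image} '
                     f'app=do_{task}',
        dag=dag,
    )
    for task in tasks_list
]

# deploy starts only once every task in parallel_ops has succeeded
# (the default trigger_rule, 'all_success').
refresh_data >> parallel_ops >> deploy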
A few notes:
Calling BaseHook.get_connection() at the top level of the DAG file is best avoided: the scheduler re-parses the file on every cycle (controlled by min_file_process_interval) and this will result in high volume on the DB.
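For example, one way to avoid the parse-time lookup is to defer the secret to execution time via Jinja templating. This is a minimal sketch, assuming an Airflow version that exposes the conn template variable and the same image and dag objects; bash_command is a templated field, so the password is only fetched from the metadata DB when the task actually runs:
refresh_data = BashOperator(
    task_id='refresh_data_task',
    # '{{ conn.foobar_db.password }}' is plain Jinja (not an f-string), so it is
    # rendered at task runtime rather than every time the file is parsed.
    bash_command='docker run '
                 '-i --rm '
                 "-e DB_PASSWORD='{{ conn.foobar_db.password }}' "
                 f'{image} '
                 'app=refresh',
    dag=dag,
)
Upvotes: 2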