JMarc

Reputation: 1004

How to implement Airflow DAGs in a loop

I just started with Airflow. I want to set up DAGs in a loop, where the next DAG starts when the previous DAG is completed. Here is the workflow that I want to achieve:

list_of_files = [......]
for file in list_of_files:
    dag = DAG('pipeline', default_args=default_args, schedule_interval=None)
    t1 = BashOperator('copy_this_file', ....)
    t2 = BashOperator('process_this_file', ...)
    t1.set_downstream(t2)

If I run airflow backfill pipeline -s 2019-05-01, all the DAGs are started simultaneously.

Upvotes: 3

Views: 13318

Answers (2)

Daniel R Carletti

Reputation: 549

Even though DAGs are separate workflows, you can chain them with the TriggerDagRunOperator. You just need the dag_id of the next DAG, so you need to edit your loop to look ahead, or just use an index or something. Easy fix. Example of the operator below:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

list_of_files = [......]
for file in list_of_files:
    # the dag_id should be unique per file (e.g. built from an index),
    # otherwise each iteration overwrites the previous DAG
    dag = DAG('pipeline', default_args=default_args, schedule_interval=None)
    t1 = BashOperator(task_id='copy_this_file', ...., dag=dag)
    t2 = BashOperator(task_id='process_this_file', ..., dag=dag)
    trigger_run_task = TriggerDagRunOperator(
        task_id="next_dag_trigger_run",
        trigger_dag_id="next_dag_id",  # dag_id of the DAG that should run next
        dag=dag,
    )
    t1 >> t2 >> trigger_run_task
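
For example, here is a minimal sketch of the index-based look-ahead described above (the file names, the pipeline_N dag_ids, the bash commands and the Airflow 2 import paths are assumptions for illustration, not from the original post):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

list_of_files = ["file_a", "file_b", "file_c"]  # placeholder file names

for i, file in enumerate(list_of_files):
    dag_id = f"pipeline_{i}"
    with DAG(dag_id, start_date=datetime(2019, 5, 1), schedule_interval=None) as dag:
        copy = BashOperator(task_id="copy_this_file",
                            bash_command=f"echo copying {file}")
        process = BashOperator(task_id="process_this_file",
                               bash_command=f"echo processing {file}")
        copy >> process
        # every DAG except the last one triggers its successor when it finishes
        if i + 1 < len(list_of_files):
            trigger_next = TriggerDagRunOperator(
                task_id="trigger_next_pipeline",
                trigger_dag_id=f"pipeline_{i + 1}",
            )
            process >> trigger_next
    # expose each DAG at module level so the scheduler picks it up
    globals()[dag_id] = dag

Triggering only pipeline_0 (for example with airflow dags trigger pipeline_0) should then run the pipelines one after another instead of all at once.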

Upvotes: 1

bosnjak

Reputation: 8614

DAGs can't depend on each other; they are separate workflows. You want to configure tasks to depend on each other instead. You can have a single DAG with multiple execution branches, one for each file, something like this (not tested):

dag = DAG('pipeline', ...)
list_of_files = [......]
with dag:
    for file in list_of_files:
        # task_ids must be unique within a DAG, so include the file in the name
        t1 = BashOperator(task_id=f'copy_{file}', ....)
        t2 = BashOperator(task_id=f'process_{file}', ...)
        t1.set_downstream(t2)
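
A runnable version of that idea could look roughly like this (the file names, bash commands and Airflow 2 import paths are assumptions for illustration):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

list_of_files = ["file_a", "file_b", "file_c"]  # placeholder file names

with DAG("pipeline", start_date=datetime(2019, 5, 1), schedule_interval=None) as dag:
    for file in list_of_files:
        # one copy -> process branch per file; the branches run in parallel
        copy = BashOperator(task_id=f"copy_{file}",
                            bash_command=f"echo copying {file}")
        process = BashOperator(task_id=f"process_{file}",
                               bash_command=f"echo processing {file}")
        copy >> process

If the files have to be handled strictly one after another rather than in parallel, the same loop can instead chain each file's copy task downstream of the previous file's process task.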

Upvotes: 3
