Stijnvandenb

Reputation: 31

How to run tasks sequentially in a loop in an Airflow DAG?

Say I have this snippet of an example DAG:

variables = ['first', 'second', 'third']

def run_dag_task(variable):
  task = dag_task(variable)
  return task

for variable in variables:
  run_dag_task(variable)

This code would run the three tasks for the three variables in parallel. Now, say I want the first task to finish before commencing the second task, and to finish the second task before commencing the third task. How would I go about doing this with a loop? Is that possible?

Upvotes: 3

Views: 30222

Answers (2)

joss

Reputation: 765

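variables = ['first', 'second', 'third']

def run_dag_task(variable):
  # dag_task is the task factory from the question
  task = dag_task(variable)
  return task

# Create the first task and keep every task in a list
task_arr = []
task_arr.append(run_dag_task(variables[0]))

for variable in variables[1:]:
  task = run_dag_task(variable)
  # Make the new task wait for the previously created one
  task_arr[-1] >> task
  task_arr.append(task)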

This should do what you need: append each task to a list and set a dependency from the previous task to the new one, so every task waits for the one before it.
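A minimal, self-contained sketch of the same chaining idea, runnable without Airflow installed. `Task` is a hypothetical stand-in for an Airflow operator: its `>>` only records upstream/downstream links and returns the right-hand side, which is roughly what Airflow's `BaseOperator.__rshift__` does. Pairing each task with its successor via `zip(tasks, tasks[1:])` is an alternative to indexing `task_arr[-1]` and produces the same links.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator; not Airflow's API."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def __rshift__(self, other):
        # Record `other` (one task or a list of tasks) as downstream of self.
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            self.downstream.add(t)
            t.upstream.add(self)
        # Return the right-hand side so `a >> b >> c` keeps chaining.
        return other


def run_dag_task(variable):
    # Hypothetical factory standing in for the question's dag_task(variable).
    return Task(variable)


variables = ['first', 'second', 'third']
tasks = [run_dag_task(v) for v in variables]

# Link each task to the next one: first >> second, second >> third.
for upstream, downstream in zip(tasks, tasks[1:]):
    upstream >> downstream

for t in tasks:
    print(t.task_id, '<-', sorted(u.task_id for u in t.upstream))
# first <- []
# second <- ['first']
# third <- ['second']
```

Because each task has exactly one upstream task, a scheduler that respects these links can only run them one after another.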

Upvotes: 6

Bas Harenslak

Reputation: 3034

In Airflow, you can define order between tasks using >>. For example:

task1 >> task2

This runs task1 first, waits for it to complete, and only then runs task2. The >> operator also accepts a list:

task1 >> [task2, task3]

This runs task1 first, again waits for it to complete, and then runs task2 and task3. Those two do not depend on each other, so they can run in parallel.
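To make the list form concrete, here is a hedged sketch using a hypothetical `Task` class (not Airflow's API) whose `>>` mimics the bitshift behaviour described above: when the right-hand side is a list, every element becomes downstream of the left-hand task, and no links are created between the list elements themselves.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator; not Airflow's API."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def __rshift__(self, other):
        # Record `other` (one task or a list of tasks) as downstream of self.
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            self.downstream.add(t)
            t.upstream.add(self)
        return other


task1, task2, task3 = Task('task1'), Task('task2'), Task('task3')
result = task1 >> [task2, task3]

# Both tasks wait on task1, but neither waits on the other.
print(sorted(t.task_id for t in task1.downstream))        # ['task2', 'task3']
print(task2.upstream == {task1}, task3.upstream == {task1})  # True True
print(task3 in task2.downstream)                           # False
```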

Your example could be written as:

start = 'first'
variables = ['second', 'third']

def run_dag_task(variable):
  task = dag_task(variable)
  return task

start_task = run_dag_task(start)
next_tasks = [run_dag_task(var) for var in variables]
start_task >> next_tasks
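The snippet above creates the dependencies first >> second and first >> third, so second and third both wait only on first. To make the difference from a strict chain visible, here is a hedged sketch using a hypothetical `Task` stand-in (not Airflow's API) and a hypothetical `execution_waves` helper that groups tasks into rounds in which every task's upstream tasks have already finished. It ignores executor slots, pools, and retries, so it only shows which tasks are allowed to overlap.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator; not Airflow's API."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def __rshift__(self, other):
        # Record `other` (one task or a list of tasks) as downstream of self.
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            self.downstream.add(t)
            t.upstream.add(self)
        return other


def execution_waves(tasks):
    """Group task ids into rounds; a task joins a round once all its upstream tasks are done."""
    done, waves = set(), []
    remaining = list(tasks)
    while remaining:
        ready = [t for t in remaining if t.upstream <= done]
        if not ready:
            raise ValueError("dependency cycle")
        waves.append(sorted(t.task_id for t in ready))
        done.update(ready)
        remaining = [t for t in remaining if t not in done]
    return waves


# Fan-out, as in the answer: first >> [second, third]
start_task = Task('first')
next_tasks = [Task(var) for var in ['second', 'third']]
start_task >> next_tasks
fan_out = execution_waves([start_task, *next_tasks])

# Strict chain: first >> second >> third
chained = [Task(var) for var in ['first', 'second', 'third']]
for up, down in zip(chained, chained[1:]):
    up >> down
chain_waves = execution_waves(chained)

print(fan_out)      # [['first'], ['second', 'third']]
print(chain_waves)  # [['first'], ['second'], ['third']]
```

If second must finish before third starts, as the question asks, the tasks need to be linked pairwise (first >> second >> third) rather than fanned out from first.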

Upvotes: 5
