Victor Racy

Reputation: 21

Airflow - Only run a DAG if all tasks on another DAG were successful

I'm fairly new to DAGs, Airflow and Python syntax (I learned to code in Java). I have a DAG with about 10 tasks that are independent of one another, and another DAG that should only run if all 10 tasks were successful. The way I have it now, if one task fails, the DAG still runs the other ones and the DAG is marked as successful (which is what I want).

Is there a way to make a new task (task 11) that goes through the other tasks and checks their state? I can't find a function that returns a task's state.

I was thinking something like this (let's pretend there's a state() function)

array_of_tasks = [task1, task2, task3, ...]
for task in array_of_tasks:
    if task.state() == 'failed':  # state() is the pretend function
        # a task with a failed state was found, so run a new dummy task
        # indicating that one of the tasks failed
        task_sensor_failed = DummyOperator(
            task_id='task_sensor_failed',
            dag=dag,
        )

And then in the other DAG, which should only run if this task 'task_sensor_failed' didn't run, I would put the sensor:

external_sensor = ExternalTaskSensor(
    task_id='external_sensor',
    external_dag_id='10_tasks_dag',
    external_task_id='task_sensor_failed',
    ...
)

This is not the actual code I would use. I know it's not right; I just wanted something simple so you understand what I'm trying to do. Maybe this is a stupid way of doing it, but like I said, I'm new to this, so I'm not sure what I'm doing.

Anyway, the general idea is that a DAG should only run if all 10 tasks of the other DAG were successful. Can anyone help me accomplish that? Sorry for the long post, and thanks in advance for the help!

Upvotes: 2

Views: 6365

Answers (1)

y2k-shubham

Reputation: 11597

Once you know that

  • ExternalTaskSensor can also sense an entire DAG (instead of a specific task of the DAG)
  • Airflow marks a DAG failed if any one of its leaf tasks fails (in other words, Airflow marks a DAG successful only if all leaf tasks succeed)

you can do it without adding any dummy task to the 1st DAG.
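To make the leaf-task rule concrete, here is a small plain-Python sketch of it. It does not use Airflow itself; the function names and the `downstream` mapping are made up for illustration, and it assumes every task has already finished.

```python
def leaf_tasks(downstream):
    """Return the tasks that have no downstream tasks."""
    return {task for task, children in downstream.items() if not children}


def dag_run_state(downstream, task_states):
    """Simplified rule: failed if any leaf task failed, otherwise success."""
    failed_states = {"failed", "upstream_failed"}
    if any(task_states[task] in failed_states for task in leaf_tasks(downstream)):
        return "failed"
    return "success"


# Ten independent tasks, as in the question: every task is a leaf.
downstream = {f"task{i}": [] for i in range(1, 11)}
states = {task: "success" for task in downstream}
print(dag_run_state(downstream, states))  # success

states["task3"] = "failed"
print(dag_run_state(downstream, states))  # failed
```

Since the ten tasks in the question are independent, each of them is a leaf, so a single failure should be enough for the whole DAG run to count as failed.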


Here's how

  1. Leave the first DAG untouched

  2. Make your 2nd DAG begin with an ExternalTaskSensor that senses the 1st DAG (just specify external_dag_id without specifying external_task_id)

  • This will continue to mark your 1st DAG failed if any one of its tasks fails
  • but will still let the 2nd DAG run if all tasks of the 1st DAG succeeded (that is, the 1st DAG succeeded)

As an extension, if it suits your requirements, you can make your 1st DAG reactively trigger the 2nd DAG (only when all its tasks succeeded) as follows

  1. In your 1st DAG,

  2. Leave your 2nd DAG untouched

Upvotes: 8
