I'm kinda new to DAGs, Airflow and Python syntax (I learned to code with Java), but I have a DAG with about 10 tasks that are independent of one another, and I have another DAG that should only run if all 10 tasks were successful. The way I have it now, if one task fails, the DAG still runs the other ones and the DAG is marked as successful (which is what I want).
Is there a way to make a new task (task 11) that goes through the other tasks and checks their state? I can't find a function that returns a task's state.
I was thinking of something like this (let's pretend there's a state() function):
array_of_tasks = [task1, task2, task3, ...]
for task in array_of_tasks:
    if task.state() == "failed":  # pretend state() exists
        # if any task is in the failed state, run a new dummy task indicating that one of the tasks failed
        task_sensor_failed = DummyOperator(
            task_id='task_sensor_failed',
            dag=dag,
        )
Then, in the other DAG, which should only run if this task_sensor_failed task didn't run, I would put the sensor:
external_sensor = ExternalTaskSensor(
    task_id='external_sensor',
    external_dag_id='10_tasks_dag',
    external_task_id='task_sensor_failed',
    ...
)
This isn't the actual code I would use. I know it's not right; I just wanted to write something simple so you understand what I'm trying to do. Maybe this is a stupid way of doing it, but like I said, I'm new to this, so I'm not sure what I'm doing.
Anyway, the general idea is that I can only run a DAG if all 10 tasks of the other DAG were successful. Can anyone help me accomplish that? Sorry for the long post, and thanks in advance for the help! Any suggestions?
Once you know that

- ExternalTaskSensor can also sense an entire DAG (instead of a specific task of the DAG), and
- a DAG run is marked failed if any one of its leaf tasks fails (in other words, Airflow marks a DAG run successful only if all of its leaf tasks succeed or are skipped),

you can do it without adding any dummy task to the 1st DAG.
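Because the 10 tasks in the question are independent, every one of them is a leaf task, so under this rule a single failure is enough to fail the DAG run. The rule can be modelled in a few lines of plain Python. This is a simplified sketch for illustration only, not Airflow's actual implementation. The function dag_run_state is hypothetical, and it only looks at leaf states: the real scheduler also checks that all tasks have finished and handles cases such as deadlocks.

```python
# Simplified model (not Airflow's source code) of how a DAG run's final state
# follows from its leaf tasks. State names mirror Airflow's lowercase state strings.
from typing import Mapping

FAILED_STATES = {"failed", "upstream_failed"}
SUCCESS_STATES = {"success", "skipped"}


def dag_run_state(leaf_states: Mapping[str, str]) -> str:
    """Return 'running', 'failed' or 'success' for a mapping of leaf task_id -> state."""
    states = list(leaf_states.values())
    # While any leaf is still unfinished, the run is not decided yet.
    if any(s not in FAILED_STATES | SUCCESS_STATES for s in states):
        return "running"
    # A single failed (or upstream_failed) leaf fails the whole run.
    if any(s in FAILED_STATES for s in states):
        return "failed"
    # Otherwise every leaf succeeded or was skipped.
    return "success"


ten_tasks = {f"task{i}": "success" for i in range(1, 11)}
print(dag_run_state(ten_tasks))  # success
ten_tasks["task3"] = "failed"
print(dag_run_state(ten_tasks))  # failed
```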
Here's how:

- Leave the 1st DAG untouched.
- Make your 2nd DAG begin with an ExternalTaskSensor that senses the 1st DAG (just specify external_dag_id without specifying external_task_id).
As an extension, if it suits your requirements, you can make your 1st DAG reactively trigger the 2nd DAG (only when all its tasks succeeded), as follows:

- In your 1st DAG, add a TriggerDagRunOperator with trigger_dag_id='my_2nd_dag_id' and trigger_rule=TriggerRule.ALL_SUCCESS (the default), placed downstream of all the other tasks: upstream_tasks_list >> trigger_task
- Leave your 2nd DAG untouched.