Reputation: 2055
I would like to create a conditional task in Airflow as described in the schema below. The expected scenario is the following:
I tried:
trigger_rule=TriggerRule.ONE_FAILED
but Task 4 stays in the skipped state. The same happens with:
trigger_rule=TriggerRule.ALL_DONE
I found this solution: How to create a conditional task in Airflow, but it doesn't work for me.
Upvotes: 3
Views: 5142
Reputation: 5415
I think you would want to terminate the cluster regardless of whether the previous tasks succeed, so ALL_DONE sounds appropriate. The exception is Start_Cluster: if that fails, there may be no cluster to terminate, although you might want to check just in case.
The default trigger_rule is ALL_SUCCESS, so, for example, if Task 1 fails, the whole DAG fails, because Task 2 requires Task 1 to succeed in order to run.
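For illustration, here is a minimal sketch of that linear layout with ALL_DONE on the final task. The DummyOperator placeholders, the DAG name, and the Airflow 1.x import paths are assumptions; your real tasks and version may differ:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG("cluster_dag", start_date=datetime(2018, 1, 1),
         schedule_interval=None) as dag:
    start_cluster = DummyOperator(task_id="Start_Cluster")
    task_2 = DummyOperator(task_id="Task_2")  # default trigger_rule=ALL_SUCCESS
    task_3 = DummyOperator(task_id="Task_3")

    # ALL_DONE: run once every direct upstream has reached a final
    # state, whatever that state is, so the cluster is torn down
    # even when an earlier task fails.
    terminate_cluster = DummyOperator(
        task_id="Terminate_Cluster",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    start_cluster >> task_2 >> task_3 >> terminate_cluster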
If it's possible for any of the tasks to fail, but you still want to terminate the cluster, you will need some alternate pathway for the DAG to follow, e.g. using the BranchPythonOperator and a Python callback function.
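A minimal sketch of that branching pattern, again with Airflow 1.x import paths; the callback's condition and the Handle_Failure task are hypothetical placeholders:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

def choose_path(**context):
    # Placeholder condition: a real callback might pull an XCom value
    # pushed by an upstream task, or query the cluster's API.
    cluster_ok = True
    return "Task_3" if cluster_ok else "Handle_Failure"

with DAG("cluster_dag_branching", start_date=datetime(2018, 1, 1),
         schedule_interval=None) as dag:
    start_cluster = DummyOperator(task_id="Start_Cluster")
    task_2 = DummyOperator(task_id="Task_2")
    task_3 = DummyOperator(task_id="Task_3")
    handle_failure = DummyOperator(task_id="Handle_Failure")

    check_cluster = BranchPythonOperator(
        task_id="Check_Cluster",
        python_callable=choose_path,
        provide_context=True,  # Airflow 1.x; drop this kwarg on 2.x
    )

    # The join task needs a relaxed trigger rule, because whichever
    # branch was not chosen ends up skipped.
    terminate_cluster = DummyOperator(
        task_id="Terminate_Cluster",
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )

    start_cluster >> task_2 >> check_cluster
    check_cluster >> task_3 >> terminate_cluster
    check_cluster >> handle_failure >> terminate_cluster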
Another possibility is to use a dummy operator that runs with a trigger_rule of ONE_FAILED and then leads into the Terminate_Cluster task.
For example, if you named the dummy task "Task_Failure" this would be the dependency chain:
Start_Cluster >> Task_2 >> Task_3 >> Terminate_Cluster
Task_2 >> Task_Failure
Task_3 >> Task_Failure
Task_Failure >> Terminate_Cluster
In that scenario, Terminate_Cluster would probably need its trigger_rule set to ONE_SUCCESS, because it's possible some tasks never run. If you set the final task to ALL_DONE and some of the previous tasks have no status, it might just hang or possibly fail.
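Putting that together, a minimal sketch of the whole failure-path pattern (same assumptions as above: DummyOperator placeholders and Airflow 1.x import paths):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG("cluster_dag_failure_path", start_date=datetime(2018, 1, 1),
         schedule_interval=None) as dag:
    start_cluster = DummyOperator(task_id="Start_Cluster")
    task_2 = DummyOperator(task_id="Task_2")
    task_3 = DummyOperator(task_id="Task_3")

    # Runs only if at least one of its upstream tasks failed;
    # otherwise it ends up skipped.
    task_failure = DummyOperator(
        task_id="Task_Failure",
        trigger_rule=TriggerRule.ONE_FAILED,
    )

    # ONE_SUCCESS: fires as soon as any upstream succeeds, so a
    # Task_Failure that was skipped (happy path) or a Task_3 that
    # never ran (failure path) does not block termination.
    terminate_cluster = DummyOperator(
        task_id="Terminate_Cluster",
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )

    start_cluster >> task_2 >> task_3 >> terminate_cluster
    task_2 >> task_failure
    task_3 >> task_failure
    task_failure >> terminate_cluster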
The difference between ALL_DONE and ALL_SUCCESS: https://stackoverflow.com/a/47716981/1335793
Upvotes: 4