Reputation: 133
I have 4 tasks as shown below. I want Task D to be triggered whether Task C has failed or succeeded. However, neither Task C nor Task D should be triggered if Task A or Task B has failed.
I tried using trigger_rule=all_done for Task D, but if Task B fails, it triggers Task D as well.
Is there a way to accomplish this in Airflow?
Upvotes: 1
Views: 1835
Reputation: 2408
In your case, B is the critical task, and C is non-critical, but you want it to at least make an attempt before D.
First you need to remove all the trigger rules you have applied. You currently have all_done on C, which means that C runs even when B fails; you don't want that.
Next you need to add a dependency between B and D:
task_b >> task_d
Now B and C are each independently upstream of D.
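For example, assuming the original layout was the linear chain A >> B >> C >> D (the task ids below are placeholders), the full wiring would now be:
task_a >> task_b >> task_c >> task_d   # assumed original chain from the question
task_b >> task_d                       # added edge, so D also depends on B directly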
So what remains is choosing a trigger rule for D. You can't use one_success, because the important task is B and it's not enough if C alone succeeds. What you need is "B success and C done", and no single trigger rule distinguishes between upstream tasks like that.
A relatively clean way to do this is to make C "skip" instead of fail if an error is encountered.
Here's an example of how to do that:
from airflow.exceptions import AirflowSkipException
from airflow.operators.dummy import DummyOperator  # airflow.operators.dummy_operator in Airflow 1.10

class MySkippingDummyOperator(DummyOperator):
    def execute(self, context):
        try:
            super().execute(context)
        except Exception as e:
            # Convert any failure into a skip so the task ends as "skipped", not "failed".
            raise AirflowSkipException(f'skipping instead of failing: {e}')
If MySkippingDummyOperator encounters an error, the task will end in the skipped state instead of failed.
So B ends up as success / failed, and C ends up as success / skipped. With this behavior we can use the trigger rule none_failed on task D: none_failed means all upstream tasks have completed and none of them failed (skipped counts as fine).
And this should produce the desired behavior.
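Putting it together, here is a minimal sketch of the whole DAG (assuming Airflow 2.x import paths; the DAG id, schedule, and task ids are placeholders). C uses the skipping operator from above, and D gets the none_failed trigger rule:
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

with DAG(dag_id='example_none_failed', start_date=days_ago(1), schedule_interval=None) as dag:
    task_a = DummyOperator(task_id='task_a')
    task_b = DummyOperator(task_id='task_b')
    task_c = MySkippingDummyOperator(task_id='task_c')  # skips instead of failing
    task_d = DummyOperator(task_id='task_d', trigger_rule='none_failed')

    task_a >> task_b >> task_c >> task_d
    task_b >> task_d
With this setup, if B fails, both C and D end up upstream_failed; if only C hits an error, it is skipped and D still runs.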
Alternatively, you could let D use all_done, and then from within D retrieve the task instance state of B and skip D if B failed. But this is more complicated and certainly more of a hack.
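For completeness, a rough sketch of that alternative (the callable and task ids below are hypothetical; D keeps trigger_rule='all_done' and inspects B's state via the run context):
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.utils.state import State

def _run_d(**context):
    # Skip D when B did not end in success; otherwise do D's real work.
    b_state = context['dag_run'].get_task_instance('task_b').state
    if b_state != State.SUCCESS:
        raise AirflowSkipException('task_b did not succeed, skipping task_d')
    # ... actual work for D ...

task_d = PythonOperator(task_id='task_d', python_callable=_run_d, trigger_rule='all_done')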
Upvotes: 1