Reputation: 1829
I have an airflow task pipeline as in the diagram. task1_error_handler and task2_error_handler are error-handling tasks which should run only if the directly linked task fails. I have set the ONE_FAILED trigger rule for these tasks, but it seems an error on task1 triggers both error handlers. I only need task1_error_handler to trigger.
All the tasks are custom operators, and the tasks whose ids end with status are custom sensors.
How should I achieve this?
Upvotes: 4
Views: 13089
Reputation: 46
You can catch the error in the task1 code, skip all downstream branches except the error handlers, and re-raise the error. For example, if task1 is a BashOperator, replace it with the following CatchBashOperator:
from typing import Dict

from airflow.models import SkipMixin
from airflow.operators.bash_operator import BashOperator
from airflow.utils.trigger_rule import TriggerRule

class CatchMixin(SkipMixin):

    def execute(self, context: Dict):
        try:
            super().execute(context)
        except Exception as e:
            ti = context['ti']
            task = ti.task
            # Downstream tasks whose trigger rules fire on failure (the error handlers)
            excl_rules = [TriggerRule.ONE_FAILED, TriggerRule.ALL_FAILED]
            excl_tasks_ids = [t.task_id for t in task.downstream_list
                              if t.trigger_rule in excl_rules]
            # Skip every other downstream branch, then re-raise so this task
            # is still marked failed and the error handlers fire.
            self.skip_all_except(ti, excl_tasks_ids)
            raise e

class CatchBashOperator(CatchMixin, BashOperator):
    pass
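For illustration, a hypothetical usage in a DAG might look like this (my_dag and the bash command are assumptions, not part of the original answer):

task1 = CatchBashOperator(
    dag=my_dag,
    task_id='task1',
    bash_command='run_task1.sh',  # placeholder for the real command
)

task1 then behaves exactly like a BashOperator, except that on failure it skips all downstream tasks other than the ONE_FAILED / ALL_FAILED handlers before re-raising.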
Upvotes: 1
Reputation: 11607
Note: The proposed solution might be wrong, but you should still get the idea: I'm trying to achieve what logic gates do in digital circuits, which might help you come up with a working solution. I encourage you to provide feedback.
@Zack's answer pinpoints the problem very well. I'd just like to add a workaround I have in mind.
If we create some new dummy tasks and dependencies as follows, it might just do the trick.
A DummyOperator with trigger_rule=ONE_FAILED in place of task2_error_handler. Its success means that task2 has failed (which could very well be because of the failure of task1):
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
..
task2_dummy_error_handler = DummyOperator(
    dag=my_dag,
    task_id="task2_dummy_error_handler",
    trigger_rule=TriggerRule.ONE_FAILED
)
[task2, task2_status_check] >> task2_dummy_error_handler
Another DummyOperator with trigger_rule=ALL_SUCCESS that informs whether task1 succeeded or not. Its failure would mean that task1 failed, so task2 would automatically fail because of UPSTREAM_FAILED, and hence we need not run task2_retry_handler:
task1_error_handler_status_check = DummyOperator(
    dag=my_dag,
    task_id="task1_error_handler_status_check",
    trigger_rule=TriggerRule.ALL_SUCCESS
)
[task1, task1_status_check] >> task1_error_handler_status_check
Finally, set trigger_rule=ALL_SUCCESS in your task2_retry_handler and make it downstream of the above two dummy tasks. This should ensure that task2_retry_handler runs on failure of task2 but not on failure of task1:
from airflow.operators.python_operator import PythonOperator
..
task2_retry_handler = PythonOperator(
    dag=my_dag,
    task_id="task2_retry_handler",
    python_callable=my_task2_retry_handler,
    ..,
    trigger_rule=TriggerRule.ALL_SUCCESS
)
[task1_error_handler_status_check, task2_dummy_error_handler] >> task2_retry_handler
Upvotes: 0
Reputation: 2466
An error on task1 is causing both error handlers to run because task2 is downstream of task1, making task1 a parent of task2.
With the trigger rule being ONE_FAILED for both error handlers, this causes problems because the definition of ONE_FAILED is:
fires as soon as at least one parent has failed, it does not wait for all parents to be done
So with that said, you only want task1_error_handler to trigger if task1 fails. Unfortunately, this cannot be done just by changing the trigger rule, because you can't directly link a conditional task the way you currently want to.
Your best bet would be to remove task2's error handler trigger rule and instead use on_failure_callback to call the error handler.
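For illustration, a minimal, hypothetical sketch of that approach (the callback body, the use of BashOperator, and the task details are assumptions, not part of the original answer):

def handle_task2_failure(context):
    # Airflow passes the task context to the callback. It fires only when
    # this task itself fails; if task1 fails first, task2 is marked
    # upstream_failed and never runs, so the callback is not invoked.
    failed_ti = context['task_instance']
    print("Handling failure of %s" % failed_ti.task_id)

task2 = BashOperator(
    dag=my_dag,
    task_id='task2',
    bash_command='run_task2.sh',  # placeholder command
    on_failure_callback=handle_task2_failure,
)

Upvotes: 2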