Bill Goldberg

Reputation: 1829

Airflow error handling task trigger rule for triggering directly linked task failure

I have an Airflow task pipeline as in the diagram. task1_error_handler & task2_error_handler are error-handling tasks which should run only if the directly linked task fails. I have set the ONE_FAILED trigger rule for these tasks, but it seems an error on task1 triggers both error handlers. I only need to trigger task1_error_handler. All the tasks are custom operators & the task ids ending with status are custom sensors. How should I achieve this?

Upvotes: 4

Views: 13089

Answers (3)

Oleg Orlov

Reputation: 46

You can catch the error in task1's code, skip all branches except the error handlers, and re-raise the error. For example, if task1 is a BashOperator, replace it with the following CatchBashOperator:

    from typing import Dict

    from airflow.models import SkipMixin
    from airflow.operators.bash_operator import BashOperator
    from airflow.utils.trigger_rule import TriggerRule


    class CatchMixin(SkipMixin):

        def execute(self, context: Dict):
            try:
                super().execute(context)
            except Exception:
                ti = context['ti']
                task = ti.task
                # Skip every downstream task except those whose trigger rule
                # expects a failure, then re-raise so this task is marked failed.
                excl_rules = [TriggerRule.ONE_FAILED, TriggerRule.ALL_FAILED]
                excl_task_ids = [t.task_id for t in task.downstream_list
                                 if t.trigger_rule in excl_rules]
                self.skip_all_except(ti, excl_task_ids)
                raise


    class CatchBashOperator(CatchMixin, BashOperator):
        pass
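To see the catch-and-skip mechanics without a live Airflow installation, here is a minimal pure-Python sketch in which stub classes stand in for Airflow's task and operator objects (all names below are illustrative stand-ins, not Airflow's API):

```python
from typing import Dict, List


class Task:
    """Stub for a task node: id, trigger rule, downstream tasks."""
    def __init__(self, task_id: str, trigger_rule: str, downstream=None):
        self.task_id = task_id
        self.trigger_rule = trigger_rule
        self.downstream_list = downstream or []


class StubOperator:
    """Stub operator whose execute() always fails, like a broken task1."""
    def execute(self, context: Dict):
        raise RuntimeError("task1 failed")


class CatchMixin:
    """Catch the failure, skip all downstream tasks except error
    handlers, then re-raise so the task itself is still marked failed."""
    skipped: List[str] = []

    def skip_all_except(self, ti, excl_task_ids):
        # Record which downstream tasks would be skipped.
        self.skipped = [t.task_id for t in ti.task.downstream_list
                        if t.task_id not in excl_task_ids]

    def execute(self, context: Dict):
        try:
            super().execute(context)
        except Exception:
            ti = context["ti"]
            excl_rules = ["one_failed", "all_failed"]
            excl_task_ids = [t.task_id for t in ti.task.downstream_list
                             if t.trigger_rule in excl_rules]
            self.skip_all_except(ti, excl_task_ids)
            raise


class CatchStubOperator(CatchMixin, StubOperator):
    pass
```

Running the stub operator still raises, as a real failed task would, but it records that only the ordinary downstream branch (task2) is skipped while the ONE_FAILED error handler is left alone.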

Upvotes: 1

y2k-shubham

Reputation: 11607

Note: The proposed solution might be wrong, but you should still get the idea: I'm trying to achieve what logic gates do in digital circuits, which might help you come up with a working solution. I encourage you to provide feedback.


@Zack's answer pinpoints the problem very well. I'd just like to add a workaround I have in mind.

If we create some new dummy tasks and dependencies as follows, it might just do the trick.

  1. A DummyOperator with trigger_rule=ONE_FAILED in place of task2_error_handler. Its success means that task2 has failed (which could very well be because of failure of task1)

    from airflow.operators.dummy_operator import DummyOperator
    from airflow.utils.trigger_rule import TriggerRule
    ..
    task2_dummy_error_handler = DummyOperator(
                                  dag=my_dag,
                                  task_id="task2_dummy_error_handler",
                                  trigger_rule=TriggerRule.ONE_FAILED
                                )
    [task2, task2_status_check] >> task2_dummy_error_handler
    
  2. Another DummyOperator with trigger_rule=ALL_SUCCESS that informs whether task1 had succeeded or not. Its failure would mean that task1 had failed => task2 would automatically fail because of UPSTREAM_FAILED hence we need not run task2_retry_handler

    task1_error_handler_status_check = DummyOperator(
                                         dag=my_dag,
                                         task_id="task1_error_handler_status_check",
                                         trigger_rule=TriggerRule.ALL_SUCCESS
                                       )
    [task1, task1_status_check] >> task1_error_handler_status_check
    
  3. Finally set trigger_rule=ALL_SUCCESS in your task2_retry_handler and make it downstream of above two dummy tasks. This should ensure that task2_retry_handler runs on failure of task2 but not on failure of task1.

    task2_retry_handler = PythonOperator(
                            dag=my_dag,
                            task_id="task2_retry_handler",
                            python_callable=my_task2_retry_handler,
                            ..,
                            trigger_rule=TriggerRule.ALL_SUCCESS
                          )
    [task1_error_handler_status_check, task2_dummy_error_handler] >> task2_retry_handler
    
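The wiring above behaves like an AND gate over "task1 succeeded" and "task2 failed". A hedged pure-Python sketch of that truth table (trigger-rule evaluation reduced to plain booleans; the status-check sensors are elided):

```python
def one_failed(parent_ok_flags):
    # TriggerRule.ONE_FAILED: fire if at least one parent failed.
    return any(not ok for ok in parent_ok_flags)


def all_success(parent_ok_flags):
    # TriggerRule.ALL_SUCCESS: fire only if every parent succeeded.
    return all(parent_ok_flags)


def task2_retry_handler_runs(task1_ok: bool, task2_ok: bool) -> bool:
    # task2_dummy_error_handler fires when task2 has failed.
    task2_dummy_error_handler = one_failed([task2_ok])
    # task1_error_handler_status_check fires only when task1 succeeded.
    task1_error_handler_status_check = all_success([task1_ok])
    # task2_retry_handler requires both dummies to have fired.
    return task1_error_handler_status_check and task2_dummy_error_handler
```

Under this simplification the handler runs only in the task1-succeeded, task2-failed case, which is the intended behaviour.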

Upvotes: 0

Zack

Reputation: 2466

An error on task1 causes both error handlers to run because task2 is downstream of task1, making task1 a parent of task2.

With the trigger rule being ONE_FAILED for both error handlers, this causes problems because the definition of ONE_FAILED is:

fires as soon as at least one parent has failed, it does not wait for all parents to be done

So with that said, you only want task1_error_handler to trigger if task1 fails. Unfortunately, this cannot easily be done just by changing the trigger rule, because no trigger rule conditions only on the directly linked task.

Your best bets would be:

  • Keep task1 as it is, get rid of task2's error-handler trigger rule, and instead use on_failure_callback to call the error handler.
  • Split task2 into a separate DAG.
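For the first option, a sketch of what such a callback could look like (the handler body and the MyCustomOperator name are hypothetical; only the on_failure_callback hook itself is Airflow's):

```python
def task2_error_handler(context):
    # Airflow passes the task-instance context dict to the callback;
    # this hypothetical handler just reports which task failed.
    ti = context["task_instance"]
    return f"handling failure of {ti.task_id}"


# In the DAG, the callback would be attached directly to task2, e.g.:
# task2 = MyCustomOperator(task_id="task2",
#                          on_failure_callback=task2_error_handler, ...)
```

Since the callback fires only when task2 itself fails (not when it is skipped because of an upstream failure), a failure of task1 never invokes it, unlike a ONE_FAILED trigger rule.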

Upvotes: 2