Manu Bhogal
Manu Bhogal

Reputation: 75

Implementing branching in Airflow

I am trying to add alerts to my airflow dags. The dags have multiple tasks, some upto 15. I want to execute a bash script ( a general script for all dags ), in case any task at any point fails. An example, a dag has tasks T1 to T5, as T1 >> T2 >> T3 >> T4 >> T5. I want to trigger task A ( representing alerts ) in case any of these fail.

Would be really helpful in anyone can help me with the task hierarchy.

Upvotes: 1

Views: 808

Answers (1)

Mike Taylor
Mike Taylor

Reputation: 699

You have two options IMO. Failure callback and Trigger Rules

Success / Failure Callback

Airflow Task Instances have a concept of what to do in case of failure or success. These are callbacks that will be run in the case of a Task reaching a specific state... here are your options:

...
    on_failure_callback=None,
    on_success_callback=None,
    on_retry_callback=None
...

Trigger Rules

Airflow Task Instances have a concept of what state of their upstream to trigger on with the default being ALL_SUCCESS. That means your main branch can stay as it is. And you can branch where you want with A from T1 as:

from airflow.utils.trigger_rule import TriggerRule

T1 >> DummyOperator(
    dag=dag,
    task_id="task_a",
    trigger_rule=TriggerRule.ALL_FAILED
)

Alternatively, you can build your branch and include A as:

from airflow.utils.trigger_rule import TriggerRule

[T1, T2, T3, ...] >> DummyOperator(
    dag=dag,
    task_id="task_a",
    trigger_rule=TriggerRule.ONE_FAILED
)

Upvotes: 2

Related Questions