Anton

Reputation: 591

Airflow triggering the "on_failure_callback" when the "dagrun_timeout" is exceeded

I'm currently working on setting up alerts for long-running tasks in Airflow. To cancel/fail the Airflow DAG I've put "dagrun_timeout" in the default_args, and it does what I need: it fails/errors the DAG when it has been running for too long (usually stuck). The only problem is that the function in "on_failure_callback" doesn't get called when the dagrun_timeout is exceeded, because "on_failure_callback" is on the task level (I think) while dagrun_timeout is on the DAG level.

How can I execute the "on_failure_callback" when the dagrun_timeout is exceeded, or how can I specify a function to be called when a DAG fails? Or should I re-think my approach?

Upvotes: 7

Views: 13946

Answers (1)

NicoE

Reputation: 4853

Try setting on_failure_callback during DAG declaration:

with DAG(
    dag_id="failure_callback_example",
    on_failure_callback=_on_dag_run_fail,
    ...
) as dag:
...

The explanation is that an on_failure_callback defined in default_args gets passed only to the Tasks being created, not to the DAG object itself.

Here is an example to try this behaviour:


from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator


def _on_dag_run_fail(context):
    print("***DAG failed!! do something***")
    print(f"The DAG failed because: {context['reason']}")
    print(context)


def _alarm(context):
    print("** Alarm Alarm!! **")
    task_instance: TaskInstance = context.get("task_instance")
    print(f"Task Instance: {task_instance} failed!")


default_args = {
    "owner": "mi_empresa",
    "email_on_failure": False,
    "on_failure_callback": _alarm,  # task-level callback, passed to every task
}


with DAG(
    dag_id="failure_callback_example",
    start_date=datetime(2021, 9, 7),
    schedule_interval=None,
    default_args=default_args,
    catchup=False,
    on_failure_callback=_on_dag_run_fail,  # DAG-level callback
    dagrun_timeout=timedelta(seconds=45),  # fail the run after 45 seconds
) as dag:

    # sleeps for 60s, so the DAG run exceeds the 45s dagrun_timeout
    delayed = BashOperator(
        task_id="delayed",
        bash_command='echo "waiting..";sleep 60; echo "Done!!"',
    )
    will_fail = BashOperator(
        task_id="will_fail",
        bash_command="exit 1",
        # on_failure_callback=_alarm,
    )
    delayed >> will_fail

You can find the logs of the callback executions in the Scheduler logs, under AIRFLOW_HOME/logs/scheduler/date/failure_callback_example:

[2021-09-24 13:12:34,285] {logging_mixin.py:104} INFO - [2021-09-24 13:12:34,285] {dag.py:862} INFO - Executing dag callback function: <function _on_dag_run_fail at 0x7f83102e8670>
[2021-09-24 13:12:34,336] {logging_mixin.py:104} INFO - ***DAG failed!! do something***
[2021-09-24 13:12:34,345] {logging_mixin.py:104} INFO - The DAG failed because: timed_out

Edit:

Within the context dict, the key reason is passed to specify the cause of the DAG run failure. Some of its values are 'reason': 'timed_out' or 'reason': 'task_failure'. This can be used to perform specific behaviour in the callback based on the reason for the DAG run failure.
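For example, the DAG-level callback could branch on that key. This is just a minimal sketch (the "dag" entry in the context holds the DAG object; the alerting itself is left as plain prints):

def _on_dag_run_fail(context):
    reason = context.get("reason")  # e.g. 'timed_out' or 'task_failure'
    dag_id = context["dag"].dag_id
    if reason == "timed_out":
        # The run hit dagrun_timeout: probably stuck, escalate loudly
        print(f"** {dag_id} timed out! Escalate! **")
    else:
        # One of the tasks inside the run failed
        print(f"** {dag_id} failed, reason: {reason} **")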

Upvotes: 6
