Reputation: 591
I'm currently setting up alerts for long-running tasks in Airflow. To cancel/fail the Airflow DAG I've put "dagrun_timeout" in the default_args, and it does what I need: it fails/errors the DAG when it's been running for too long (usually stuck). The only problem is that the function in "on_failure_callback" doesn't get called when the dagrun_timeout is exceeded, because the "on_failure_callback" is on the task level (I think) while the dagrun_timeout is on the DAG level.
How can I execute the "on_failure_callback" when the dagrun_timeout is exceeded, or how can I specify a function to be called when a DAG fails? Or should I rethink my approach?
Upvotes: 7
Views: 13946
Reputation: 4853
Try setting on_failure_callback during DAG declaration:
with DAG(
    dag_id="failure_callback_example",
    on_failure_callback=_on_dag_run_fail,
    ...
) as dag:
    ...
The explanation is that an on_failure_callback defined in default_args is passed only to the tasks being created, not to the DAG object.
Here is an example to try this behaviour:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.bash import BashOperator


def _on_dag_run_fail(context):
    print("***DAG failed!! do something***")
    print(f"The DAG failed because: {context['reason']}")
    print(context)


def _alarm(context):
    print("** Alarm Alarm!! **")
    task_instance: TaskInstance = context.get("task_instance")
    print(f"Task Instance: {task_instance} failed!")


default_args = {
    "owner": "mi_empresa",
    "email_on_failure": False,
    "on_failure_callback": _alarm,
}

with DAG(
    dag_id="failure_callback_example",
    start_date=datetime(2021, 9, 7),
    schedule_interval=None,
    default_args=default_args,
    catchup=False,
    on_failure_callback=_on_dag_run_fail,
    dagrun_timeout=timedelta(seconds=45),
) as dag:
    delayed = BashOperator(
        task_id="delayed",
        bash_command='echo "waiting..";sleep 60; echo "Done!!"',
    )
    will_fail = BashOperator(
        task_id="will_fail",
        bash_command="exit 1",
        # on_failure_callback=_alarm,
    )
    delayed >> will_fail
You can find the logs of the callback execution in the scheduler logs, under AIRFLOW_HOME/logs/scheduler/date/failure_callback_example:
[2021-09-24 13:12:34,285] {logging_mixin.py:104} INFO - [2021-09-24 13:12:34,285] {dag.py:862} INFO - Executing dag callback function: <function _on_dag_run_fail at 0x7f83102e8670>
[2021-09-24 13:12:34,336] {logging_mixin.py:104} INFO - ***DAG failed!! do something***
[2021-09-24 13:12:34,345] {logging_mixin.py:104} INFO - The DAG failed because: timed_out
Within the context dict, the key reason is passed to specify the cause of the DAG run failure. Some values are 'reason': 'timed_out' or 'reason': 'task_failure'. This could be used to perform specific behaviour in the callback based on the reason for the DAG run failure.
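As a rough sketch of branching on the reason, something like the following could work. The helper name describe_dag_failure is made up for illustration, and reading a "dag_run" entry from the context is an assumption (hence the .get calls). Only the two reason values shown above are handled explicitly.

```python
def describe_dag_failure(context):
    """Build a short alert message from a DAG-level failure context.

    Hypothetical helper: only the 'reason' values mentioned above
    ('timed_out' and 'task_failure') are handled explicitly.
    """
    reason = context.get("reason")
    # Assumption: the context also carries a 'dag_run' entry; .get avoids a KeyError if not.
    dag_run = context.get("dag_run")
    if reason == "timed_out":
        return f"DAG run {dag_run} exceeded dagrun_timeout"
    if reason == "task_failure":
        return f"DAG run {dag_run} failed because a task failed"
    return f"DAG run {dag_run} failed (reason: {reason})"


def _on_dag_run_fail(context):
    # Swap print for whatever alerting you use (email, chat message, ...).
    print(describe_dag_failure(context))
```

Since the helper only reads a plain dict, you can try it without a running scheduler, e.g. _on_dag_run_fail({"reason": "timed_out", "dag_run": "manual_run"}), before wiring it into on_failure_callback on the DAG.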
Upvotes: 6