Bill

Reputation: 51

Get task_id of failed task from within an Airflow DAG level failure callback

I have an Airflow DAG with multiple tasks, and I have defined success and failure callbacks so that they send a Microsoft Teams message to a channel with some information about the success or failure.

In the failure callback I am trying to catch the name of the task_id (or one of the task_ids) that has failed. My DAG is defined as:

with DAG(
    dag_id="my_dag",
    start_date=datetime(2022, 2, 9),
    schedule_interval="0 9-18 * * *",
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=60),
    on_failure_callback=fail_notifier,
    catchup=False
) as dag:

    get_params = PythonOperator(
        task_id="task_1",
        python_callable=get_runtime_params
    )

    final_task = PythonOperator(
        task_id="final_task",
        python_callable=run_final_task,
        trigger_rule="none_failed_or_skipped",
        on_success_callback=success_notifier
    )

    for _id, name in my_dict.items():

        with TaskGroup(group_id=name) as tg:

            calculation = PythonOperator(
                task_id="calculation_task",
                python_callable=calculation_task,
                op_kwargs={'my_id': _id}
            )

            calc_perf

        chain(get_params, tg, final_task)

So you can see that the on_failure_callback is defined at the DAG level.

In my fail_notifier definition (I won't post the full code as it's quite lengthy) I've tried a few different ways to get the failed task instance of the DAG. However, when it runs, the behaviour is not what I am expecting: in every case it picks a seemingly random task_id from within the DAG.

Attempt 1: Using the task_instance object. The task_context gets passed to the callback methods, so I tried the following:

task_instance = task_context['ti']
task_id = task_instance.task_id

Attempt 2: Using the task_instance_key_str. The task_instance_key_str is a string defined in the docs here. My idea was to parse the task_id from the task_instance_key_str using some regex, e.g.:

key_str = task_context.get('task_instance_key_str')
task_id = _parse_task_id_from_key_str(key_str)

Again, this attempt seemed to produce random results for which task_id was returned.
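The helper _parse_task_id_from_key_str isn't shown in the question, but one way to sketch it (assuming the dag_id is known and, per the docs, the key ends in an 8-digit ds_nodash date) is to anchor on those rather than splitting on "__", since task_ids inside TaskGroups may themselves contain underscores:

```python
import re

def parse_task_id_from_key_str(key_str, dag_id):
    """Extract the task_id from a task_instance_key_str.

    The key has the documented form {dag_id}__{task_id}__{ds_nodash},
    where ds_nodash is an 8-digit date (YYYYMMDD). We anchor on the
    known dag_id prefix and the trailing date instead of splitting
    on "__", because task_ids can contain underscores.
    """
    match = re.fullmatch(rf"{re.escape(dag_id)}__(.+)__\d{{8}}", key_str)
    return match.group(1) if match else None
```

For a task inside a TaskGroup this would return the group-qualified id, e.g. "group_a.calculation_task".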

Attempt 3: Get the task instances from the dag run and filter for the failed ones:

dag_run = task_context.get('dag_run')
failed_tis = dag_run.get_task_instances(state='failed')
failed_task_ids = [ti.task_id for ti in failed_tis]
first_task_id = failed_task_ids[:1] or None

In this case the callback doesn't run at all. Coupled with this, nothing appears in the scheduler logs to give any indication of why the callback is failing. Based on this answer, it seems any exceptions raised in a DAG-level callback should appear in the scheduler logs. The code definitely works at task level, as I have tested it by printing the list of tasks to the task log.
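Since exceptions in a DAG-level callback can fail silently, one defensive pattern is to wrap the whole callback body in a try/except and log the traceback explicitly. A minimal sketch (DagRun.get_task_instances does accept a state filter in Airflow 2.x; the Teams call itself is omitted):

```python
import logging

log = logging.getLogger(__name__)

def fail_notifier(context):
    try:
        dag_run = context["dag_run"]
        failed_tis = dag_run.get_task_instances(state="failed")
        failed_task_ids = [ti.task_id for ti in failed_tis]
        log.error("Failed tasks: %s", failed_task_ids)
        # ... build and send the Teams message here ...
        return failed_task_ids
    except Exception:
        # Log the full traceback so a bug in the callback itself
        # shows up in the scheduler logs instead of failing silently.
        log.exception("fail_notifier raised an exception")
        return None
```

Even if the callback still misbehaves, the logged traceback gives something concrete to debug instead of a callback that simply never appears to run.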

I am using an Astronomer distribution of Airflow. The Astro version is 2.1.1.post6 and the Airflow version is 2.1.1.

I had considered adding the failure callback at the task level, but this is infeasible: my users would be spammed with multiple messages if more than one task failed (because, in the example above, multiple tasks run concurrently).

I have been scratching my head over this problem for a few weeks and have exhausted all the useful resources I could find online and on SO. Any help would be greatly appreciated.

Upvotes: 2

Views: 6750

Answers (1)

Hussein Awala

Reputation: 5100

When an Airflow DAG run fails because of a task, it should call the failure callback with the context of the failed task, so this code should be enough:

def fail_notifier(context):
    failed_task = context['task_instance_key_str'] # {dag_id}__{task_id}__{ds_nodash}
    ... # send your teams message

or, using the task_id attribute of the task object in the context:

failed_task = context['task'].task_id

But if your callback is called with the context of a task different from the failed one ("In all cases it picks a random task_id from within the DAG"), that would be a bug in Airflow, so you can create a GitHub issue and provide the logs to help find and fix it.
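Putting the two suggestions together, a minimal callback might look like the sketch below (the message format and the Teams call are placeholders, not part of the answer's code):

```python
def fail_notifier(context):
    # Per the Airflow docs, this string has the form
    # {dag_id}__{task_id}__{ds_nodash}.
    key_str = context["task_instance_key_str"]
    # context["task"] is the failed task, so its task_id
    # identifies the failure directly.
    task_id = context["task"].task_id
    message = f"Task '{task_id}' failed (run key: {key_str})"
    return message  # replace with the actual Teams notification
```

For a task inside a TaskGroup, task_id would already be group-qualified (e.g. "group_a.calculation_task").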

Upvotes: 2
