soapho

Reputation: 31

airflow trigger_rule using ONE_FAILED cause dag failure

What I want to achieve is a task that sends a notification if any task in the DAG fails. I am applying a trigger rule to that task:

batch11 = BashOperator(
    task_id='Error_Buzz',
    trigger_rule=TriggerRule.ONE_FAILED,
    bash_command='python /home/admin/pythonwork/home/codes/notifications/dagLevel_Notification.py',
    dag=dag,
)

batch>>batch11
batch1>>batch11

The problem is that when no other task fails, the batch11 task does not execute because of its trigger rule, which is what I want. However, this causes the DAG run to be marked as failed, since the DAG's default behavior expects ALL_SUCCESS. Is there a way to close this loophole so the DAG run finishes successfully?

screenshot of outcome: (image)

Upvotes: 1

Views: 4651

Answers (1)

nehiljain

Reputation: 689

We do something similar in our Airflow deployment. The idea is to notify Slack when a task in a DAG fails. You can set on_failure_callback in the DAG's default_args, as documented at https://airflow.apache.org/code.html#airflow.models.BaseOperator:

on_failure_callback (callable) – a function to be called when a task instance of this task fails. a context dictionary is passed as a single parameter to this function. Context contains references to related objects to the task instance and is documented under the macros section of the API.
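The AirflowUtils helper used below is the answerer's own module and is not shown. As a minimal sketch of what such a callback might look like (the function name notify_job_failure is illustrative, but the context keys used here are standard parts of the Airflow task context):

```python
def notify_job_failure(context):
    """Called by Airflow with the task context when a task instance fails."""
    ti = context['task_instance']
    message = (
        f"Task {ti.task_id} in DAG {ti.dag_id} failed "
        f"on {context['execution_date']}"
    )
    # Replace print with a Slack webhook, email, or any other notifier.
    print(message)
    return message
```

Because the callback receives the full context dictionary, it can report which task failed without the DAG needing a dedicated notification task wired in with trigger rules.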

Here is an example of how I use it. If any task fails or succeeds, Airflow calls the notify function, and I can get a notification wherever I want.

from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from airflow.utils.dates import days_ago
from util.airflow_utils import AirflowUtils

schedule = timedelta(minutes=5)

args = {
  'owner': 'user',
  'start_date': days_ago(1),
  'depends_on_past': False,
  'on_failure_callback': AirflowUtils.notify_job_failure,
  'on_success_callback': AirflowUtils.notify_job_success
}

dag = DAG(
  dag_id='demo_dag',
  schedule_interval=schedule, default_args=args)

def task1():
  return 'Whatever you return gets printed in the logs!'

def task2():
  return 'cont'

# Use distinct names for the operators so they don't shadow the callables.
t1 = PythonOperator(task_id='task1',
                    python_callable=task1,
                    dag=dag)

t2 = PythonOperator(task_id='task2',
                    python_callable=task2,
                    dag=dag)

t1 >> t2

Upvotes: 1
