Seth

Reputation: 31

Airflow: How to only send email alerts when it is not a consecutive failure?

I have an Airflow DAG that executes 10 tasks (exporting different data from the same source) in parallel, every 15 minutes. I've also enabled 'email_on_failure' to get notified of failures.

Once every month or so, the tasks start failing for a couple of hours because the data source is unavailable, which causes Airflow to generate hundreds of emails (10 emails every 15 minutes) until the raw data source is available again.

Is there a better way to avoid being spammed with emails when consecutive runs fail? For example, is it possible to only send an email for the first failing run (i.e. when the previous run was successful)?

Upvotes: 0

Views: 2956

Answers (2)

Oluwafemi Sule

Reputation: 38952

Given that your tasks run concurrently and one or more of them could fail, I would suggest treating the dispatch of failure messages as you would a shared resource.

You need to implement a lock that is "dagrun-aware": one that knows about the DagRun.

You can back this lock with an in-memory store like Redis, an object store like S3, a file on disk, or a database. How you choose to implement it is up to you.
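For example, a Redis-backed variant of such a lock can stay very small. Here is a minimal sketch, assuming a reachable Redis instance and the redis Python client (the key prefix and expiry are arbitrary choices of mine); the S3-backed example further below follows the same pattern:

import redis


class RedisOnlyOnceLock:
    def __init__(self, run_id, host='localhost'):
        self.run_id = run_id
        self.client = redis.Redis(host=host)

    def acquire(self):
        # SET with nx=True succeeds only if the key does not exist yet,
        # so exactly one caller per run_id gets True; expire after a day.
        return bool(self.client.set(f'email-lock:{self.run_id}', '1', nx=True, ex=86400))

    def __enter__(self):
        return self.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass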

In your on_failure_callback implementation, acquire the lock. If acquisition succeeds, carry on and dispatch the email; otherwise, do nothing.

from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class OnlyOnceLock:
    """Lock that can be acquired at most once per DagRun."""

    def __init__(self, run_id):
        self.run_id = run_id

    def acquire(self):
        # Returns False if a marker object for run_id already exists in the
        # backing store (S3 in this example), i.e. the alert was already sent.
        hook = S3Hook()
        key = self.run_id
        bucket_name = 'coordinated-email-alerts'
        if hook.check_for_key(key, bucket_name):
            return False
        # First time the lock is acquired for this run: write the marker.
        hook.load_string('fakie', key, bucket_name)
        return True

    def __enter__(self):
        return self.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


def on_failure_callback(context):
    error = context['exception']
    task = context['task']
    run_id = context['run_id']
    ti = context['ti']
    with OnlyOnceLock(run_id) as lock:
        if lock:
            ti.email_alert(error, task)
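To show how this plugs in, here is a minimal sketch of attaching the callback to every task through default_args; the DAG id, schedule and task are placeholders of mine, not taken from the question:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='exports_every_15_minutes',  # placeholder DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(minutes=15),
    catchup=False,
    # Applied to every task in the DAG, so each failing task runs the callback
    # and only the first one to acquire the lock sends the email.
    default_args={'on_failure_callback': on_failure_callback},
) as dag:
    export_1 = BashOperator(task_id='export_1', bash_command='echo export')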
          

Upvotes: 1

Daniel T

Reputation: 621

To customise the logic in callbacks you can use on_failure_callback and define a Python function to call on failure/success. In this function you can access the task instance.

A property on this task instance is try_number, which you can check before sending an alert. An example could be:

from airflow.operators.bash import BashOperator


def task_fail_email_alert(context):
    try_number = context["ti"].try_number
    if try_number == 1:
        # First attempt: send the alert here.
        pass
    else:
        # Retry of an already-reported failure: do nothing.
        pass


some_operator = BashOperator(
    task_id="some_operator",
    bash_command="""
        echo "something"
    """,
    on_failure_callback=task_fail_email_alert,
    dag=dag,  # assumes a DAG object named `dag` is defined elsewhere
)

You can then implement the code to send an email in this function, rather than use the built-in email_on_failure. The EmailOperator is available via from airflow.operators.email import EmailOperator.
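For instance, a minimal sketch of the callback doing the send itself via Airflow's send_email helper (which uses the SMTP settings from your Airflow configuration); the recipient address is a placeholder:

from airflow.utils.email import send_email


def task_fail_email_alert(context):
    ti = context["ti"]
    if ti.try_number == 1:
        send_email(
            to="alerts@example.com",  # placeholder recipient
            subject=f"Airflow alert: {ti.task_id} failed",
            html_content=f"Run {context['run_id']} failed with: {context.get('exception')}",
        )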

Upvotes: 3
