Blessy
Blessy

Reputation: 520

How to pass parameters to Airflow on_success_callback and on_failure_callback

I have implemented email alerts on success and failure using on_success_callback and on_failure_callback.

According to Airflow documentation,

a context dictionary is passed as a single parameter to this function.

How can I pass another parameter to these callback methods?

Here is my code

from airflow.utils.email import send_email_smtp

def task_success_alert(context):
    subject = "[Airflow] DAG {0} - Task {1}: Success".format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1]
        )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Succeeded on: {2}
    """.format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1], 
        datetime.now()
        )
    send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)

def task_failure_alert(context):
    subject = "[Airflow] DAG {0} - Task {1}: Failed".format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1]
        )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Failed on: {2}
    """.format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1], 
        datetime.now()
        )
    send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert
}

I intend to move the callbacks to another package and pass the email address as parameter.

Upvotes: 10

Views: 31419

Answers (3)

Franco Piccolo
Franco Piccolo

Reputation: 7410

You can use partial to create a function with a predefined argument like:

from functools import partial
new_task_success_alert = partial(task_success_alert, email='your_email')

And then add the new function as a callback:

on_success_callback=new_task_success_alert

Upvotes: 8

kemazariegos
kemazariegos

Reputation: 41

You can create a task that its only purpose is to push configuration setting through xcoms. You can pull the configuration via context as the task_instance object is included in context.

def push_configuration(ti, params):
    ti.xcom_push(key='conn_id', value=params)

def task_success_alert(context):
    ti = context.get('ti') 
    params = ti.xcom_pull(key='params', task_ids='Settings')
    ...


step0 = PythonOperator(
        task_id='Settings',
        python_callable=push_configuration,
        op_kwargs={'params': params})

step1 = BashOperator(
        task_id='step1',
        bash_command='pwd',
        on_success_callback=task_success_alert)

Upvotes: 3

nightgaunt
nightgaunt

Reputation: 910

You could define a function inside your dag that calls the function from your package. And while calling that function, pass email as an argument. You can refine it further at your DAG level to pass only information required for the emails.

from package import outer_task_success_callback
email = '[email protected]'

def task_success_alert(context):
    dag_id = context['dag'].dag_id
    task_id = context['task_instance']. task_id
    outer_task_success_callback(dag_id, task_id, email)
    
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert
}

This will allow you to customize before you call the function in your package.

On a side note, airflow has smtp email functionality. Instead of writing your own solution, you can utilize those.

Upvotes: 11

Related Questions