Reputation: 520
I have implemented email alerts on success and failure using on_success_callback and on_failure_callback.
According to Airflow documentation,
a context dictionary is passed as a single parameter to this function.
How can I pass another parameter to these callback methods?
Here is my code
from airflow.utils.email import send_email_smtp
def task_success_alert(context):
subject = "[Airflow] DAG {0} - Task {1}: Success".format(
context['task_instance_key_str'].split('__')[0],
context['task_instance_key_str'].split('__')[1]
)
html_content = """
DAG: {0}<br>
Task: {1}<br>
Succeeded on: {2}
""".format(
context['task_instance_key_str'].split('__')[0],
context['task_instance_key_str'].split('__')[1],
datetime.now()
)
send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)
def task_failure_alert(context):
subject = "[Airflow] DAG {0} - Task {1}: Failed".format(
context['task_instance_key_str'].split('__')[0],
context['task_instance_key_str'].split('__')[1]
)
html_content = """
DAG: {0}<br>
Task: {1}<br>
Failed on: {2}
""".format(
context['task_instance_key_str'].split('__')[0],
context['task_instance_key_str'].split('__')[1],
datetime.now()
)
send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 6, 13),
'on_success_callback': task_success_alert,
'on_failure_callback': task_failure_alert
}
I intend to move the callbacks to another package and pass the email address as parameter.
Upvotes: 10
Views: 31419
Reputation: 7410
You can use partial to create a function with a predefined argument like:
from functools import partial
new_task_success_alert = partial(task_success_alert, email='your_email')
And then add the new function as a callback:
on_success_callback=new_task_success_alert
Upvotes: 8
Reputation: 41
You can create a task that its only purpose is to push configuration setting through xcoms
. You can pull the configuration via context
as the task_instance
object is included in context
.
def push_configuration(ti, params):
ti.xcom_push(key='conn_id', value=params)
def task_success_alert(context):
ti = context.get('ti')
params = ti.xcom_pull(key='params', task_ids='Settings')
...
step0 = PythonOperator(
task_id='Settings',
python_callable=push_configuration,
op_kwargs={'params': params})
step1 = BashOperator(
task_id='step1',
bash_command='pwd',
on_success_callback=task_success_alert)
Upvotes: 3
Reputation: 910
You could define a function inside your dag that calls the function from your package. And while calling that function, pass email as an argument. You can refine it further at your DAG level to pass only information required for the emails.
from package import outer_task_success_callback
email = '[email protected]'
def task_success_alert(context):
dag_id = context['dag'].dag_id
task_id = context['task_instance']. task_id
outer_task_success_callback(dag_id, task_id, email)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 6, 13),
'on_success_callback': task_success_alert,
'on_failure_callback': task_failure_alert
}
This will allow you to customize before you call the function in your package.
On a side note, airflow has smtp email functionality. Instead of writing your own solution, you can utilize those.
Upvotes: 11