Reputation: 279
Is there any option to customize the email and send it on any task failure in the DAG? There is an option 'email_on_failure': True, but it doesn't provide a way to dynamically add content to the email subject or body.
My DAG looks like the one below:
import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
import json
from datetime import timedelta
from datetime import datetime
from airflow.models import Variable
args = {
    'owner': 'airflow',
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'max_active_runs': 10
}

dag = DAG(dag_id='TEST_DAG', default_args=args, schedule_interval='@once')

new_cluster = {
    'spark_version': '4.0.x-scala2.11',
    'node_type_id': 'Standard_D16s_v3',
    'num_workers': 3,
    'spark_conf': {
        'spark.hadoop.javax.jdo.option.ConnectionDriverName': 'org.postgresql.Driver',
        .....
    },
    'custom_tags': {
        'ApplicationName': 'TEST',
        .....
    }
}

t1 = DatabricksSubmitRunOperator(
    task_id='t1',
    dag=dag,
    new_cluster=new_cluster,
    ......
)

t2 = SimpleHttpOperator(
    task_id='t2',
    method='POST',
    ........
)
t2.set_upstream(t1)

t3 = SimpleHttpOperator(
    task_id='t3',
    method='POST',
    .....
)
t3.set_upstream(t2)

send_mail = EmailOperator(
    dag=dag,
    task_id="send_mail",
    to=["[email protected]"],
    subject="Success",
    html_content='<h3>Success</h3>')
send_mail.set_upstream(t3)
In the success case, the send_mail task sends a customized email to the specified email ID.
But when any task fails, I want to customize the email and send it to the specified email ID. This is not happening; on failure, the email is sent with the default subject and body.
Any help would be appreciated
Upvotes: 10
Views: 26107
Reputation: 332
Currently using Airflow 1.10.1:
The custom email option seems to be configurable in airflow.cfg under the [email] section, using Jinja templates like below:
[email]
email_backend = airflow.utils.email.send_email_smtp
subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file
A customised message can be created by making use of task-instance information in the html_content_template, which is in turn a Jinja template.
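For reference, the two template files are plain Jinja templates rendered with task-instance information. A minimal sketch of what they could contain; the variables ti, try_number, max_tries and exception_html mirror the defaults shipped with Airflow 1.10, so verify them against your version before relying on them:

my_subject_template_file:
Airflow alert: {{ ti.dag_id }}.{{ ti.task_id }} failed on {{ ti.execution_date }}

my_html_content_template_file:
<h3>Task {{ ti.task_id }} in DAG {{ ti.dag_id }} failed</h3>
Try {{ try_number }} out of {{ max_tries + 1 }}<br>
Exception:<br>{{ exception_html }}<br>
Log: <a href="{{ ti.log_url }}">link</a>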
More details can be found at https://airflow.apache.org/docs/stable/howto/email-config.html
Upvotes: 1
Reputation: 279
I managed it with the help of Airflow trigger rules. A sample DAG is given below:
import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
import json
from datetime import timedelta
from datetime import datetime
from airflow.models import Variable
from airflow.utils.trigger_rule import TriggerRule
args = {
    'owner': 'airflow',
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'max_active_runs': 10
}

dag = DAG(dag_id='TEST_DAG', default_args=args, schedule_interval='@once')

new_cluster = {
    'spark_version': '4.0.x-scala2.11',
    'node_type_id': 'Standard_D16s_v3',
    'num_workers': 3,
    'spark_conf': {
        'spark.hadoop.javax.jdo.option.ConnectionDriverName': 'org.postgresql.Driver',
        .....
    },
    'custom_tags': {
        'ApplicationName': 'TEST',
        .....
    }
}

t1 = DatabricksSubmitRunOperator(
    task_id='t1',
    dag=dag,
    new_cluster=new_cluster,
    ......
)

t2 = SimpleHttpOperator(
    task_id='t2',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    method='POST',
    ........
)
t2.set_upstream(t1)

t3 = SimpleHttpOperator(
    task_id='t3',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    method='POST',
    .....
)
t3.set_upstream(t2)

AllTaskSuccess = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    task_id="AllTaskSuccess",
    to=["[email protected]"],
    subject="All tasks completed successfully",
    html_content='<h3>All tasks completed successfully</h3>')
AllTaskSuccess.set_upstream([t1, t2, t3])

t1Failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="t1Failed",
    to=["[email protected]"],
    subject="T1 Failed",
    html_content='<h3>T1 Failed</h3>')
t1Failed.set_upstream([t1])

t2Failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="t2Failed",
    to=["[email protected]"],
    subject="T2 Failed",
    html_content='<h3>T2 Failed</h3>')
t2Failed.set_upstream([t2])

t3Failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="t3Failed",
    to=["[email protected]"],
    subject="T3 Failed",
    html_content='<h3>T3 Failed</h3>')
t3Failed.set_upstream([t3])
Trigger Rules
Though the normal workflow behavior is to trigger tasks when all their directly upstream tasks have succeeded, Airflow allows for more complex dependency settings.
All operators have a trigger_rule argument which defines the rule by which the generated task gets triggered. The default value for trigger_rule is all_success and can be described as "trigger this task when all directly upstream tasks have succeeded". All other rules described here are based on direct parent tasks and are values that can be passed to any operator while creating tasks:
all_success: (default) all parents have succeeded
all_failed: all parents are in a failed or upstream_failed state
all_done: all parents are done with their execution
one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
dummy: dependencies are just for show, trigger at will
Reference: https://airflow.apache.org/concepts.html
Upvotes: 7
Reputation: 2682
I'm using on_failure_callback for this. Please note that it will get triggered for every failed task in a DAG.
import logging

from airflow import DAG
from airflow.operators.email_operator import EmailOperator

def report_failure(context):
    # include this check if you only want to get one email per DAG
    task_instance = context['task_instance']
    dag_id = task_instance.dag_id
    if task_instance.xcom_pull(task_ids=None, dag_id=dag_id, key=dag_id) == True:
        logging.info("Other failing task has been notified.")
        return
    send_email = EmailOperator(...)
    send_email.execute(context)

dag = DAG(
    ...,
    default_args={
        ...,
        "on_failure_callback": report_failure
    }
)
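As written, nothing ever pushes the XCom that the guard above pulls, so the "one email per DAG" check can never skip anything. A minimal sketch of one way to complete the pattern, pushing a marker from the callback right after the first alert goes out; the task_id, recipient and message text here are made-up placeholders:

import logging

from airflow.operators.email_operator import EmailOperator

def report_failure(context):
    task_instance = context['task_instance']
    dag_id = task_instance.dag_id
    # skip if another failed task in this DAG run has already sent the alert
    if task_instance.xcom_pull(task_ids=None, dag_id=dag_id, key=dag_id):
        logging.info("Other failing task has been notified.")
        return
    send_email = EmailOperator(
        task_id="notify_failure",  # placeholder, never scheduled itself
        to=["[email protected]"],
        subject="Failure in DAG {}: task {}".format(dag_id, task_instance.task_id),
        html_content="<h3>Task {} failed</h3>".format(task_instance.task_id))
    send_email.execute(context)
    # mark this DAG run as notified so later failures stay quiet
    task_instance.xcom_push(key=dag_id, value=True)

Since xcom_pull defaults to the current execution date, each DAG run sends at most one such email.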
Upvotes: 13