Reputation:
I wish to send an email with information about the output of my Airflow DAG on success. My two approaches were: first, to execute a function from the main DAG (which I couldn't do, as it seems the DAG is unable to access the outputs of what it runs, which I find logical), and second, more promising, to configure it to send the log of the DAG run. I created it following this idea in Airflow, but while it sends emails, they're blank. I tried finding the log and creating a list to pass as a message, as follows:
import glob

def task_success_callback(context):
    outer_task_success_callback(context, email='[email protected]')

def outer_task_success_callback(context, email):
    # collect the lines of the .log files found under the Airflow home directory
    lines = []
    for file in glob.glob("AIRFLOW_HOME/*.log"):
        with open(file) as f:
            lines = [line for line in f.readlines()]
    print(lines)
    mensaje = lines

    # build subject and body from the task instance key (dag_id__task_id__...)
    subject = "[Airflow] DAG {0} - Task {1}: Success".format(
        context['task_instance_key_str'].split('__')[0],
        context['task_instance_key_str'].split('__')[1]
    )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Log: {2}<br>
    """.format(
        context['task_instance_key_str'].split('__')[0],
        context['task_instance_key_str'].split('__')[1],
        mensaje
    )
It didn't seem to produce anything, and I did not even get an error. Even weirder, the Airflow log now no longer mentions the "email sent" event, which it used to do.
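For context, the callback is attached through the DAG's on_success_callback argument, roughly like the sketch below (the DAG id, schedule and dummy task are placeholders, not my real ones):

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

dag = DAG('my_dag',
          start_date=datetime(2020, 1, 1),
          schedule_interval='@daily',
          on_success_callback=task_success_callback)   # fires the callback above when the run succeeds

end = DummyOperator(task_id='end', dag=dag)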
Upvotes: 0
Views: 2057
Reputation: 1313
First: passing messages. To send mail containing DAG run information, you need two core Airflow features to support you: XCom, and the PythonOperator's provide_context argument.
This is a subtle but very important point: in general, if two operators need to share information, like a filename or small amount of data, you should consider combining them into a single operator. If it absolutely can’t be avoided, Airflow does have a feature for operator cross-communication called XCom that is described in the section XComs https://airflow.apache.org/docs/stable/concepts.html?highlight=xcom
provide_context – if set to True, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs corresponds exactly to what you can use in your Jinja templates. For this to work, you need to define **kwargs in your function header.
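A rough sketch of this approach (the task names generate_report and notify are made up for illustration, and this assumes Airflow 1.x where provide_context is still required):

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email
from datetime import datetime

def generate_report(**kwargs):
    result = "rows processed: 42"  # whatever output you want to mail
    # push the value to XCom so a downstream task can read it
    kwargs['ti'].xcom_push(key='report', value=result)

def notify(**kwargs):
    # pull the value pushed by the upstream task
    report = kwargs['ti'].xcom_pull(task_ids='generate_report', key='report')
    send_email(to='[email protected]',
               subject='[Airflow] DAG succeeded',
               html_content='Report: {}<br>'.format(report))

with DAG('example_dag', start_date=datetime(2020, 1, 1), schedule_interval=None) as dag:
    report_task = PythonOperator(task_id='generate_report',
                                 python_callable=generate_report,
                                 provide_context=True)
    notify_task = PythonOperator(task_id='notify',
                                 python_callable=notify,
                                 provide_context=True)
    report_task >> notify_task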
Second: run the email task after the upstream work succeeds. There are many ways to do this: one depends on a setting at the DAG level, another on a setting at the task level. Double-check which one matches your logic; see the sketch below.
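For example (a sketch reusing the send_email helper; the callback names are illustrative, and the exact keys available in the DAG-level callback context may vary by Airflow version):

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email
from datetime import datetime

def dag_success_alert(context):
    # DAG level: called once when the whole DAG run succeeds
    send_email(to='[email protected]',
               subject='[Airflow] DAG {} succeeded'.format(context['dag'].dag_id),
               html_content='Run id: {}<br>'.format(context['run_id']))

def task_success_alert(context):
    # Task level: called each time this particular task succeeds
    send_email(to='[email protected]',
               subject='[Airflow] Task {} succeeded'.format(context['task_instance'].task_id),
               html_content='Finished at: {}<br>'.format(context['execution_date']))

dag = DAG('callback_example',
          start_date=datetime(2020, 1, 1),
          schedule_interval=None,
          on_success_callback=dag_success_alert)           # DAG-level hook

task = PythonOperator(task_id='do_work',
                      python_callable=lambda: print('working'),
                      on_success_callback=task_success_alert,  # task-level hook
                      dag=dag)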
Upvotes: 1