Reputation: 1199
I'm trying to send individual emails to a list of users using an EmailOperator inside a python loop but currently the emails are not being sent. The dag doesn't return any error which makes me believe the EmailOperator is not being called.
Code
@dag(schedule_interval=None, tags=['Send multiple emails testing'], default_args=default_args)
def multiple_email_send_test_dag():
@task
def multiple_email_send_test():
email_list = ['[email protected]', '[email protected]']
for i in range(len(email_list)):
EmailOperator(
task_id=f'send_success_email_test_no_{i}',
to=str(email_list[i]),
subject='Email Header',
html_content= f"""
Hi {email_list[i]}, <br>
<p>This is the body of the email</p>
<br> Thank You. <br>
"""
)
# Dummy Operators
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
# The pipeline
start >> multiple_email_send_test() >> end
dag = multiple_email_send_test_dag()
What I'm I missing?
Upvotes: 2
Views: 4660
Reputation: 15961
This is a case of calling operator inside operator. The EmailOperator
isn't executing - all it does is just initialize the constructor of the class. The logic of actually sending the email is in the execute()
function of the class.
DON'T DO THIS - BAD PRACTICE:
Let me first clarify that if you want to make your current code work you must call the execute()
See this answer for more information about it.
@task
def multiple_email_send_test():
email_list = ['[email protected]', '[email protected]']
for i in range(len(email_list)):
e = EmailOperator(
task_id=f'send_success_email_test_no_{i}',
to=str(email_list[i]),
subject='Email Header',
html_content= f"""
Hi {email_list[i]}, <br>
<p>This is the body of the email</p>
<br> Thank You. <br>
"""
)
e.exectue(dict())
How you should solve it:
Option 1:
Since EmailOperator
sends 1 email to all addresses and it's different behavior than what you wish. You can just create a custom operator IndvidualEmailOpeator
that accept a list of emails and send to each address individual mail. By doing so you won't need to wrap the operator with PythonOperator
/ task decorator thus you can just use the operator you created directly.
Option 2:
Note that EmailOperator
is simply calling send_email, so you can just use the function directly without the operator thus avoiding the issue of operator inside operator:
@task
def multiple_email_send_test():
from airflow.utils.email import send_email
email_list = ['[email protected]', '[email protected]']
for i in range(len(email_list)):
send_email(
to=str(email_list[i]),
subject='Email Header',
html_content= f"""
Hi {email_list[i]}, <br>
<p>This is the body of the email</p>
<br> Thank You. <br>
"""
)
Upvotes: 5