Shadow Walker
Shadow Walker

Reputation: 1199

How to send multiple emails using EmailOperator in Airflow

I'm trying to send individual emails to a list of users using an EmailOperator inside a python loop but currently the emails are not being sent. The dag doesn't return any error which makes me believe the EmailOperator is not being called.

Code

@dag(schedule_interval=None, tags=['Send multiple emails testing'], default_args=default_args)
def multiple_email_send_test_dag():

    @task   
    def multiple_email_send_test():
        email_list = ['[email protected]', '[email protected]']
        for i in range(len(email_list)):
            EmailOperator(
                task_id=f'send_success_email_test_no_{i}', 
                to=str(email_list[i]),
                subject='Email Header',
                html_content= f"""
                        Hi {email_list[i]}, <br>
                        <p>This is the body of the email</p>
                        <br> Thank You. <br>
                    """
            ) 
    
    # Dummy Operators
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    # The pipeline
    start >> multiple_email_send_test() >> end

dag = multiple_email_send_test_dag()

What I'm I missing?

Upvotes: 2

Views: 4660

Answers (1)

Elad Kalif
Elad Kalif

Reputation: 15961

This is a case of calling operator inside operator. The EmailOperator isn't executing - all it does is just initialize the constructor of the class. The logic of actually sending the email is in the execute() function of the class.


DON'T DO THIS - BAD PRACTICE:

Let me first clarify that if you want to make your current code work you must call the execute() See this answer for more information about it.

@task   
def multiple_email_send_test():
    email_list = ['[email protected]', '[email protected]']
    for i in range(len(email_list)):
        e = EmailOperator(
            task_id=f'send_success_email_test_no_{i}', 
            to=str(email_list[i]),
            subject='Email Header',
            html_content= f"""
                    Hi {email_list[i]}, <br>
                    <p>This is the body of the email</p>
                    <br> Thank You. <br>
                """
        ) 
        e.exectue(dict())

How you should solve it:

Option 1: Since EmailOperator sends 1 email to all addresses and it's different behavior than what you wish. You can just create a custom operator IndvidualEmailOpeator that accept a list of emails and send to each address individual mail. By doing so you won't need to wrap the operator with PythonOperator / task decorator thus you can just use the operator you created directly.

Option 2:

Note that EmailOperator is simply calling send_email, so you can just use the function directly without the operator thus avoiding the issue of operator inside operator:

@task   
def multiple_email_send_test():
    from airflow.utils.email import send_email
    email_list = ['[email protected]', '[email protected]']
    for i in range(len(email_list)):
        send_email(
            to=str(email_list[i]),
            subject='Email Header',
            html_content= f"""
                    Hi {email_list[i]}, <br>
                    <p>This is the body of the email</p>
                    <br> Thank You. <br>
                """
        ) 

Upvotes: 5

Related Questions