oikonomiyaki

Reputation: 7951

The rest of the tasks should not be skipped if the BranchPythonOperator result is False

I have an Airflow DAG that branches on whether to send an email. will_send_email_task is a BranchPythonOperator: if len(message) > 0, it should follow the branch to send_email_notification_task; otherwise that task should be skipped and the flow should go straight to the DummyOperator join_task. The DAG works fine when the branch result is True (yes, it should send an email). However, when the result is False, the rest of the DAG is skipped, which is not what I expected. When will_send_email_task is False, only send_email_notification_task should be skipped/bypassed, and the rest of the flow should continue as normal.


Here is the Airflow DAG snippet:


# this function determines whether to send an email or not
def will_send_email(push_task, **context):
    message = context["task_instance"].xcom_pull(task_ids=push_task)

    if len(message) > 0:
        logging.info(f"email body: {message}")
        context["task_instance"].xcom_push(key="message", value=message)
        return 'send_email_notification_task'
    else:
        return 'join_task'
        
def some_python_callable(table_name, **context):
    ...

will_send_email_task = BranchPythonOperator(
    task_id='will_send_email_task',
    provide_context=True,
    python_callable=will_send_email,
    op_kwargs={'push_task': 'some_previous_task'},
    dag=dag
)


join_task = DummyOperator(
    task_id='join_task',
    dag=dag
)

send_email_notification_task = EmailOperator(
    task_id='send_email_notification_task',
    to=default_args['email'],
    subject="some email subject",
    html_content="{{ task_instance.xcom_pull(key='message') }}",
    dag=dag
)

end_task = DummyOperator(
    task_id='end_task',
    dag=dag
)

...

for table, val in some_dict.items():

    offload_task = PythonOperator(
        task_id=f"offload_{table}_task",
        dag=dag,
        provide_context=True,
        python_callable=some_python_callable,
        op_kwargs={'table_name': table}
    )
    
    offload_task.set_upstream(join_task)
    offload_task.set_downstream(end_task)

How should I configure my DAG so it would still run as expected?

Upvotes: 0

Views: 730

Answers (1)

Elad Kalif

Reputation: 16109

You will need to set trigger_rule='none_failed_min_one_success' for the join_task:

join_task = DummyOperator(
    task_id='join_task',
    dag=dag,
    trigger_rule='none_failed_min_one_success'
)

This use case is explained in the trigger rules docs. The default trigger rule is all_success, but in your case one of the upstream tasks of join_task (send_email_notification_task) is skipped whenever the branch goes straight to join_task, and that skip propagates downstream, so you cannot use the default trigger rule.
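To make the difference between the rules concrete, here is a rough sketch in plain Python. It is a simplified model of how a trigger rule turns upstream states into a decision for join_task, not Airflow's actual implementation; the function name resolve_state and the three state strings are made up for illustration.

```python
# Simplified model of trigger rule evaluation. NOT Airflow's real code:
# resolve_state and the plain-string states are hypothetical.

def resolve_state(trigger_rule, upstream_states):
    """Return 'run', 'skipped', or 'upstream_failed' for a downstream task.

    upstream_states is a list containing 'success', 'skipped', or 'failed'.
    """
    any_failed = any(s == "failed" for s in upstream_states)
    any_skipped = any(s == "skipped" for s in upstream_states)
    any_success = any(s == "success" for s in upstream_states)

    if trigger_rule == "all_done":
        # Runs once every upstream task has finished, whatever its state.
        return "run"
    if trigger_rule == "all_success":
        if any_failed:
            return "upstream_failed"
        if any_skipped:
            # A single skipped upstream task causes this task to be skipped.
            return "skipped"
        return "run"
    if trigger_rule == "none_failed_min_one_success":
        if any_failed:
            return "upstream_failed"
        if not any_success:
            return "skipped"
        return "run"
    raise ValueError(f"rule not covered by this sketch: {trigger_rule}")


# Branch returned 'join_task': the branch task itself succeeded,
# while send_email_notification_task was skipped.
false_branch = ["success", "skipped"]
print(resolve_state("all_success", false_branch))
print(resolve_state("none_failed_min_one_success", false_branch))
```

With the default rule the skipped email task drags join_task (and everything after it) into the skipped state, while none_failed_min_one_success lets join_task run because the branch task succeeded and nothing failed.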

Note: For Airflow < 2.2, use trigger_rule='none_failed_or_skipped'. The trigger rule was renamed in later versions because its name was confusing (see PR). You can also use trigger_rule='all_done', but note that join_task will then run even if an upstream task fails.
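If the same DAG file has to work on both sides of the rename, one option is to pick the rule name from the version string. This is only a sketch: pick_join_trigger_rule is a hypothetical helper, and it assumes the version string starts with numeric major and minor parts (for example the value of airflow.__version__).

```python
# Hypothetical helper: choose the trigger rule name for join_task
# based on an Airflow version string such as "2.1.4" or "2.3.0".

def pick_join_trigger_rule(airflow_version):
    # Only the major and minor components matter for the rename.
    major, minor = (int(part) for part in airflow_version.split(".")[:2])
    if (major, minor) >= (2, 2):
        return "none_failed_min_one_success"
    # Older releases know the same rule under its previous name.
    return "none_failed_or_skipped"


print(pick_join_trigger_rule("2.1.4"))
print(pick_join_trigger_rule("2.3.0"))
```

The returned string can then be passed as trigger_rule when creating join_task.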

Upvotes: 2
