Asked by oikonomiyaki
I have an Airflow DAG that branches on whether or not to send an email. `will_send_email_task` is a `BranchPythonOperator`: if `len(message) > 0`, it should go to the branch task `send_email_notification_task`. Otherwise, that task is skipped and the flow goes straight to the `DummyOperator` `join_task`. The DAG works OK when the result of the branch is `True` (yes, it should send an email). However, the rest of the DAG is skipped when the result is `False`, which is not what I expect. When `will_send_email_task` is `False`, only `send_email_notification_task` should be skipped/bypassed, and the rest of the flow should continue as normal.
Here is the Airflow DAG snippet:
```python
import logging

# Operator imports, `dag`, `default_args` and `some_dict` are defined elsewhere in the DAG file (not shown).

# this function determines whether to send an email or not
def will_send_email(push_task, **context):
    message = context["task_instance"].xcom_pull(task_ids=push_task)

    if len(message) > 0:
        logging.info(f"email body: {message}")
        context["task_instance"].xcom_push(key="message", value=message)
        return 'send_email_notification_task'
    else:
        return 'join_task'

def some_python_callable(table_name, **context):
    ...

will_send_email_task = BranchPythonOperator(
    task_id='will_send_email_task',
    provide_context=True,
    python_callable=will_send_email,
    op_kwargs={'push_task': 'some_previous_task'},
    dag=dag
)

join_task = DummyOperator(
    task_id='join_task',
    dag=dag
)

send_email_notification_task = EmailOperator(
    task_id='send_email_notification_task',
    to=default_args['email'],
    subject="some email subject",
    html_content="{{ task_instance.xcom_pull(key='message') }}",
    dag=dag
)

end_task = DummyOperator(
    task_id='end_task',
    dag=dag
)

...

for table, val in some_dict.items():

    offload_task = PythonOperator(
        task_id=f"offload_{table}_task",
        dag=dag,
        provide_context=True,
        python_callable=some_python_callable,
        op_kwargs={'table_name': table}
    )

    offload_task.set_upstream(join_task)
    offload_task.set_downstream(end_task)
```
How should I configure my DAG so it would still run as expected?
You will need to set `trigger_rule='none_failed_min_one_success'` for the `join_task`:
```python
join_task = DummyOperator(
    task_id='join_task',
    dag=dag,
    trigger_rule='none_failed_min_one_success'
)
```
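This use case is explained in the trigger rules docs.
The default trigger rule is `all_success`, but in your case `send_email_notification_task`, one of the upstream tasks of `join_task`, is skipped whenever the branch returns `join_task`. With `all_success`, a skipped upstream task causes `join_task` to be skipped too, and that skip cascades to every task downstream of it, so you cannot use the default trigger rule.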
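To make the skip propagation concrete, here is a small pure-Python model of the question's DAG; it does not use Airflow at all. The wiring (`will_send_email_task` feeding both `send_email_notification_task` and `join_task`, and the email task feeding `join_task`) is an assumption, since the snippet elides the dependency lines, and every task that is allowed to run is assumed to succeed. `resolve` and `simulate` are hypothetical helpers that only approximate Airflow's trigger-rule evaluation for the rules mentioned in this answer.

```python
from typing import Dict, List

# A few of Airflow's task states (this sketch ignores all the others).
SUCCESS, SKIPPED, FAILED, UPSTREAM_FAILED = "success", "skipped", "failed", "upstream_failed"


def resolve(trigger_rule: str, upstream_states: List[str]) -> str:
    """Simplified trigger-rule check, applied once every upstream task has finished.

    Returns "run", SKIPPED or UPSTREAM_FAILED. Only the rules discussed in this
    answer are modelled; this is not Airflow's real implementation.
    """
    if not upstream_states:
        return "run"  # trigger rules only look at upstream tasks
    if trigger_rule == "all_done":
        return "run"  # any finished state counts, including failures
    if any(s in (FAILED, UPSTREAM_FAILED) for s in upstream_states):
        return UPSTREAM_FAILED
    if trigger_rule == "all_success":
        # A single skipped parent is enough to skip this task.
        return "run" if all(s == SUCCESS for s in upstream_states) else SKIPPED
    if trigger_rule in ("none_failed_min_one_success", "none_failed_or_skipped"):
        # Skipped parents are tolerated as long as at least one parent succeeded.
        return "run" if SUCCESS in upstream_states else SKIPPED
    raise ValueError(f"trigger rule not modelled here: {trigger_rule}")


def simulate(message: str, join_rule: str, tables: List[str]) -> Dict[str, str]:
    """Propagate task states through the question's DAG for one branch decision."""
    offloads = [f"offload_{t}_task" for t in tables]
    # Assumed wiring: the question does not show how the branch tasks are connected.
    upstream: Dict[str, List[str]] = {
        "send_email_notification_task": ["will_send_email_task"],
        "join_task": ["will_send_email_task", "send_email_notification_task"],
        **{task: ["join_task"] for task in offloads},
        "end_task": offloads,
    }
    # Same decision as will_send_email(); the branch task itself succeeds.
    chosen = "send_email_notification_task" if len(message) > 0 else "join_task"
    states = {"will_send_email_task": SUCCESS}
    for task, parents in upstream.items():  # listed in dependency order
        if task == "send_email_notification_task" and task != chosen:
            states[task] = SKIPPED  # skipped directly by the branch operator
            continue
        # Only join_task gets a custom rule; all_success is Airflow's default.
        rule = join_rule if task == "join_task" else "all_success"
        verdict = resolve(rule, [states[p] for p in parents])
        states[task] = SUCCESS if verdict == "run" else verdict
    return states


if __name__ == "__main__":
    for rule in ("all_success", "none_failed_min_one_success"):
        print(rule, simulate("", rule, ["a", "b"]))
```

With an empty message, `simulate("", "all_success", ["a", "b"])` marks `join_task`, both offload tasks and `end_task` as skipped, which is the behaviour described in the question; switching `join_rule` to `'none_failed_min_one_success'` lets all of them succeed. The model also shows how `all_done` differs: `resolve("all_done", ["success", "failed"])` returns `"run"`, whereas the `none_failed*` rules return `"upstream_failed"` for the same input.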
Note: For Airflow < 2.2, use `trigger_rule='none_failed_or_skipped'`. The trigger rule was simply renamed in Airflow 2.2 because its name was confusing (see PR). You can also use `trigger_rule='all_done'`, although `all_done` will also run `join_task` when an upstream task has failed.
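If one DAG file has to load on Airflow versions on both sides of 2.2, the rule name could be picked when the file is parsed, following the version split in the note above. A minimal sketch, where `join_trigger_rule` is a hypothetical helper rather than an Airflow API:

```python
def join_trigger_rule(airflow_version: str) -> str:
    """Hypothetical helper: pick the trigger-rule name for join_task by Airflow version."""
    # Compare only major.minor, so suffixes such as "2.2.0rc1" are ignored.
    major, minor = (int(part) for part in airflow_version.split(".")[:2])
    # The rule was renamed in Airflow 2.2; older releases only know the old name.
    if (major, minor) >= (2, 2):
        return "none_failed_min_one_success"
    return "none_failed_or_skipped"
```

In the DAG file, the version string could come from `airflow.__version__`, e.g. `trigger_rule=join_trigger_rule(airflow.__version__)` on `join_task`.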