Reputation: 4666
I created a BranchPythonOperator that runs one of two tasks depending on a condition, like this:
# Airflow 1.10-style import path (provide_context is an Airflow 1.x argument)
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator

typicon_check_table = BranchPythonOperator(
    task_id='typicon_check_table',
    python_callable=CheckTable(),
    provide_context=True,
    dag=typicon_task_dag)
typicon_create_table = PythonOperator(
    task_id='typicon_create_table',
    python_callable=CreateTable(),
    provide_context=True,
    dag=typicon_task_dag)
typicon_load_data = PythonOperator(
    task_id='typicon_load_data',
    python_callable=LoadData(),
    provide_context=True,
    dag=typicon_task_dag)
typicon_check_table.set_downstream([typicon_load_data, typicon_create_table])
typicon_create_table.set_downstream(typicon_load_data)
This is the CheckTable callable class:
from airflow.hooks.postgres_hook import PostgresHook


class CheckTable:
    """
    DAG task that checks whether the table exists.
    """
    def __call__(self, **kwargs) -> str:
        pg_hook = PostgresHook(postgres_conn_id="postgres_docker")
        query = """
            SELECT EXISTS (
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = 'public'
                AND table_name = 'users');
        """
        table_exists = pg_hook.get_records(query)[0][0]
        # Return the task_id of the branch to follow
        if table_exists:
            return "typicon_load_data"
        return "typicon_create_table"
The issue is that both tasks are skipped when the typicon_check_table task runs.
How do I fix this?
Upvotes: 3
Views: 2767
Reputation: 31
I have worked through the same scenario, and it works fine for me with the code below:
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator


class DAGConditionalValidation:
    def __init__(self, conditional_param_key):
        self.conditional_param_key = conditional_param_key

    def __call__(self, **kwargs):
        # Return the task_id (a string) of the branch to follow
        if self.conditional_param_key == 'Y':
            return 'slot_population_on_is_y'
        return 'slot_population_on_is_n'


slot_population_on_is_y_or_n = BranchPythonOperator(
    task_id='slot_population_on_is_y_or_n',
    python_callable=DAGConditionalValidation('Y'),
    trigger_rule='one_success')
slot_population_on_is_y = DummyOperator(task_id='slot_population_on_is_y')
slot_population_on_is_n = DummyOperator(task_id='slot_population_on_is_n')
slot_population_on_is_y_or_n >> [slot_population_on_is_y, slot_population_on_is_n]
All of your code looks fine, but you're missing the trigger rule: set trigger_rule='one_success'. This should work for you as well.
Upvotes: 3
Reputation: 1382
Add trigger_rule="all_done" to typicon_check_table, as below:
typicon_check_table = BranchPythonOperator(
    task_id='typicon_check_table',
    python_callable=CheckTable(),
    provide_context=True,
    trigger_rule="all_done",
    dag=typicon_task_dag)
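To make the trigger rules mentioned in this thread concrete, here is a simplified pure-Python model of how all_success (Airflow's default), one_success and all_done judge the final states of a task's parents. It is a sketch under assumptions, not Airflow's own scheduler logic: the function trigger_rule_met is hypothetical, and it ignores timing details such as one_success firing before every parent has finished.

```python
# Simplified model of three Airflow trigger rules, judged on the final
# states of a task's parents. Hypothetical helper, not Airflow's implementation.
from typing import List

DONE_STATES = {"success", "failed", "upstream_failed", "skipped"}


def trigger_rule_met(rule: str, upstream_states: List[str]) -> bool:
    """Return True if a task with `rule` may run, given its parents' states."""
    if not upstream_states:
        # A task with no parents has nothing to wait for.
        return True
    if rule == "all_success":  # Airflow's default rule
        return all(s == "success" for s in upstream_states)
    if rule == "one_success":
        return any(s == "success" for s in upstream_states)
    if rule == "all_done":
        return all(s in DONE_STATES for s in upstream_states)
    raise ValueError(f"rule not modelled here: {rule}")


# Parents of typicon_load_data when typicon_create_table was skipped:
parents = ["success", "skipped"]
for rule in ("all_success", "one_success", "all_done"):
    print(rule, trigger_rule_met(rule, parents))
# all_success False
# one_success True
# all_done True
```

With one parent succeeded and one skipped, only all_success keeps the task from running in this model.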
Upvotes: 0
Reputation: 2364
The task typicon_load_data has typicon_create_table as a parent, and the default trigger_rule is all_success, so I am not surprised by this behaviour.
Two cases are possible here:
1. CheckTable() returns typicon_load_data: typicon_create_table is skipped, and typicon_load_data, being downstream of it, is also skipped.
2. CheckTable() returns typicon_create_table: that task is executed and triggers typicon_load_data, which is skipped because it was the excluded branch.
I assume your screenshot is from case 1?
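The two cases can be replayed with a small pure-Python model. This is a simplified sketch, not Airflow's scheduler: it assumes the branch operator marks each unchosen direct child as skipped, and that a task under the default all_success rule is skipped whenever any of its parents was skipped. The names simulate, UPSTREAM, BRANCH_CHILDREN and TOPOLOGICAL_ORDER are hypothetical.

```python
# Toy model of the question's DAG (not Airflow code):
#   typicon_check_table -> typicon_create_table -> typicon_load_data
#   typicon_check_table -> typicon_load_data
from typing import Dict

UPSTREAM = {
    "typicon_check_table": [],
    "typicon_create_table": ["typicon_check_table"],
    "typicon_load_data": ["typicon_check_table", "typicon_create_table"],
}
BRANCH_CHILDREN = ["typicon_load_data", "typicon_create_table"]
TOPOLOGICAL_ORDER = ["typicon_check_table", "typicon_create_table", "typicon_load_data"]


def simulate(chosen_branch: str) -> Dict[str, str]:
    """Final state of every task when CheckTable() returns chosen_branch."""
    state = {"typicon_check_table": "success"}
    # The branch operator skips each direct child it did not choose.
    for child in BRANCH_CHILDREN:
        if child != chosen_branch:
            state[child] = "skipped"
    for task in TOPOLOGICAL_ORDER:
        if task in state:
            continue
        parents = [state[p] for p in UPSTREAM[task]]
        # Default all_success: run only if every parent succeeded,
        # otherwise the skip propagates to this task.
        state[task] = "success" if all(p == "success" for p in parents) else "skipped"
    return state


print(simulate("typicon_load_data"))     # case 1
print(simulate("typicon_create_table"))  # case 2
```

In case 1 both typicon_create_table and typicon_load_data end up skipped, which matches the symptom in the question; in case 2 typicon_create_table succeeds while typicon_load_data is skipped.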
Upvotes: 1