Reputation: 2876
I'm trying to get BranchPythonOperator working but I have the following error:
'BigQueryInsertJobOperator' object is not iterable
Here is my Branch Operator:
branching = BranchPythonOperator(
task_id='branching',
python_callable=return_branch,
provide_context=True)
Here is my Python Callable:
def return_branch(ds, **kwargs):
execution_year = kwargs['execution_date'].strftime("%Y")
type = dataset_metadata[f'{execution_year}']['var']
if type == 'foo':
return x
return y
x and y are BigQueryInsertJobOperator:
x = BigQueryInsertJobOperator(
task_id='x',
configuration={
"query": {
"query": "{% include 'q.sql' %}",
"use_legacy_sql": False
}
},
dag=dag)
Upvotes: 1
Views: 5045
Reputation: 589
I'd like to refer to this answer. Your method, return_branch
, shouldn't return the operator. It must return the task_id
of your operator. You'll get something like this:
def return_branch(ds, **kwargs):
next_task_id = "a" # <some kind of logic>
return next_task_id
branching = BranchPythonOperator(
task_id="pick_query",
python_callable=return_branch,
provide_context=True,
)
option_1 = DummyOperator(task_id="a")
option_2 = DummyOperator(task_id="b")
branching >> [option_1, option_2]
Upvotes: 2