Reputation: 168
I'm trying to use xcom_pull to insert a date key parameter calculated by a PythonOperator and pass it to a BigQueryOperator. The Python operator returns its output as a string, e.g. "2020-05-31".
I get an error when the BigQueryOperator runs: "Dependencies Blocking Task From Getting Scheduled" - Could not cast literal "{xcom_pull(task_ids[\'set_date_key_param\'])[0] }"
The rendered value of the sql attribute, as shown in the Airflow UI after the task ran:
SELECT DATE_KEY, count(*) as COUNT
FROM my-project.my_datasets.source_table
WHERE DATE_KEY = {{ task_instance.xcom_pull(task_ids='set_date_key_param') }}
GROUP BY DATE_KEY
Code below (I have already tried enclosing the task_instance.xcom... call in '{{' and '}}'):
def set_date_key_param():
    # business logic here
    return "2020-05-31"  # example result

# task 1
set_date_key_param = PythonOperator(
    task_id='set_date_key_param',
    provide_context=True,
    python_callable=set_date_key_param,
    dag=dag
)

# task 2
load_data_to_bq_table = BigQueryOperator(
    task_id='load_data_to_bq_table',
    sql="""SELECT DATE_KEY, count(*) as COUNT
FROM `{project}.{dataset}.source_table`
WHERE DATE_KEY = {{{{ task_instance.xcom_pull(task_ids='set_date_key_param') }}}}
GROUP BY DATE_KEY""".format(
        project=PROJECT_ID,
        env=ENV
    ),
    use_legacy_sql=False,
    destination_dataset_table="{project}.{dataset}.target_table".format(
        project=PROJECT_ID,
        dataset=BQ_TARGET_DATASET,
    ),
    write_disposition="WRITE_TRUNCATE",
    create_disposition="CREATE_NEVER",
    trigger_rule='all_success',
    dag=dag
)

set_date_key_param >> load_data_to_bq_table
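A quick plain-Python check of what str.format does to the sql string above, before Airflow ever sees it. This is a sketch: PROJECT_ID, ENV and the dataset name "my_datasets" are placeholder values assumed for illustration (the dataset name is taken from the rendered SQL in the question). It shows two things: .format() raises KeyError when {dataset} is not supplied (extra keyword arguments such as env are silently ignored), and each doubled brace collapses to one, so {{{{ ... }}}} becomes the Jinja expression {{ ... }} with no quotes around it.

```python
# Placeholder values; the real DAG defines its own PROJECT_ID and ENV.
PROJECT_ID = "my-project"
ENV = "dev"

template = """SELECT DATE_KEY, count(*) as COUNT
FROM `{project}.{dataset}.source_table`
WHERE DATE_KEY = {{{{ task_instance.xcom_pull(task_ids='set_date_key_param') }}}}
GROUP BY DATE_KEY"""

# 1) As in the question: env is passed, but the string asks for {dataset}.
#    str.format ignores unused keyword arguments and raises KeyError for missing ones.
try:
    template.format(project=PROJECT_ID, env=ENV)
    missing_key = None
except KeyError as exc:
    missing_key = exc.args[0]

# 2) With every placeholder supplied ("my_datasets" is an assumed name),
#    {{{{ ... }}}} collapses to the Jinja expression {{ ... }}, unquoted.
sql = template.format(project=PROJECT_ID, dataset="my_datasets")
where_line = sql.splitlines()[2]

print(missing_key)
print(where_line)
```

Because the pulled value is inserted without quotes, even a correctly rendered query would contain WHERE DATE_KEY = 2020-05-31, which BigQuery reads as integer arithmetic rather than a date string.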
Upvotes: 1
Views: 4804
Reputation: 9308
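I think the string formatting and the Jinja template are conflicting with each other.
Since your use case relies on XCom, it makes sense to do all the substitution with the Jinja template instead of str.format, passing the project and dataset through params, and to put quotes around the pulled date so BigQuery treats it as a string literal.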
load_data_to_bq_table = BigQueryOperator(
    task_id='load_data_to_bq_table',
    sql="""SELECT DATE_KEY, count(*) as COUNT
FROM `{{ params.project }}.{{ params.dataset }}.source_table`
WHERE DATE_KEY = \"{{ task_instance.xcom_pull(task_ids='set_date_key_param') }}\"
GROUP BY DATE_KEY""",
    params={
        'project': PROJECT_ID,
        'env': ENV  # env or dataset? The key must match the name used in the SQL (params.dataset)
    },
    # remaining arguments (use_legacy_sql=False, destination_dataset_table, dag, ...) as in the question
)
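To see what a template like this renders to, it can be fed to Jinja directly. This is a minimal sketch, assuming the jinja2 package (the templating library Airflow itself uses) is installed. FakeTaskInstance is a hypothetical stand-in for Airflow's TaskInstance, the project and dataset names are placeholders, and the params key is named dataset here so that it matches {{ params.dataset }}. In a real DAG, Airflow builds this context itself when it renders the task.

```python
from jinja2 import Template

SQL = """SELECT DATE_KEY, count(*) as COUNT
FROM `{{ params.project }}.{{ params.dataset }}.source_table`
WHERE DATE_KEY = "{{ task_instance.xcom_pull(task_ids='set_date_key_param') }}"
GROUP BY DATE_KEY"""


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance: returns canned XCom values."""

    def __init__(self, xcom_values):
        self._xcom_values = xcom_values

    def xcom_pull(self, task_ids):
        return self._xcom_values[task_ids]


# Jinja resolves params.project on a dict by falling back to item lookup.
rendered = Template(SQL).render(
    params={"project": "my-project", "dataset": "my_datasets"},  # placeholder names
    task_instance=FakeTaskInstance({"set_date_key_param": "2020-05-31"}),
)
print(rendered)
```

The WHERE clause comes out as WHERE DATE_KEY = "2020-05-31", a quoted literal that BigQuery can compare against a DATE column.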
Upvotes: 3
Reputation: 7815
You gave the Python callable and the variable holding the first PythonOperator the same name: set_date_key_param. Rename the Python callable (e.g. set_date) and update the python_callable argument of the PythonOperator accordingly.
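The naming clash can be reproduced in plain Python. In this sketch FakeOperator is a hypothetical stand-in for PythonOperator that only stores its arguments. After the assignment, the name set_date_key_param refers to the operator, and the original function is reachable only through the operator's python_callable attribute; renaming the callable keeps both objects reachable under their own names.

```python
class FakeOperator:
    """Hypothetical stand-in for PythonOperator: it only stores its arguments."""

    def __init__(self, task_id, python_callable):
        self.task_id = task_id
        self.python_callable = python_callable


# Original pattern: the callable and the operator variable share one name.
def set_date_key_param():
    return "2020-05-31"  # example result


# The right-hand side is evaluated first, so the operator still receives the
# function; the assignment then rebinds the name to the operator.
set_date_key_param = FakeOperator("set_date_key_param", set_date_key_param)
shadowed = set_date_key_param


# Suggested fix: give the callable its own name.
def set_date():
    return "2020-05-31"  # example result


set_date_key_param = FakeOperator("set_date_key_param", set_date)

print(type(shadowed).__name__, callable(shadowed))
print(set_date_key_param.python_callable is set_date)
```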
Upvotes: 0