Tomasz Kubat

Reputation: 168

Airflow - xcom_pull in the bigquery operator

I am trying to use xcom_pull to take a date key parameter calculated by a PythonOperator and pass it to a BigQueryOperator. The Python operator returns its output as a string, e.g. "2020-05-31".

I get an error when running the BigQueryOperator: "Dependencies Blocking Task From Getting Scheduled" - Could not cast literal "{xcom_pull(task_ids[\'set_date_key_param\'])[0] }"

The sql attribute value returned from the Airflow GUI after task execution:
SELECT DATE_KEY, count(*) as COUNT
FROM my-project.my_datasets.source_table
WHERE DATE_KEY = {{ task_instance.xcom_pull(task_ids='set_date_key_param') }}
GROUP BY DATE_KEY

Code below (I have already tried using '{{' and '}}' to enclose the task_instance.xcom... expression):

def set_date_key_param():

    # a business logic here
    return "2020-05-31" # example results

# task 1

set_date_key_param = PythonOperator(
    task_id='set_date_key_param',
    provide_context=True,
    python_callable=set_date_key_param,
    dag=dag
)

# task 2

load_data_to_bq_table = BigQueryOperator(
    task_id='load_data_to_bq_table',
    sql="""SELECT DATE_KEY, count(*) as COUNT
    FROM `{project}.{dataset}.source_table` 
    WHERE DATE_KEY = {{{{ task_instance.xcom_pull(task_ids='set_date_key_param') }}}}
    GROUP BY DATE_KEY""".format(
        project=PROJECT_ID,
        env=ENV
    ),
    use_legacy_sql=False,
    destination_dataset_table="{project}.{dataset}.target_table".format(
        project=PROJECT_ID,
        dataset=BQ_TARGET_DATASET,
    ),
    write_disposition="WRITE_TRUNCATE",
    create_disposition="CREATE_NEVER",
    trigger_rule='all_success',
    dag=dag
)

set_date_key_param >> load_data_to_bq_table

Upvotes: 1

Views: 4804

Answers (2)

Emma

Reputation: 9308

I think the string formatting and the Jinja template are conflicting with each other.

Since your use case relies on XCom, I think it makes sense to use Jinja templating throughout:

load_data_to_bq_table = BigQueryOperator(
    task_id='load_data_to_bq_table',
    sql="""SELECT DATE_KEY, count(*) as COUNT
        FROM `{{ params.project }}.{{ params.dataset }}.source_table` 
        WHERE DATE_KEY = \"{{ task_instance.xcom_pull(task_ids='set_date_key_param') }}\"
        GROUP BY DATE_KEY""",
    params={
        'project': PROJECT_ID,
        'env': ENV   # env or dataset? This key must match the params name used in the sql
    }
)
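
To see how the two templating layers interact, here is a minimal plain-Python sketch (no Airflow needed). The values `my-project`, `my_datasets`, and `dev` are hypothetical stand-ins for PROJECT_ID, the dataset, and ENV, and the single quotes around the Jinja expression are my addition so that the rendered date becomes a string literal. It only illustrates what str.format does to the braces and to a mismatched keyword; it does not run the query.

```python
# Hypothetical stand-ins for the question's PROJECT_ID and dataset values.
PROJECT_ID = "my-project"
DATASET = "my_datasets"

template = (
    "SELECT DATE_KEY FROM `{project}.{dataset}.source_table` "
    "WHERE DATE_KEY = '{{{{ task_instance.xcom_pull(task_ids='set_date_key_param') }}}}'"
)

# str.format fills {project} and {dataset} and collapses each doubled brace
# pair into a single brace, so "{{{{" becomes "{{" and is left for Jinja.
formatted = template.format(project=PROJECT_ID, dataset=DATASET)
print(formatted)

# Passing a keyword that does not match a placeholder (env instead of dataset)
# raises KeyError in Python, before Airflow ever sees the string.
try:
    template.format(project=PROJECT_ID, env="dev")
    missing_key = None
except KeyError as exc:
    missing_key = exc.args[0]
print(missing_key)
```

If the string is built this way, the placeholder names and the keyword arguments have to match exactly. Moving everything into Jinja via params, as above, avoids the brace doubling altogether.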

Upvotes: 3

SergiyKolesnikov

Reputation: 7815

You gave the Python callable and the variable that holds the first Python operator the same name: set_date_key_param. Rename the Python callable (e.g. to set_date) and update the python_callable argument of the Python operator accordingly.

Upvotes: 0
