Sandeep Mohanty
Sandeep Mohanty

Reputation: 1552

How to get the xcom pull as dictionary

I have a dag where i have pushed a dictionary to xcom and wnat to pull it in BigQuery Operator, i have also defined render_template_as_native_obj=True, but still it is giving error as

time partitioning argument must have a class type dict not class str error

code :

def load_job_func(**kwargs):
    table_name = kwargs['table_name']
    load_date = kwargs['load_date']
    Snapshot_Month_Date = kwargs['Snapshot_Month_Date']
    query = "SELECT 1"
    ti = kwargs['ti']
    client = bigquery.Client()
    partition_date = None
    partition_field = None

    if table_name == "printer_data":
        partition_date = load_date.replace("-", "")
        query = f"""SELECT
                    Instance,
                    cast(load_date as Date) as load_date
                    from {BQ_PROJECT}.{BQ_landing_dataset}.{table_name}"""
        partition_field = 'load_date'
    elif table_name == "monthly_print_report":
        partition_date = Snapshot_Month_Date.replace("-", "")
        query = f"""SELECT
                    Location,
                    Serial_Number,
                    cast(load_date as Date) as load_date,
                    cast(Snapshot_Month_Date as date) as Snapshot_Month_Date
                    from {BQ_PROJECT}.{BQ_landing_dataset}.{table_name}"""
        partition_field = 'Snapshot_Month_Date'

    ti.xcom_push(key='partition_date', value=partition_date)
    ti.xcom_push(key='partition_field', value={'type': 'DAY', 'field': partition_field})
    return query

load_job_config = PythonOperator(
    task_id=task_id + "_load_job_config",
    python_callable=load_job_func,
    op_kwargs={"table_name": table_name, "load_date": load_date, 'Snapshot_Month_Date': Snapshot_Month_Date},
    provide_context=True,
    dag=dag
)

stg_load_task = BigQueryOperator(
    task_id=task_id + "_STG_Load",
    destination_dataset_table=f"{BQ_PROJECT}.{BQ_stg_dataset}.{table_name}${{task_instance.xcom_pull(key='partition_date', task_ids='{task_id}_load_job_config')}}",
    write_disposition="WRITE_TRUNCATE",
    sql="{{ task_instance.xcom_pull(task_ids='{task_id}_load_job_config') }}".format(task_id=task_id),
    time_partitioning="{{ task_instance.xcom_pull(key='partition_field', task_ids='{0}_load_job_config') }}".format(task_id),
    use_legacy_sql=False,
    allow_large_results=True,
    dag=dag
)

Upvotes: 1

Views: 128

Answers (0)

Related Questions