Reputation: 1552
I have a dag where i have pushed a dictionary to xcom and wnat to pull it in BigQuery Operator, i have also defined render_template_as_native_obj=True, but still it is giving error as
time partitioning argument must have a class type dict not class str error
code :
def load_job_func(**kwargs):
table_name = kwargs['table_name']
load_date = kwargs['load_date']
Snapshot_Month_Date = kwargs['Snapshot_Month_Date']
query = "SELECT 1"
ti = kwargs['ti']
client = bigquery.Client()
partition_date = None
partition_field = None
if table_name == "printer_data":
partition_date = load_date.replace("-", "")
query = f"""SELECT
Instance,
cast(load_date as Date) as load_date
from {BQ_PROJECT}.{BQ_landing_dataset}.{table_name}"""
partition_field = 'load_date'
elif table_name == "monthly_print_report":
partition_date = Snapshot_Month_Date.replace("-", "")
query = f"""SELECT
Location,
Serial_Number,
cast(load_date as Date) as load_date,
cast(Snapshot_Month_Date as date) as Snapshot_Month_Date
from {BQ_PROJECT}.{BQ_landing_dataset}.{table_name}"""
partition_field = 'Snapshot_Month_Date'
ti.xcom_push(key='partition_date', value=partition_date)
ti.xcom_push(key='partition_field', value={'type': 'DAY', 'field': partition_field})
return query
load_job_config = PythonOperator(
task_id=task_id + "_load_job_config",
python_callable=load_job_func,
op_kwargs={"table_name": table_name, "load_date": load_date, 'Snapshot_Month_Date': Snapshot_Month_Date},
provide_context=True,
dag=dag
)
stg_load_task = BigQueryOperator(
task_id=task_id + "_STG_Load",
destination_dataset_table=f"{BQ_PROJECT}.{BQ_stg_dataset}.{table_name}${{task_instance.xcom_pull(key='partition_date', task_ids='{task_id}_load_job_config')}}",
write_disposition="WRITE_TRUNCATE",
sql="{{ task_instance.xcom_pull(task_ids='{task_id}_load_job_config') }}".format(task_id=task_id),
time_partitioning="{{ task_instance.xcom_pull(key='partition_field', task_ids='{0}_load_job_config') }}".format(task_id),
use_legacy_sql=False,
allow_large_results=True,
dag=dag
)
Upvotes: 1
Views: 128