Reputation: 89
I have two BigQueryOperator tasks in a loop. The first task works perfectly; however, the second task (`create_partition_table_agent_intensity_{v_list[i]}`) throws an error:
ERROR - 400 Syntax error: Unexpected "{" at [1:244]
I can't see what the difference between the two tasks is. Could someone point me in the right direction?
Here is my entire code:
from airflow.models import (DAG, Variable)
import os
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import datetime
import json
import pandas as pd
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from google.cloud import bigquery
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator
# Default arguments handed to the DAG below via default_args=.
default_args = {
'start_date': datetime.datetime(2020, 1, 1),
}
# NOTE(review): PROJECT_ID is passed as bigquery_conn_id in list_dates_in_df,
# but it reads the same environment variable (GCP_PROJECT_ID) as
# PROJECT_ID_GCP, which is used as the project part of table names. When that
# variable is set, both constants hold the same value; only the fallbacks
# differ. Confirm that a project id is also a valid connection id here.
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "bigquery_default")
PROJECT_ID_GCP = os.environ.get("GCP_PROJECT_ID", "my_project")
# NOTE(review): both dataset constants read GCP_BIGQUERY_DATASET_NAME, so
# when it is set the source and destination datasets are identical; only the
# fallbacks ("LP_RAW" / "LP_STG") differ. Confirm this is intended.
DATASET_MRR = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "LP_RAW")
DATASET_STG = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "LP_STG")
# Source objects read by the two BigQueryOperator queries.
MRR_AGENT_ACTIVITY = "RPT_FA_AGENT_ACTIVITY_VW"
MRR_AGENT_INTENSITY = "RPT_AGG_15M_MSG_AGENT_INTENSITY_VW"
# Destination table name prefixes; the loop appends the date value to each.
# NOTE(review): "acitivity" looks like a typo for "activity", but it is part
# of the table name, so renaming it changes where data is written.
STG_AGENT_ACTIVITY_PARTITIONED = "agent_acitivity_partitioned"
STG_AGENT_INTENSITY_PARTITIONED = "agent_intensity_partitioned"
def list_dates_in_df(ti):
    """Push the distinct pending partition keys to XCom.

    Queries ``LP_MNG.PartitionStatusMonitoring`` for the distinct
    ``PARTITION_KEY`` values (cast to string) of the two agent source views
    whose ``IS_LOAD_COMPLETED`` flag is false, flattens the result into a
    plain list and pushes it under the XCom key ``list_of_dates``.

    Args:
        ti: Task instance supplied by Airflow; only ``xcom_push`` is used.
    """
    hook = BigQueryHook(bigquery_conn_id=PROJECT_ID, use_legacy_sql=False)
    # NOTE(review): _get_field and _get_credentials are private helpers of the
    # hook; they are kept because the original relies on them to build the
    # client, but they may change between provider versions.
    bq_client = bigquery.Client(
        project=hook._get_field("project"),
        credentials=hook._get_credentials(),
    )
    # Adjacent literals with explicit trailing spaces: the clauses stay
    # separated no matter how this statement is indented, unlike
    # backslash-continued lines inside a single literal.
    query = (
        "select distinct(cast(PARTITION_KEY as string)) as PARTITION_KEY "
        "FROM LP_MNG.PartitionStatusMonitoring "
        "where SOURCE_TABLE in "
        "('RPT_FA_AGENT_ACTIVITY_VW','RPT_AGG_15M_MSG_AGENT_INTENSITY_VW') "
        "and IS_LOAD_COMPLETED = false;"
    )
    df = bq_client.query(query).to_dataframe()
    # df.values.tolist() yields one inner list per row; flatten the rows into
    # a single list of values.
    rows = df.values.tolist()
    my_list = [value for row in rows for value in row]
    ti.xcom_push(key='list_of_dates', value=my_list)
def update_variable(ti):
    """Store the date list from the ``list_dates`` task in an Airflow Variable.

    Pulls the XCom value pushed under ``list_of_dates`` by the task
    ``list_dates``, writes it JSON-encoded to the Variable ``updated_dates``
    and prints the value and its type.

    Args:
        ti: Task instance supplied by Airflow; only ``xcom_pull`` is used.
    """
    pulled_dates = ti.xcom_pull(task_ids='list_dates', key='list_of_dates')
    serialized = json.dumps(pulled_dates)
    Variable.set(key="updated_dates", value=serialized)
    print(pulled_dates)
    print(type(pulled_dates))
# DAG wiring: list_dates -> set_list -> (one pair of BigQuery tasks per date)
# -> generate_data_<date> -> end_job. It has no schedule (schedule_interval=None).
with DAG(
'test_with_mng_table_list',
schedule_interval=None,
catchup = False,
default_args=default_args
) as dag:
# Queries the pending partition keys and pushes them to XCom.
list_dates = PythonOperator(
task_id ='list_dates',
python_callable = list_dates_in_df
)
# Copies the XCom list into the Airflow Variable "updated_dates".
set_list = PythonOperator(
task_id= 'set_list',
python_callable=update_variable
)
# NOTE(review): this call executes when the DAG file is parsed, not after
# set_list has run, so the loop below builds tasks from whatever value the
# Variable held at parse time. Confirm this is intended, and that the
# Variable already exists the first time the file is parsed.
v_list = Variable.get("updated_dates", deserialize_json=True)
# Final task; trigger_rule 'all_done' is set so it is not limited to the
# default all-upstream-succeeded case.
end_job = BashOperator(
task_id='end_job',
bash_command='echo end_job.',
trigger_rule = 'all_done', )
# One pair of BigQuery tasks plus one dummy task per entry in v_list; the
# entry is embedded in each task_id and appended to the destination table name.
for i in range(len(v_list)):
# Aggregates the agent-activity source for one date into its own table.
create_partition_table_agent_activity = BigQueryOperator(
task_id=f"create_partition_table_agent_activity_{v_list[i]}",
# Plain (non-f) string: the "{{ params.X }}" placeholders stay intact and are
# filled from the params dict below when the operator renders its sql.
sql="select ACCOUNT_ID,timestamp_trunc(CHANGE_EVENT_TIME_15M,HOUR) as ANALYSIS_DATE,\
AGENT_ID,AGENT_GROUP_ID,USER_TYPE_ID,\
sum(AWAY_ENGAGED_TIME) AWAY_ENGAGED_TIME,sum(BACKIN5_ENGAGED_TIME) BACKIN5_ENGAGED_TIME,\
sum(DURATION_DAYS) DURATION_DAYS,sum(ONLINE_TIME) ONLINE_TIME,\
sum(BACK_IN_5_TIME) BACK_IN_5_TIME,sum(AWAY_TIME) AWAY_TIME\
from {{ params.PROJECT_ID }}.{{ params.DATASET_MRR }}.{{ params.MRR1 }}\
where cast(CHANGE_EVENT_TIME_15M as STRING FORMAT 'YYYY-MM-DD') = cast('{{ params.date_a }}' as STRING) \
group by 1,2,3,4,5;",
params={"PROJECT_ID":PROJECT_ID_GCP ,
"DATASET_MRR":DATASET_MRR,
"MRR1":MRR_AGENT_ACTIVITY,
"date_a" : v_list[i]
},
destination_dataset_table=f"{PROJECT_ID_GCP}.{DATASET_STG}.{STG_AGENT_ACTIVITY_PARTITIONED}{v_list[i]}",
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
#bigquery_conn_id=CONNECTION_ID,
use_legacy_sql=False,
dag=dag
)
# Aggregates the agent-intensity source for one date into its own table.
create_partition_table_agent_intensity = BigQueryOperator(
task_id=f"create_partition_table_agent_intensity_{v_list[i]}",
# NOTE(review): unlike the activity task above, this sql is an f-string. In
# an f-string "{{" and "}}" are escapes for literal "{" and "}", so Python
# turns "{{ params.X }}" into "{ params.X }" before the operator sees it;
# the placeholders are then no longer Jinja expressions and reach BigQuery
# unrendered. That matches the reported 'Unexpected "{"' syntax error.
sql=f"select ACCOUNT_ID,timestamp_trunc(AGG_DATE,HOUR) as ANALYSIS_DATE,\
AGENT_ID, GROUP_ID as AGENT_GROUP_ID,\
USER_TYPE_ID, SUM(SUM_CONVERSATION_LOAD_RATE) as SUM_CONVERSATION_LOAD_RATE,\
SUM(NO_EVENTS) AS NO_EVENTS\
from {{ params.PROJECT_ID }}.{{ params.DATASET_MRR }}.{{ params.MRR2 }}\
where cast(AGG_DATE as STRING FORMAT 'YYYY-MM-DD') = cast('{{ params.date_a }}' as STRING) \
group by 1,2,3,4,5;",
params={"PROJECT_ID":PROJECT_ID_GCP ,
"DATASET_MRR":DATASET_MRR,
"MRR2":MRR_AGENT_INTENSITY,
"date_a" : v_list[i]
},
destination_dataset_table=f"{PROJECT_ID_GCP}.{DATASET_STG}.{STG_AGENT_INTENSITY_PARTITIONED}{v_list[i]}",
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
#bigquery_conn_id=CONNECTION_ID,
use_legacy_sql=False,
dag=dag
)
# Join point for the two BigQuery tasks of this date.
d2 = DummyOperator(task_id='generate_data_{0}'.format(v_list[i]),dag=dag)
# Both BigQuery tasks run after set_list and feed d2, which precedes end_job.
list_dates >> set_list >> [
create_partition_table_agent_activity,create_partition_table_agent_intensity
] >> d2 >> end_job
Upvotes: 1
Views: 1745
Reputation: 4272
I do not have a playground to test this, but I think you should not use an f-string for the `sql` parameter. In an f-string, `{{something}}` evaluates to the literal string `{something}`, so the query parameters are never substituted and the query is run with the placeholders still in it, which results in the SQL syntax error. Please try removing the `f` prefix before the `sql` string in the second task.
Upvotes: 2