Reputation: 163
I'm trying to setup an Airflow job that executes a BigQuery job by calling the BigQueryInsertJobOperator operator that should create a table to store the results of a query if it doesnt exist. The setup looks like this:
task3 = BigQueryInsertJobOperator(
task_id="item_data",
project_id="project_id",
configuration={
"jobType" : "QUERY",
"query" : {
"query" : "{% include 'sql_query.sql' %}",
"useLegacySql" : False
},
"tableDefinitions" : {
"fields" : [
{
"name" : "DEPT_NBR",
"type" : "INTEGER"
},
{
"name" : "ITEM_NBR",
"type" : "INTEGER"
},
{
"name" : "CREATED_DATE",
"type" : "STRING"
}
]
},
"destinationTable" : {
"projectId" : "project_id",
"datasetId" : "dataset_id",
"tableId" : "table_id"
},
"createDisposition" : "CREATE_IF_NEEDED",
"writeDisposition" : "WRITE_APPEND",
"priority" : "BATCH",
"schemaUpdateOptions" : [
"ALLOW_FIELD_ADDITION"
],
"timePartitioning" : {
"type" : "DAY",
"expirationMs" : 31556926000,
"field" : "CREATED_DATE"
},
"clustering" : {
"fields" : [
"DEPT_NBR"
]
}
},
impersonation_chain="svc-account@project_id.iam.gserviceaccount.com",
location="US" )
Everything executes perfectly but it does not create the table. When I check the logs, what I'm seeing is that it's storing the data in a temporary table with an expiration date of 24 hours and despite setting the priority to BATCH
it's still running as INTERACTIVE
. Any thoughts?
Upvotes: 1
Views: 1854
Reputation: 6572
A level is missing in your configuration :
task3 = BigQueryInsertJobOperator(
task_id="item_data",
project_id="project_id",
configuration={
"query": {
"query": "{% include 'sql_query.sql' %}",
"useLegacySql": False,
"destinationTable": {
"projectId": "project_id",
"datasetId": "dataset_id",
"tableId": "table_id"
},
"createDisposition": "CREATE_IF_NEEDED",
"writeDisposition": "WRITE_APPEND",
"priority": "BATCH",
"schemaUpdateOptions": [
"ALLOW_FIELD_ADDITION"
],
"timePartitioning": {
"type": "DAY",
"expirationMs": 31556926000,
"field": "CREATED_DATE"
},
"clustering": {
"fields": [
"DEPT_NBR"
]
},
"tableDefinitions": {
"fields": [
{
"name": "DEPT_NBR",
"type": "INTEGER"
},
{
"name": "ITEM_NBR",
"type": "INTEGER"
},
{
"name": "CREATED_DATE",
"type": "STRING"
}
]
}
}
},
impersonation_chain="svc-account@project_id.iam.gserviceaccount.com",
location="US")
There is a parent node query
and the other options are put inside.
Upvotes: 1