Frank Pinto

Reputation: 163

The BigQueryInsertJobOperator in Airflow does not create a table

I'm trying to set up an Airflow job that runs a BigQuery query via the BigQueryInsertJobOperator and creates the destination table for the results if it doesn't exist. The setup looks like this:

task3 = BigQueryInsertJobOperator(
    task_id="item_data",
    project_id="project_id",
    configuration={
        "jobType": "QUERY",
        "query": {
            "query": "{% include 'sql_query.sql' %}",
            "useLegacySql": False
        },
        "tableDefinitions": {
            "fields": [
                {
                    "name": "DEPT_NBR",
                    "type": "INTEGER"
                },
                {
                    "name": "ITEM_NBR",
                    "type": "INTEGER"
                },
                {
                    "name": "CREATED_DATE",
                    "type": "STRING"
                }
            ]
        },
        "destinationTable": {
            "projectId": "project_id",
            "datasetId": "dataset_id",
            "tableId": "table_id"
        },
        "createDisposition": "CREATE_IF_NEEDED",
        "writeDisposition": "WRITE_APPEND",
        "priority": "BATCH",
        "schemaUpdateOptions": [
            "ALLOW_FIELD_ADDITION"
        ],
        "timePartitioning": {
            "type": "DAY",
            "expirationMs": 31556926000,
            "field": "CREATED_DATE"
        },
        "clustering": {
            "fields": [
                "DEPT_NBR"
            ]
        }
    },
    impersonation_chain="svc-account@project_id.iam.gserviceaccount.com",
    location="US"
)

Everything executes without errors, but it does not create the table. When I check the logs, I see that the results are stored in a temporary table with an expiration of 24 hours, and despite setting the priority to BATCH, the job still runs as INTERACTIVE. Any thoughts?

Upvotes: 1

Views: 1854

Answers (1)

Mazlum Tosun

Reputation: 6572

A nesting level is missing in your configuration:

task3 = BigQueryInsertJobOperator(
        task_id="item_data",
        project_id="project_id",
        configuration={
            "query": {
                "query": "{% include 'sql_query.sql' %}",
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": "project_id",
                    "datasetId": "dataset_id",
                    "tableId": "table_id"
                },
                "createDisposition": "CREATE_IF_NEEDED",
                "writeDisposition": "WRITE_APPEND",
                "priority": "BATCH",
                "schemaUpdateOptions": [
                    "ALLOW_FIELD_ADDITION"
                ],
                "timePartitioning": {
                    "type": "DAY",
                    "expirationMs": 31556926000,
                    "field": "CREATED_DATE"
                },
                "clustering": {
                    "fields": [
                        "DEPT_NBR"
                    ]
                },
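                # Note: for a query job the destination table's schema is
                # derived from the SELECT statement; tableDefinitions normally
                # describes external data sources, so this block may well be
                # unnecessary here.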
                "tableDefinitions": {
                    "fields": [
                        {
                            "name": "DEPT_NBR",
                            "type": "INTEGER"
                        },
                        {
                            "name": "ITEM_NBR",
                            "type": "INTEGER"
                        },
                        {
                            "name": "CREATED_DATE",
                            "type": "STRING"
                        }
                    ]
                }
            }
        },
        impersonation_chain="svc-account@project_id.iam.gserviceaccount.com",
        location="US")

All of these options belong under the parent query node. In your version they sat at the top level of configuration, so BigQuery silently ignored them and fell back to its defaults, which is exactly why the results went to an anonymous temporary table (24-hour expiration) and the job ran with the default INTERACTIVE priority instead of BATCH.
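If it helps to see why these are query-level options: the sketch below (project, dataset, and table IDs assumed from the question, and the SQL left as a placeholder) expresses the same settings with the google-cloud-bigquery Python client, whose QueryJobConfig maps directly onto the configuration.query block of the Jobs API.

    from google.cloud import bigquery

    client = bigquery.Client(project="project_id")  # assumed project ID

    # QueryJobConfig corresponds to configuration.query in the Jobs API,
    # which is why all of these options live under that node.
    job_config = bigquery.QueryJobConfig(
        destination=bigquery.TableReference.from_string(
            "project_id.dataset_id.table_id"
        ),
        create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        priority=bigquery.QueryPriority.BATCH,
        schema_update_options=[
            bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
        ],
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="CREATED_DATE",
            expiration_ms=31556926000,
        ),
        clustering_fields=["DEPT_NBR"],
    )

    query_job = client.query("SELECT ...", job_config=job_config)  # your SQL
    query_job.result()  # wait for completion; the table is created if needed

Either way, once the destination options are nested under query, the CREATE_IF_NEEDED disposition takes effect and the table is created on the first run.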

Upvotes: 1
