Reputation: 79
I'm using the BigQueryCreateEmptyTableOperator
within Airflow to create tables in my BigQuery project. It's been working great, for everything except partitioned tables.
When I try to pass the required dictionary (specified here) to the time_partitioning
parameter, I get the following completely nonsensical error message:
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/MY_PROJECT/datasets/MY_DATASET/tables?prettyPrint=false: Invalid JSON payload received. Unknown name "timePartitioning" at 'table': Proto field is not repeating, cannot start list
I literally have no idea where to even start with debugging this. Here is the exact operator code. It works just fine if time_partitioning
is not passed.
create = BigQueryCreateEmptyTableOperator(
task_id="create",
bigquery_conn_id='google_cloud',
project_id="MY_PROJECT",
dataset_id="MY_DATASET",
table_id=table_name,
schema_fields=self.get_schema(),
time_partitioning={'field': 'ds', 'type' : 'DAY'},
)
Upvotes: 0
Views: 1314
Reputation: 1818
I used the below sample code to run dag’s which uses BigQueryCreateEmptyDatasetOperator
and BigQueryCreateEmptyTableOperator
to create dataset and partitioned table with the time_partitioning parameter in BigQuery.
import os
import time
import datetime
from datetime import datetime, date, time, timedelta
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyDatasetOperator,
BigQueryCreateEmptyTableOperator,
)
from airflow.utils.dates import days_ago
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "testdataset")
TABLE_NAME = "partitioned_table"
SCHEMA = [
{"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "ds", "type": "DATE", "mode": "NULLABLE"},
]
dag_id = "example_bigquery"
with models.DAG(
dag_id,
schedule_interval="@hourly", # Override to match your needs
start_date=days_ago(1),
tags=["example"],
user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME},
default_args={"project_id": PROJECT_ID},
) as dag_with_locations:
create_dataset = BigQueryCreateEmptyDatasetOperator(
task_id="create-dataset", dataset_id=DATASET_NAME, project_id=PROJECT_ID
)
create_table = BigQueryCreateEmptyTableOperator(
task_id="create_table",
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
schema_fields=SCHEMA,
time_partitioning={
"type": "DAY",
"field": "ds",
},
)
create_dataset >> create_table
Output:
Upvotes: 2