Reputation: 477
I have run into a situation like the one below. I am trying to update the schema of my partitioned table (partitioned by a time-unit column). I use this article and this example as my references, and the doc says that
schemaUpdateOptions[] : Schema update options are supported in two cases: when writeDisposition is WRITE_APPEND; when writeDisposition is WRITE_TRUNCATE and the destination table is a partition of a table, specified by partition decorators. For normal tables, WRITE_TRUNCATE will always overwrite the schema.
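For reference, a load into a single partition specified by a decorator would look roughly like the sketch below (all project, dataset, table, partition, and file names are placeholders):

from google.cloud import bigquery

client = bigquery.Client()

# Destination is one partition of a time-partitioned table, referenced
# with a partition decorator (placeholder names).
dataset_ref = bigquery.DatasetReference("my-project", "my_dataset")
partition_ref = dataset_ref.table("my_table$20230101")

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    # Second supported case from the docs: WRITE_TRUNCATE on a table partition.
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
    # Columns added via ALLOW_FIELD_ADDITION must be NULLABLE.
    schema=[
        bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
    ],
)

with open("path/to/your_file.csv", "rb") as source_file:
    load_job = client.load_table_from_file(
        source_file, partition_ref, job_config=job_config
    )
load_job.result()  # Wait for the load to finish.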
So what I understand is, with LoadJobConfig().schema_update_options = [bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION] (the two configurations are also sketched in code after the tables below):
LoadJobConfig().write_disposition:

| WRITE_APPEND | WRITE_TRUNCATE |
|---|---|
| Schema is updated successfully and the new data is appended to the table | Schema is updated successfully BUT the table is overwritten with the new data |
LoadJobConfig().write_disposition:

| WRITE_APPEND | WRITE_TRUNCATE |
|---|---|
| NOT ALLOWED - Error message: "invalid: Schema update options should only be specified with WRITE_APPEND disposition, or with WRITE_TRUNCATE disposition on a table partition." | Schema is updated successfully BUT the table is overwritten with the new data |
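The two configurations compared in the tables above are, roughly (a minimal sketch):

from google.cloud import bigquery

# Case 1: append, allowing new NULLABLE columns.
append_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
)

# Case 2: truncate, allowing new NULLABLE columns.
truncate_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
)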
This is always true when I'm using LoadJobConfig(), but if I use QueryJobConfig() instead, things change. It is still true for normal tables, but for partitioned tables, even when write_disposition=WRITE_APPEND, the schema is still updated successfully and the new data is appended to the table!
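Roughly what I run in that case (a sketch, with a placeholder destination table name):

from google.cloud import bigquery

client = bigquery.Client()

job_config = bigquery.QueryJobConfig(
    destination="my-project.my_dataset.my_partitioned_table",  # placeholder
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
)
query_job = client.query(
    'SELECT "Timmy" AS full_name, 85 AS age, "Blue" AS favorite_color',
    job_config=job_config,
)
query_job.result()  # The schema is updated and the rows are appended.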
How can this situation be explained, please? Is there something special about QueryJobConfig(), or did I misunderstand something?
Many thanks!
Upvotes: 4
Views: 16728
Reputation: 1602
There are some slight differences between the two classes. I would recommend that you pay attention to the default configuration of each one; if one of them ends up returning errors, it may be due to an incorrect initialization of the configuration and its options.
QueryJobConfig(**kwargs)
Configuration options for query jobs.
All properties in this class are optional. Values which are None use the server defaults. Set properties on the constructed configuration by using the property name as the name of a keyword argument.
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
# TODO(developer): Set table_id to the ID of the destination table.
# table_id = "your-project.your_dataset.your_table_name"
# Retrieves the destination table and checks the length of the schema.
table = client.get_table(table_id) # Make an API request.
print("Table {} contains {} columns".format(table_id, len(table.schema)))
# Configures the query to append the results to a destination table,
# allowing field addition.
job_config = bigquery.QueryJobConfig(
    destination=table_id,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
# Start the query, passing in the extra configuration.
query_job = client.query(
    # In this example, the existing table contains only the 'full_name' and
    # 'age' columns, while the results of this query will contain an
    # additional 'favorite_color' column.
    'SELECT "Timmy" as full_name, 85 as age, "Blue" as favorite_color;',
    job_config=job_config,
)  # Make an API request.
query_job.result() # Wait for the job to complete.
# Checks the updated length of the schema.
table = client.get_table(table_id) # Make an API request.
print("Table {} now contains {} columns".format(table_id, len(table.schema)))
LoadJobConfig(**kwargs)
Configuration options for load jobs.
Set properties on the constructed configuration by using the property name as the name of a keyword argument. Values which are unset or None use the BigQuery REST API default values. See the BigQuery REST API reference documentation for a list of default values.
Required options differ based on the source_format value. For example, the BigQuery API's default value for source_format is "CSV". When loading a CSV file, either schema must be set or autodetect must be set to True.
# from google.cloud import bigquery
# client = bigquery.Client()
# project = client.project
# dataset_ref = bigquery.DatasetReference(project, 'my_dataset')
# filepath = 'path/to/your_file.csv'
# Retrieves the destination table and checks the length of the schema
table_id = "my_table"
table_ref = dataset_ref.table(table_id)
table = client.get_table(table_ref)
print("Table {} contains {} columns.".format(table_id, len(table.schema)))
# Configures the load job to append the data to the destination table,
# allowing field addition
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
job_config.schema_update_options = [
    bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
]
# In this example, the existing table contains only the 'full_name' column.
# 'REQUIRED' fields cannot be added to an existing schema, so the
# additional column must be 'NULLABLE'.
job_config.schema = [
    bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
]
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
with open(filepath, "rb") as source_file:
    job = client.load_table_from_file(
        source_file,
        table_ref,
        location="US",  # Must match the destination dataset location.
        job_config=job_config,
    )  # API request

job.result()  # Waits for table load to complete.
print(
    "Loaded {} rows into {}:{}.".format(
        job.output_rows, dataset_ref.dataset_id, table_ref.table_id
    )
)
# Checks the updated length of the schema
table = client.get_table(table_ref)  # Make an API request.
print("Table {} now contains {} columns.".format(table_id, len(table.schema)))
It should be noted that google.cloud.bigquery.job.SchemaUpdateOption is accepted by both classes and specifies which updates to the destination table's schema are allowed as a side effect of the job.
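For example (a minimal sketch), the same list of schema update options can be passed to either configuration class:

from google.cloud import bigquery

allowed_updates = [
    bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,    # allow new NULLABLE columns
    bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION,  # allow REQUIRED -> NULLABLE
]

query_config = bigquery.QueryJobConfig(schema_update_options=allowed_updates)
load_config = bigquery.LoadJobConfig(schema_update_options=allowed_updates)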
Upvotes: 5