Reputation: 65
When creating the table I set
ingress_at DATETIME DEFAULT CURRENT_DATETIME
When looking at my schema in BigQuery I can see it worked.
I have a .json file in my bucket that has all the same columns as the table, except for ingress_at. I'm now trying to append the data in this file to the existing table. The data loads fine, but I keep ending up with NULL values all throughout the ingress_at column. Why is it not populated with the ingress datetime as specified?
I've tried:
setting CURRENT_DATETIME() instead of CURRENT_DATETIME in the schema at table creation => no change
making ingress_at a REQUIRED field => the load job fails because there's no such column in the import file
setting auto detect to TRUE when configuring the load job => errors because the file doesn't have headers and the ingress_at column doesn't exist
defining a schema for the load job => loads fine, but the ingress_at column is entirely populated with NULL values
Upvotes: 0
Views: 638
Reputation: 4387
In Google BigQuery, default values specified in the table schema are not applied during load jobs; they are only used when you insert data via SQL, such as an INSERT statement. When a load job encounters a column that is missing from the source data, that column is populated with NULL, regardless of the default value expression in the table schema.
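You can see the difference with a plain SQL INSERT that omits the column, which lets the default fire. A minimal sketch with hypothetical dataset and table names:

-- Hypothetical demo table; the DEFAULT expression only fires for SQL inserts, not load jobs.
CREATE TABLE your_dataset.demo (
  name STRING,
  ingress_at DATETIME DEFAULT CURRENT_DATETIME()
);

-- ingress_at is omitted from the column list, so the default populates it.
INSERT INTO your_dataset.demo (name) VALUES ('example');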
Workaround 1:
Create a staging table: load your JSON data into a staging table that has the same schema as your target table, but without the ingress_at column (a sketch of this load step follows the INSERT example below).
Insert data with the default value: use a SQL INSERT statement to copy the data from the staging table into the target table, setting ingress_at to CURRENT_DATETIME(). For example:
INSERT INTO your_target_table (column1, column2, ..., ingress_at)
SELECT column1, column2, ..., CURRENT_DATETIME()
FROM your_staging_table;
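For the load step itself, one option is BigQuery's LOAD DATA statement; a sketch assuming a newline-delimited JSON file and hypothetical bucket and table names:

-- Loads the GCS file into the staging table (which has no ingress_at column).
LOAD DATA INTO your_dataset.your_staging_table
FROM FILES (
  format = 'JSON',
  uris = ['gs://your-bucket/your-file.json']
);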
Workaround 2: If you have a more complex ETL pipeline, consider using Google Cloud Dataflow to transform the data as it is being loaded. Dataflow lets you process each record and set ingress_at to the current datetime before writing it to BigQuery, like below:
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from datetime import datetime

class AddIngressAt(beam.DoFn):
    """Stamps each record with its ingestion datetime."""
    def process(self, element):
        # Set ingress_at at processing time, mirroring CURRENT_DATETIME().
        element['ingress_at'] = datetime.utcnow().isoformat()
        yield element

def run():
    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'ReadFromGCS' >> beam.io.ReadFromText('gs://your-bucket/your-file.json')
         | 'ParseJson' >> beam.Map(json.loads)
         | 'AddIngressAt' >> beam.ParDo(AddIngressAt())
         | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
               'your-project:your_dataset.your_table',
               # Replace the placeholder columns with your real schema;
               # DATETIME matches the ingress_at column from the question.
               schema='column1:STRING, column2:STRING, ..., ingress_at:DATETIME',
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

if __name__ == '__main__':
    run()
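To run this on Dataflow rather than locally, pass the standard Beam pipeline options (hypothetical values):

python pipeline.py --runner=DataflowRunner --project=your-project --region=your-region --temp_location=gs://your-bucket/tmp

Note one difference from a table default: the timestamp is stamped per element at processing time, so records from the same file can carry slightly different ingress_at values.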
Upvotes: 0