evam

Reputation: 65

BigQuery Load Job doesn't respect default value set in schema

When creating the table I set

ingress_at DATETIME DEFAULT CURRENT_DATETIME

When looking at my schema in BigQuery, I can see it worked.

I have a .json file in my bucket that has all the same columns as the table, except for ingress_at. I'm now trying to append the data in this file to the existing table. The data loads fine, but I keep ending up with NULL values throughout the ingress_at column. Why is it not populated with the ingress datetime as specified?

I've tried:

Upvotes: 0

Views: 638

Answers (1)

Elmar

Reputation: 4387

In Google BigQuery, default values specified in the table schema are not applied during load jobs. However, default values are used when inserting data via SQL INSERT statements. When you load data into BigQuery, if a column is missing in the source data, it will be populated with NULL values, regardless of the default value specified in the table schema.
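
To see the difference, here is a minimal sketch (table and column names are hypothetical) of an INSERT that omits ingress_at; BigQuery fills the column from its default expression, which a load job will not do:

INSERT INTO your_dataset.your_table (column1, column2)
VALUES ('a', 'b');
-- ingress_at is omitted, so it receives CURRENT_DATETIME() from the column default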

Workaround 1:

  1. Create a staging table: load your JSON data into a staging table that has the same schema as your original table, but without the ingress_at column (a sketch of this step follows the INSERT example below).

  2. Insert data with the default value: use a SQL INSERT statement to insert data from the staging table into the target table, setting ingress_at to CURRENT_DATETIME(). Here is an example:

INSERT INTO your_target_table (column1, column2, ..., ingress_at)
SELECT column1, column2, ..., CURRENT_DATETIME()
FROM your_staging_table;
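
For step 1, here is a minimal sketch using BigQuery's LOAD DATA statement (bucket, file, and column names are hypothetical; adjust to your project):

-- Staging table with the same columns as the target, minus ingress_at
CREATE TABLE your_dataset.your_staging_table (
  column1 STRING,
  column2 STRING
);

-- Load the newline-delimited JSON file from Cloud Storage
LOAD DATA INTO your_dataset.your_staging_table
FROM FILES (
  format = 'JSON',
  uris = ['gs://your-bucket/your-file.json']
);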

Workaround 2: If you have a more complex ETL pipeline, consider using Google Cloud Dataflow to transform the data as it is loaded. Dataflow lets you process each record and populate the ingress_at field with the current time before writing it to BigQuery, like below:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from datetime import datetime

class AddIngressAt(beam.DoFn):
    """Adds an ingestion timestamp to each record."""
    def process(self, element):
        # Mirror the table default by stamping the current UTC time.
        element['ingress_at'] = datetime.utcnow().isoformat()
        yield element

def run():
    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         # Read the newline-delimited JSON file from Cloud Storage.
         | 'ReadFromGCS' >> beam.io.ReadFromText('gs://your-bucket/your-file.json')
         # Parse each line into a dict.
         | 'ParseJson' >> beam.Map(json.loads)
         # Stamp each record with the ingestion time.
         | 'AddIngressAt' >> beam.ParDo(AddIngressAt())
         | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
             'your-project:your_dataset.your_table',
             schema='column1:STRING, column2:STRING, ..., ingress_at:DATETIME',
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
         )
        )

if __name__ == '__main__':
    run()
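
As a usage note: for a quick test you can execute this sketch as-is with Beam's default DirectRunner; to run it on Dataflow, pass the standard pipeline options (--runner DataflowRunner, plus your project, region, and a temp_location) when invoking the script.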

Upvotes: 0
