hanz

Reputation: 49

Can I load compressed JSONL data from GCS to BigQuery and add an additional date column using Dataflow?

Piggybacking off of this post, I want to create a Beam Dataflow job to load data from GCS to BigQuery. There are thousands of files in the GCS bucket, all of them large, compressed JSONL files. The data format makes it impossible to create a partitioned table from an existing date field, so I would like to add one myself during the pipeline.

Is it possible to add a manual field in the pipeline, separate from the compressed data, so that it appears in the resulting BigQuery table when the data is loaded from GCS? I would like to do this without having to unzip any of the files or perform a sequential SELECT <CRITERIA> operation on the table itself.

Upvotes: 0

Views: 40

Answers (1)

Steeve

Reputation: 393

Yes, you can easily do something like this:

import json

import apache_beam as beam


class CustomParsing(beam.DoFn):
    def to_runner_api_parameter(self, unused_context):
        return "beam:transforms:custom_parsing:custom_v0", None

    def process(self, element: str, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        # Parse one JSONL line and attach the element's Beam timestamp as an extra field.
        parsed = json.loads(element)
        parsed["datetimefield"] = timestamp.to_rfc3339()
        yield parsed

...

with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        # ReadFromText detects compression (e.g. gzip) from the file extension by default.
        | "ReadFromGCS" >> beam.io.ReadFromText('gs://bucket/*.json')
        | "CustomParse" >> beam.ParDo(CustomParsing())
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            BIGQUERY_TABLE,
            schema=BIGQUERY_SCHEMA,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

The datetimefield column will be added to each parsed record and written to that field in the BigQuery table.
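
Since your files are compressed, note that ReadFromText picks the codec from the file extension by default (CompressionTypes.AUTO); if your object names don't carry a standard extension, you can set it explicitly. A minimal sketch, assuming gzip-compressed files and a placeholder bucket path:

import apache_beam as beam
from apache_beam.io.filesystem import CompressionTypes

# Force gzip decompression regardless of the file extension (path is a placeholder).
read_step = beam.io.ReadFromText(
    "gs://bucket/*.jsonl.gz",
    compression_type=CompressionTypes.GZIP)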

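If the end goal is a table partitioned on that new column, WriteToBigQuery can also pass a partitioning spec through its additional_bq_parameters argument, which is applied when the table is created. A sketch under the assumption that datetimefield is declared as TIMESTAMP and that the table and other field names below are placeholders:

import apache_beam as beam

# Table and field names are placeholders; datetimefield must be a TIMESTAMP or DATE
# column to be usable for time-based partitioning.
BIGQUERY_TABLE = "project:dataset.table"
BIGQUERY_SCHEMA = "some_original_field:STRING,datetimefield:TIMESTAMP"

write_step = beam.io.WriteToBigQuery(
    BIGQUERY_TABLE,
    schema=BIGQUERY_SCHEMA,
    # Ask BigQuery to create the table partitioned by day on the new column.
    additional_bq_parameters={
        "timePartitioning": {"type": "DAY", "field": "datetimefield"}},
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
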
Upvotes: 0
