Reputation: 49
Piggybacking off of this post, I want to create a Beam Dataflow job to load data from GCS into BigQuery. There are thousands of files in the GCS bucket, all of them quite large and containing compressed JSONL data. The data format does not give me a date field I can use to create a partitioned table, so I would like to add my own during the pipeline.
Is it possible to add a manual field in the pipeline, separate from the compressed data, so that it shows up in the resulting BigQuery table when I load the data from GCS? I would like to do this without having to unzip any of the files or run a subsequent SELECT <CRITERIA>
statement on the table itself.
Upvotes: 0
Views: 40
Reputation: 393
Yes, you can easily do something like this:
import json

import apache_beam as beam


class CustomParsing(beam.DoFn):
    def to_runner_api_parameter(self, unused_context):
        return "beam:transforms:custom_parsing:custom_v0", None

    def process(self, element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        # ReadFromText yields str lines, so no decoding is needed here.
        parsed = json.loads(element)
        # Add the extra field. In a plain batch read the element timestamp may
        # not be meaningful, so you could substitute a wall-clock value instead,
        # e.g. datetime.datetime.utcnow().isoformat().
        parsed["datetimefield"] = timestamp.to_rfc3339()
        yield parsed

...

with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | "ReadFromGCS" >> beam.io.ReadFromText("gs://bucket/*.json")
        | "CustomParse" >> beam.ParDo(CustomParsing())
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            BIGQUERY_TABLE,
            schema=BIGQUERY_SCHEMA,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )
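Note that beam.io.ReadFromText decompresses gzip files transparently when the compression can be inferred from the file extension. If your objects are gzip-compressed but do not carry a .gz suffix, you can force decompression with the compression_type argument; a minimal sketch, assuming gzip and using a placeholder file pattern:

from apache_beam.io.filesystem import CompressionTypes

    # Placeholder pattern; adjust to your bucket layout.
    lines = p | "ReadFromGCS" >> beam.io.ReadFromText(
        "gs://bucket/*.jsonl",
        compression_type=CompressionTypes.GZIP,  # default AUTO detects by extension
    )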
The datetimefield column will then be populated for every row written to the BigQuery table, as long as the field is also declared in BIGQUERY_SCHEMA.
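If you also want the table partitioned on that new field when Beam creates it, WriteToBigQuery accepts additional_bq_parameters that are applied at table-creation time. A sketch, assuming datetimefield is declared as TIMESTAMP in BIGQUERY_SCHEMA and DAY partitioning is what you want:

        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            BIGQUERY_TABLE,
            schema=BIGQUERY_SCHEMA,
            # Only applied when the sink creates the table.
            additional_bq_parameters={
                "timePartitioning": {"type": "DAY", "field": "datetimefield"}
            },
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )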
Upvotes: 0