Reputation: 121
I am trying to write BigQuery table records as a JSON file in a GCS bucket using Apache Beam in Python.
I have a BigQuery table - my_project.my_dataset.my_table.
I wish to write the table records into a JSON file at the GCS bucket location "gs://my_core_bucket/data/my_data.json".
Expected JSON format:
[
{"id":"1","values":{"name":"abc","address":"Mumbai","phn":"1111111111"}},
{"id":"2","values":{"name":"def","address":"Kolkata","phn":"2222222222"}},
{"id":"3","values":{"name":"ghi","address":"Chennai","phn":"3333333333"}},
{"id":"4","values":{"name":"jkl","address":"Delhi","phn":"4444444444"}}
]
But with my current implementation of the Apache Beam pipeline, the JSON file created at "gs://my_core_bucket/data/my_data.json" has newline-delimited entries like this:
{"id":"1","values":{"name":"abc","address":"Mumbai","phn":"1111111111"}}
{"id":"2","values":{"name":"def","address":"Kolkata","phn":"2222222222"}}
{"id":"3","values":{"name":"ghi","address":"Chennai","phn":"3333333333"}}
{"id":"4","values":{"name":"jkl","address":"Delhi","phn":"4444444444"}}
How do I create a clean JSON file with the BigQuery records as JSON array elements?
Here is my pipeline code.
import os
import json
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class PrepareData(beam.DoFn):
    def process(self, record):  # sample record - {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}
        rec_columns = ["id", "name", "address", "phn", "country", "age"]  # all columns of the bigquery table
        # the record's keys, e.g. ["id", "name", "address", "phn"], are the columns needed for processing;
        # keep every known column except "id" under a nested "values" dict
        values = {key: value for key, value in record.items()
                  if key != "id" and key in rec_columns}
        return [{"id": record["id"], "values": values}]
class MainClass:
    def run_pipe(self):
        try:
            project = "my_project"
            dataset = "my_dataset"
            table = "my_table"
            region = "us-central1"
            job_name = "create-json-file"
            temp_location = "gs://my_core_bucket/dataflow/temp_location/"
            runner = "DataflowRunner"

            # set pipeline options
            argv = [
                f'--project={project}',
                f'--region={region}',
                f'--job_name={job_name}',
                f'--temp_location={temp_location}',
                f'--runner={runner}'
            ]

            # json file name
            file_name = "gs://my_core_bucket/data/my_data"

            # create pipeline
            p = beam.Pipeline(argv=argv)

            # query to read table data
            query = f"SELECT id, name, address, phn FROM `{project}.{dataset}.{table}` LIMIT 4"

            # ReadFromBigQuery is itself a PTransform, so it is applied directly (no beam.io.Read wrapper)
            bq_data = p | 'Read Table' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
            # bq_data will be in the form
            # {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}
            # {"id": "2", "name": "def", "address": "Kolkata", "phn": "2222222222"}
            # {"id": "3", "name": "ghi", "address": "Chennai", "phn": "3333333333"}
            # {"id": "4", "name": "jkl", "address": "Delhi", "phn": "4444444444"}

            # alter data in the form needed for downstream process
            prepared_data = bq_data | beam.ParDo(PrepareData())

            # serialize each element as JSON and write the formatted pcollection to a single file
            json_data = prepared_data | 'JSON format' >> beam.Map(json.dumps)
            json_data | 'Write Output' >> beam.io.WriteToText(file_name, file_name_suffix=".json", shard_name_template='')

            # execute pipeline
            p.run().wait_until_finish()
        except Exception as e:
            logging.error(f"Exception in run_pipe - {str(e)}")


if __name__ == "__main__":
    main_cls = MainClass()
    main_cls.run_pipe()
Upvotes: 1
Views: 1863
Reputation: 53411
As suggested in the comments, try combining all the results into a single element. To successfully serialize the result of the combination process (a set, for example), you can use a custom JSON encoder.
Your code could look like this:
import os
import json
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Based on https://stackoverflow.com/questions/8230315/how-to-json-serialize-sets
class SetEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, set):
            return list(obj)
        return json.JSONEncoder.default(self, obj)


# CombineFn that gathers all elements of the PCollection into a single list
class ListCombineFn(beam.CombineFn):
    def create_accumulator(self):
        return []

    def add_input(self, accumulator, input):
        accumulator.append(input)
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = []
        for accum in accumulators:
            merged += accum
        return merged

    def extract_output(self, accumulator):
        return accumulator


class PrepareData(beam.DoFn):
    def process(self, record):  # sample record - {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}
        rec_columns = ["id", "name", "address", "phn", "country", "age"]  # all columns of the bigquery table
        # the record's keys, e.g. ["id", "name", "address", "phn"], are the columns needed for processing;
        # keep every known column except "id" under a nested "values" dict
        values = {key: value for key, value in record.items()
                  if key != "id" and key in rec_columns}
        return [{"id": record["id"], "values": values}]
class MainClass:
    def run_pipe(self):
        try:
            project = "my_project"
            dataset = "my_dataset"
            table = "my_table"
            region = "us-central1"
            job_name = "create-json-file"
            temp_location = "gs://my_core_bucket/dataflow/temp_location/"
            runner = "DataflowRunner"

            # set pipeline options
            argv = [
                f'--project={project}',
                f'--region={region}',
                f'--job_name={job_name}',
                f'--temp_location={temp_location}',
                f'--runner={runner}'
            ]

            # json file name
            file_name = "gs://my_core_bucket/data/my_data"

            # create pipeline
            p = beam.Pipeline(argv=argv)

            # query to read table data
            query = f"SELECT id, name, address, phn FROM `{project}.{dataset}.{table}` LIMIT 4"

            # ReadFromBigQuery is itself a PTransform, so it is applied directly (no beam.io.Read wrapper)
            bq_data = p | 'Read Table' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
            # bq_data will be in the form
            # {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}
            # {"id": "2", "name": "def", "address": "Kolkata", "phn": "2222222222"}
            # {"id": "3", "name": "ghi", "address": "Chennai", "phn": "3333333333"}
            # {"id": "4", "name": "jkl", "address": "Delhi", "phn": "4444444444"}

            # alter data in the form needed for downstream process
            prepared_data = bq_data | beam.ParDo(PrepareData())

            # combine all the results into a single list
            # see https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally/
            combined_data = prepared_data | 'Combine results' >> beam.CombineGlobally(ListCombineFn())

            # serialize the combined list as one JSON array, using the custom encoder,
            # and write it to a single output file
            json_data = combined_data | 'JSON format' >> beam.Map(json.dumps, cls=SetEncoder)
            json_data | 'Write Output' >> beam.io.WriteToText(file_name, file_name_suffix=".json", shard_name_template='')

            # execute pipeline
            p.run().wait_until_finish()
        except Exception as e:
            logging.error(f"Exception in run_pipe - {str(e)}")


if __name__ == "__main__":
    main_cls = MainClass()
    main_cls.run_pipe()
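For a quick sanity check, here is a minimal local sketch, not part of the solution above, that reuses the PrepareData, ListCombineFn and SetEncoder classes from the snippet, replaces the BigQuery read with an in-memory beam.Create, and prints the combined JSON array on the DirectRunner. The sample rows are hypothetical stand-ins for the query result:
import json
import apache_beam as beam

# hypothetical sample rows standing in for the BigQuery query result
sample_rows = [
    {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"},
    {"id": "2", "name": "def", "address": "Kolkata", "phn": "2222222222"},
]

# the DirectRunner is used by default when no pipeline options are passed
with beam.Pipeline() as p:
    (
        p
        | 'Create' >> beam.Create(sample_rows)
        | 'Prepare' >> beam.ParDo(PrepareData())
        | 'Combine' >> beam.CombineGlobally(ListCombineFn())
        | 'To JSON' >> beam.Map(json.dumps, cls=SetEncoder)
        | 'Print' >> beam.Map(print)  # expected: one line containing the full JSON array
    )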
Upvotes: 3
Reputation: 75820
You can do it directly in BigQuery and simply write the result as-is with Dataflow.
Only the query changes:
query = f"SELECT ARRAY_AGG(str) FROM (SELECT STRUCT(id AS id, name AS name, address AS address, phn AS phn) AS str FROM `{project}.{dataset}.{table}` LIMIT 4)"
Keep in mind that
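For illustration only, a minimal sketch of how the pipeline could shrink once BigQuery builds the array. It reuses the placeholder project, dataset and bucket names from the question; the rows alias and the row["rows"] access are assumptions about how the single aggregated row comes back from ReadFromBigQuery, and the array elements follow the query's struct layout rather than the nested "values" shape:
import json
import apache_beam as beam

project, dataset, table = "my_project", "my_dataset", "my_table"

# same idea as the query above, with the aggregated array aliased as "rows"
query = (
    "SELECT ARRAY_AGG(str) AS rows FROM ("
    "  SELECT STRUCT(id AS id, name AS name, address AS address, phn AS phn) AS str"
    f"  FROM `{project}.{dataset}.{table}` LIMIT 4)"
)

# pass the Dataflow options from the question for a real run; DirectRunner otherwise
with beam.Pipeline() as p:
    (
        p
        | 'Read Table' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
        | 'JSON format' >> beam.Map(lambda row: json.dumps(row["rows"]))  # single aggregated row
        | 'Write Output' >> beam.io.WriteToText(
            "gs://my_core_bucket/data/my_data",
            file_name_suffix=".json",
            shard_name_template='')
    )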
Upvotes: 0