Gopinath S

Reputation: 121

How to write BigQuery results to GCS in JSON format using Apache Beam with custom formatting?

I am trying to write BigQuery table records as a JSON file to a GCS bucket using Apache Beam in Python.

I have a BigQuery table, my_project.my_dataset.my_table, like this:

(screenshot of the table; its columns are id, name, address, phn, country and age)

I wish to write the table records into a JSON file at the GCS location "gs://my_core_bucket/data/my_data.json".

Expected JSON format:


[
    {"id":"1","values":{"name":"abc","address":"Mumbai","phn":"1111111111"}},
    {"id":"2","values":{"name":"def","address":"Kolkata","phn":"2222222222"}},
    {"id":"3","values":{"name":"ghi","address":"Chennai","phn":"3333333333"}},
    {"id":"4","values":{"name":"jkl","address":"Delhi","phn":"4444444444"}}
]

But with my current implementation of the Apache Beam pipeline, I see that the JSON file created at "gs://my_core_bucket/data/my_data.json" has entries like this:

{"id":"1","values":{"name":"abc","address":"Mumbai","phn":"1111111111"}}
{"id":"2","values":{"name":"def","address":"Kolkata","phn":"2222222222"}}
{"id":"3","values":{"name":"ghi","address":"Chennai","phn":"3333333333"}}
{"id":"4","values":{"name":"jkl","address":"Delhi","phn":"4444444444"}}

How do I create a clean JSON file with the BigQuery records as elements of a JSON array?

Here is my pipeline code.

import os
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PrepareData(beam.DoFn):
    def process(self, record):  # sample record - {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}        
        rec_columns = [ "id", "name", "address", "phn", "country", "age"]   # all columns of the bigquery table 

        rec_keys = list(record.keys())  # ["id", "name", "address", "phn"]  # columns needed for processing  

        values = {}

        for key in rec_keys:
            if key != "id" and key in rec_columns:
                values[key] = record[key]

        return [{"id": record['id'], "values": values}]


class MainClass:
    def run_pipe(self):
        try:        
            project = "my_project"
            dataset = "my_dataset"
            table = "my_table"
            region = "us-central1"
            job_name = "create-json-file"
            temp_location = "gs://my_core_bucket/dataflow/temp_location/"
            runner = "DataflowRunner"
            
            # set pipeline options
            argv = [
                f'--project={project}',
                f'--region={region}',
                f'--job_name={job_name}',
                f'--temp_location={temp_location}',
                f'--runner={runner}'
            ]
            
            # json file name
            file_name = "gs://my_core_bucket/data/my_data"

            # create pipeline 
            p = beam.Pipeline(argv=argv)

            # query to read table data
            query = f"SELECT id, name, address, phn FROM `{project}.{dataset}.{table}` LIMIT 4"

            bq_data = p | 'Read Table' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)

            # bq_data will be in the form 
            # {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}
            # {"id": "2", "name": "def", "address": "Kolkata", "phn": "2222222222"}
            # {"id": "3", "name": "ghi", "address": "Chennai", "phn": "3333333333"}
            # {"id": "4", "name": "jkl", "address": "Delhi", "phn": "4444444444"}
            
            # alter data in the form needed for downstream process
            prepared_data = bq_data | beam.ParDo(PrepareData())

            # serialize each element and write the pcollection as a JSON file
            formatted_data = prepared_data | 'JSON format' >> beam.Map(json.dumps)
            formatted_data | 'Write Output' >> beam.io.WriteToText(file_name, file_name_suffix=".json", shard_name_template='')

            # execute pipeline
            p.run().wait_until_finish()
        except Exception as e:
            logging.error(f"Exception in run_pipe - {str(e)}")


if __name__ == "__main__":
    main_cls = MainClass()
    main_cls.run_pipe()

Upvotes: 1

Views: 1863

Answers (2)

jccampanero

Reputation: 53411

As suggested in the comments, please try combining all the results into a single element. To successfully serialize the result of the combination process, you can use a custom serializer.

Your code can look like this:

import os
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


# Based on https://stackoverflow.com/questions/8230315/how-to-json-serialize-sets
class SetEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, set):
            return list(obj)
        return json.JSONEncoder.default(self, obj)


# utility function for list combination
class ListCombineFn(beam.CombineFn):
    def create_accumulator(self):
        return []

    def add_input(self, accumulator, input):
        accumulator.append(input)
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = []
        for accum in accumulators:
            merged += accum
        return merged

    def extract_output(self, accumulator):
        return accumulator



class PrepareData(beam.DoFn):
    def process(self, record):  # sample record - {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}        
        rec_columns = [ "id", "name", "address", "phn", "country", "age"]   # all columns of the bigquery table 

        rec_keys = list(record.keys())  # ["id", "name", "address", "phn"]  # columns needed for processing  

        values = {}

        for key in rec_keys:
            if key != "id" and key in rec_columns:
                values[key] = record[key]

        return [{"id": record['id'], "values": values}]


class MainClass:
    def run_pipe(self):
        try:        
            project = "my_project"
            dataset = "my_dataset"
            table = "my_table"
            region = "us-central1"
            job_name = "create-json-file"
            temp_location = "gs://my_core_bucket/dataflow/temp_location/"
            runner = "DataflowRunner"
            
            # set pipeline options
            argv = [
                f'--project={project}',
                f'--region={region}',
                f'--job_name={job_name}',
                f'--temp_location={temp_location}',
                f'--runner={runner}'
            ]
            
            # json file name
            file_name = "gs://my_core_bucket/data/my_data"

            # create pipeline 
            p = beam.Pipeline(argv=argv)

            # query to read table data
            query = f"SELECT id, name, address, phn FROM `{project}.{dataset}.{table}` LIMIT 4"

            bq_data = p | 'Read Table' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)

            # bq_data will be in the form 
            # {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}
            # {"id": "2", "name": "def", "address": "Kolkata", "phn": "2222222222"}
            # {"id": "3", "name": "ghi", "address": "Chennai", "phn": "3333333333"}
            # {"id": "4", "name": "jkl", "address": "Delhi", "phn": "4444444444"}
            
            # alter data in the form needed for downstream process
            prepared_data = bq_data | beam.ParDo(PrepareData())

            # combine all the results into one PCollection containing a single list
            # see https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally/
            combined_data = prepared_data | 'Combine results' >> beam.CombineGlobally(ListCombineFn())

            # write the combined pcollection as a JSON file. We will use a
            # custom encoder for the serialization
            formatted_data = combined_data | 'JSON format' >> beam.Map(json.dumps, cls=SetEncoder)
            formatted_data | 'Write Output' >> beam.io.WriteToText(file_name, file_name_suffix=".json", shard_name_template='')

            # execute pipeline
            p.run().wait_until_finish()
        except Exception as e:
            logging.error(f"Exception in run_pipe - {str(e)}")


if __name__ == "__main__":
    main_cls = MainClass()
    main_cls.run_pipe()
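
For a quick local sanity check of the combine and serialize steps, here is a minimal sketch that reuses the ListCombineFn and SetEncoder classes above, assuming the DirectRunner and a small in-memory input instead of BigQuery:

import json
import apache_beam as beam

# hypothetical in-memory records, shaped like the output of PrepareData
records = [
    {"id": "1", "values": {"name": "abc", "address": "Mumbai", "phn": "1111111111"}},
    {"id": "2", "values": {"name": "def", "address": "Kolkata", "phn": "2222222222"}}
]

with beam.Pipeline() as p:  # DirectRunner by default
    (p
     | 'Create' >> beam.Create(records)
     | 'Combine results' >> beam.CombineGlobally(ListCombineFn())
     | 'JSON format' >> beam.Map(json.dumps, cls=SetEncoder)
     | 'Print' >> beam.Map(print))  # prints a single JSON array

Note that CombineGlobally does not guarantee the order of the elements in the combined list.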

Upvotes: 3

guillaume blaquiere

Reputation: 75820

You can do it directly in BigQuery and simply write the result as-is with Dataflow.

Only change the query:

query = f"Select ARRAY_AGG(str) from (SELECT struct(id as id, name as name, address as address, phn as phn) as str FROM `{project}.{dataset}.{table}` LIMIT 4)"

Keep in mind that

  • BigQuery processing will always be faster and cheaper than Dataflow processing (or other processing on equivalent hardware)
  • Dataflow will always build valid JSON (your expected JSON is invalid; you can't start with an array)

Upvotes: 0
