Ahalya Hegde

Reputation: 1641

Chunk a large BigQuery response and save the chunks to CSV files using Apache Beam and Dataflow

I am new to Apache Beam and Dataflow. I am trying to fetch a large set of data, around 20,000 records. I need to chunk it into groups of 1,000 records and save each chunk to a separate CSV file. I know how to read from BigQuery and write to CSV, but I can't work out how to chunk the records using a Beam transform, or whether there is another way to do it.

What I tried: I started with the simple code below, where I pass the data read from BigQuery to a ParDo. I don't understand how to use ParDo to chunk the records; if this is not the right approach, please point me in the right direction.

Also, the ParDo is not printing the elements I pass to it in the code below.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class Printer(beam.DoFn):
    def process(self, element):
        print(element)
        yield element


def run():
    # The with-block runs the pipeline and waits for it to finish on exit.
    with beam.Pipeline() as p:
        (p
         | "ReadFromBigQuery" >> beam.io.ReadFromBigQuery(
               query='SELECT email, name, age FROM `my_db`;',
               use_standard_sql=True)
         | "par" >> beam.ParDo(Printer())
         | "Print for now" >> beam.Map(print))


if __name__ == '__main__':
    run()

Thank you for any help.

Upvotes: 1

Views: 1055

Answers (1)

robertwb

Reputation: 5104

To write a CSV file you can use beam.io.WriteToText, preceded by a Map or DoFn that formats your elements into comma-delimited lines. If your data has a schema attached, you could also use the DataFrame API to write directly via the to_csv method.
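For example, a minimal sketch of the WriteToText route, if exact chunk sizes don't matter, could look like the following (the column names come from the query in the question; the output path and header are placeholders):

import apache_beam as beam

def to_csv_line(row):
  # ReadFromBigQuery yields each row as a dict keyed by column name.
  return ','.join(str(row[field]) for field in ('email', 'name', 'age'))

with beam.Pipeline() as p:
  (p
   | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
         query='SELECT email, name, age FROM `my_db`', use_standard_sql=True)
   | 'FormatAsCsv' >> beam.Map(to_csv_line)
   # WriteToText shards output across files based on worker parallelism,
   # not on a fixed record count.
   | 'WriteCsv' >> beam.io.WriteToText(
         '/some/path/out', file_name_suffix='.csv', header='email,name,age'))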

The sharding of the output files is determined by the sharding of the workers, which may be dynamic. If you need exactly 1000 records in each chunk, the only way to do this would be via a DoFn that writes things out manually, e.g.

def write_to_file(contents, prefix):
  # Derive a per-batch path from the batch contents.
  path = '%s-%d' % (prefix, hash(contents))
  # Write to a temporary path first, then rename, so readers never see
  # a partially written file. FileSystems.create expects bytes.
  with beam.io.filesystems.FileSystems.create(path + '.tmp') as fout:
    fout.write(contents.encode('utf-8'))
  beam.io.filesystems.FileSystems.rename([path + '.tmp'], [path])

(input_pcoll
 | beam.Map(lambda row: ','.join(str(s) for s in row))  # or similar
 # Group the formatted lines into batches of 1000 records
 # (the final batch may be smaller).
 | beam.BatchElements(min_batch_size=1000, max_batch_size=1000)
 | beam.Map(lambda lines: '\n'.join(lines))
 | beam.Map(write_to_file, '/some/path/out'))

Upvotes: 1
