Reputation: 1641
I am new to Apache Beam and Dataflow. I am trying to fetch a large set of data, roughly 20,000 records. I have to split it into chunks of 1,000 records and save each chunk in a separate CSV file. I know how to read from BQ and write to CSV, but I can't figure out how to chunk the records using a Beam transform, or whether there is another way to do it.
What I tried: I started with the simple code below, where I pass the data I read from BQ to a ParDo function. I don't see how I can use ParDo to chunk the records; if this is not the right approach, please point me in the right direction.
Also, ParDo is not printing the elements in the code below.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class Printer(beam.DoFn):
    def process(self, element):
        print(element)
        yield element

def run():
    with beam.Pipeline() as p:
        pcoll = (p
                 | "ReadFromBigQuery" >> beam.io.ReadFromBigQuery(
                     query='SELECT email, name, age FROM `my_db`;',
                     use_standard_sql=True)
                 | "par" >> beam.ParDo(Printer())
                 | "Print for now" >> beam.Map(print)
                 )
        result = p.run()
        result.wait_until_finish()

if __name__ == '__main__':
    run()
Thank you for any help.
Upvotes: 1
Views: 1055
Reputation: 5104
To write a CSV file you can use beam.io.WriteToText, preceded by a Map or DoFn that formats your elements into comma-delimited lines. If your data is schema'd, you could also use the DataFrames API to write directly via the to_csv method.
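For instance, a minimal sketch of the WriteToText route, reusing the query from the question (the output path is a placeholder, and the row formatting assumes the three fields from that query):

import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | "ReadFromBigQuery" >> beam.io.ReadFromBigQuery(
         query='SELECT email, name, age FROM `my_db`;',
         use_standard_sql=True)
     # ReadFromBigQuery emits one dict per row; turn each into a CSV line.
     | "FormatCsv" >> beam.Map(
         lambda row: '%s,%s,%s' % (row['email'], row['name'], row['age']))
     # Beam appends shard suffixes such as -00000-of-00004 to the prefix.
     | "WriteCsv" >> beam.io.WriteToText(
         '/some/path/out',
         file_name_suffix='.csv',
         header='email,name,age'))

Note, though, that this gives you no control over how many records land in each output file.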
The sharding of the output files is determined by the sharding of the workers, which may be dynamic. If you need exactly 1000 records in each chunk, the only way to do this would be via a DoFn that writes things out manually, e.g.
def write_to_file(contents, prefix):
    path = '%s-%d' % (prefix, hash(contents))
    # Write to a temp file and rename so readers never observe a
    # partially written file.
    with beam.io.filesystems.FileSystems.create(path + '.tmp') as fout:
        fout.write(contents.encode('utf-8'))  # FileSystems writers expect bytes
    beam.io.filesystems.FileSystems.rename([path + '.tmp'], [path])

(input_pcoll
 | beam.Map(lambda row: ','.join(str(s) for s in row))  # or similar
 | beam.BatchElements(min_batch_size=1000, max_batch_size=1000)
 | beam.Map(lambda lines: '\n'.join(lines))
 | beam.Map(write_to_file, '/some/path/out'))
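If you'd rather try the DataFrames route mentioned above, a rough sketch (the beam.Row step and the field types are assumptions I've made to attach the schema that to_dataframe requires; the path is a placeholder):

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe

with beam.Pipeline() as p:
    rows = (p
            | beam.io.ReadFromBigQuery(
                query='SELECT email, name, age FROM `my_db`;',
                use_standard_sql=True)
            # ReadFromBigQuery yields plain dicts; wrap them in beam.Row so
            # the PCollection carries a schema.
            | beam.Map(lambda x: beam.Row(email=str(x['email']),
                                          name=str(x['name']),
                                          age=int(x['age']))))
    df = to_dataframe(rows)
    df.to_csv('/some/path/out.csv')

Keep in mind that to_csv shards its output the same way WriteToText does, so for exactly 1000 records per file you still need the manual approach above.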
Upvotes: 1