I am trying to read a large file from cloud storage and shard the records according to a given field.
My plan is Read | Map(lambda x: (x[key_field], x)) | GroupByKey | write each group to a file named after its key.
However, I couldn't find a way to write to dynamically named files in cloud storage. Is this functionality supported?
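For reference, a minimal sketch of the pipeline I have in mind (the bucket path and key field are placeholders; the final write step is the part I'm asking about):

import apache_beam as beam

with beam.Pipeline() as p:
    grouped = (
        p
        | 'Read' >> beam.io.ReadFromText('gs://my-bucket/big-file.csv')
        | 'KeyByField' >> beam.Map(lambda line: (line.split(',')[0], line))
        | 'Group' >> beam.GroupByKey()
    )
    # Desired: write each (key, records) group to its own file named
    # after the key, e.g. gs://my-bucket/output/<key>.csv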
Thank you, Yiqing
An experimental write transform, beam.io.fileio.WriteToFiles, was added to the Beam Python SDK in 2.14.0:
import apache_beam as beam

my_pcollection | beam.io.fileio.WriteToFiles(
    path='/my/file/path',
    # Route each record to a named destination based on its contents.
    destination=lambda record: 'avro' if record['type'] == 'A' else 'csv',
    # Pick a sink per destination; AvroSink and CsvSink are user-defined.
    sink=lambda dest: AvroSink() if dest == 'avro' else CsvSink(),
    file_naming=beam.io.fileio.destination_prefix_naming())
which can be used to write to different files per record. You can skip the GroupByKey and just use destination to decide which file each record is written to; the return value of destination needs to be a value that can be grouped by.
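The AvroSink and CsvSink callables in the snippet above are user-defined. As a minimal sketch, assuming a simple record layout with illustrative field names, a custom sink implements the beam.io.fileio.FileSink interface (open, write, flush):

import apache_beam as beam
from apache_beam.io import fileio

class CsvSink(fileio.FileSink):
    """Writes each record as one CSV line; field names are illustrative."""

    def open(self, fh):
        # fh is the file handle WriteToFiles opens for this destination.
        self._fh = fh

    def write(self, record):
        line = ','.join(str(record[k]) for k in ('type', 'value'))
        self._fh.write((line + '\n').encode('utf-8'))

    def flush(self):
        self._fh.flush()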
More documentation here:
https://beam.apache.org/releases/pydoc/2.14.0/apache_beam.io.fileio.html#dynamic-destinations
And the JIRA issue here:
https://issues.apache.org/jira/browse/BEAM-2857