yiqing_hua

Reputation: 75

Write to dynamic destination to cloud storage in dataflow in Python

I was trying to read a big file from Cloud Storage and shard its records according to a given field.

I'm planning to do Read | Map(lambda x: (x[key_field], x)) | GroupByKey | Write to a file named after the key field.

However, I couldn't find a way to write dynamically to Cloud Storage. Is this functionality supported?

Thank you, Yiqing

Upvotes: 2

Views: 1662

Answers (2)

anrope

Reputation: 181

An experimental write transform, beam.io.fileio.WriteToFiles, was added to the Beam Python SDK in 2.14.0:

import apache_beam as beam

my_pcollection | beam.io.fileio.WriteToFiles(
      path='/my/file/path',
      # Route each record to a logical destination based on its contents.
      destination=lambda record: 'avro' if record['type'] == 'A' else 'csv',
      # AvroSink and CsvSink stand for user-defined FileSink implementations.
      sink=lambda dest: AvroSink() if dest == 'avro' else CsvSink(),
      file_naming=beam.io.fileio.destination_prefix_naming())

which can be used to write each record to a different file.

You can skip the GroupByKey; just use destination to decide which file each record is written to. The return value of destination needs to be a value that can be grouped by.
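For the sharding-by-field case in the question, a rough sketch might look like this (the bucket paths are placeholders, and it assumes each record is a text line whose key is the first comma-separated column; fileio.TextSink writes each element out as a UTF-8 line):

import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline() as p:
    (p
     | beam.io.ReadFromText('gs://my-bucket/big_file.csv')
     | fileio.WriteToFiles(
           path='gs://my-bucket/shards',
           # One destination per distinct key value (here, the first column).
           destination=lambda line: line.split(',')[0],
           sink=lambda dest: fileio.TextSink(),
           file_naming=fileio.destination_prefix_naming()))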

More documentation here:

https://beam.apache.org/releases/pydoc/2.14.0/apache_beam.io.fileio.html#dynamic-destinations

And the JIRA issue here:

https://issues.apache.org/jira/browse/BEAM-2857

Upvotes: 1

jkff

Reputation: 17913

Yes, you can use the FileSystems API to create the files.
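For example, a minimal sketch of that approach after a GroupByKey (the bucket paths and the first-column key are placeholder assumptions, not part of the answer):

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems

class WriteShard(beam.DoFn):
    """Writes all records sharing a key into one file named after the key."""
    def __init__(self, output_dir):
        self._output_dir = output_dir  # e.g. 'gs://my-bucket/shards'

    def process(self, kv):
        key, records = kv
        path = FileSystems.join(self._output_dir, '%s.txt' % key)
        writer = FileSystems.create(path)
        try:
            for record in records:
                writer.write(('%s\n' % record).encode('utf-8'))
        finally:
            writer.close()

with beam.Pipeline() as p:
    (p
     | beam.io.ReadFromText('gs://my-bucket/big_file.txt')
     | beam.Map(lambda line: (line.split(',')[0], line))  # key on the first column
     | beam.GroupByKey()
     | beam.ParDo(WriteShard('gs://my-bucket/shards')))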

Upvotes: 1
