Using apache_beam.io.filesystems.FileSystems
How can I write to GCS with a ParDo and a DoFn? I am already getting output in CSV format from a ParDo. Do I need to write another ParDo to write it to GCS, or can I import a module that writes it to GCS directly? Please help.
Upvotes: 0
Views: 3063
Reputation: 7058
I have an example here where I write b64-encoded images to GCS using apache_beam.io.filesystems.FileSystems. The last step of the pipeline takes b64, a PCollection whose elements contain two fields, key_id and image, and applies the ParDo:
b64 | 'Save images' >> beam.ParDo(WriteToSeparateFiles(known_args.output))
where known_args.output is a GCS base path and WriteToSeparateFiles is as follows:
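import apache_beam as beam
from apache_beam.io import filesystems

class WriteToSeparateFiles(beam.DoFn):
    def __init__(self, outdir):
        self.outdir = outdir  # GCS base path (known_args.output above)

    def process(self, element):
        # One file per element, named after the element's key_id.
        writer = filesystems.FileSystems.create(self.outdir + element['key_id'] + '.png')
        writer.write(element['image'])
        writer.close()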
With filesystems.FileSystems.create() I have control over the destination path. For the base path I use the parameter passed to the DoFn's constructor, and I use the key_id of each element to generate meaningful file names. Finally, I append the .png extension as I am writing images.
I use writer.write(element['image']) to save the contents of the image field for each file and close the stream with writer.close().
Upvotes: 3