Reputation: 129
I am trying to read a file from a GCS bucket (path: gs://bucket_name) and load it into a folder on the Dataflow VM (path: /tmp/file name).
I also need to copy another file from the Dataflow VM folder back to the GCS bucket.
I have tried the apache_beam.io.gcp.gcsio library, but it does not seem to work.
Can anyone give any suggestions on this?
Upvotes: 0
Views: 2331
Reputation: 918
The best way to do this is with a custom DoFn whose process method calls into the GCS Python client. The DoFn runs when elements are sent into it, so you can trigger it either with an Impulse (executes exactly once) or with a PCollection (executes once per element). See the GCS documentation on downloading and uploading blobs, and the GCS Python client library reference.
import apache_beam as beam
from google.cloud import storage

class ReadWriteToGcs(beam.DoFn):
    def setup(self):
        # setup() takes no element argument; it runs once per DoFn instance,
        # so the GCS client is created once per worker rather than per element.
        self.client = storage.Client()

    def process(self, e):
        # Replace bucket_name, source_blob_name, destination_file_name and
        # source_file_name with your own values (or derive them from e).
        bucket = self.client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        # Download the blob to the Dataflow worker's local disk (e.g. /tmp/...).
        blob.download_to_filename(destination_file_name)
        # Upload a local file back to GCS.
        blob.upload_from_filename(source_file_name)

p = beam.Pipeline(...)
impulse = p | beam.Impulse()
impulse | beam.ParDo(ReadWriteToGcs())
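If it helps, here is a minimal sketch of one way to make those placeholder names concrete: pass the bucket, blob, and local path in as the element itself, so the same DoFn works for any file. The bucket and file names below (my-bucket, data/input.csv, /tmp/input.csv) are hypothetical, and GcsCopyFn is just an illustrative variant of the DoFn above.

import apache_beam as beam
from google.cloud import storage

class GcsCopyFn(beam.DoFn):
    def setup(self):
        self.client = storage.Client()

    def process(self, e):
        # Each element is a (bucket_name, blob_name, local_path) tuple.
        bucket_name, blob_name, local_path = e
        blob = self.client.bucket(bucket_name).blob(blob_name)
        blob.download_to_filename(local_path)   # GCS -> worker local disk
        blob.upload_from_filename(local_path)   # worker local disk -> GCS

with beam.Pipeline() as p:
    (p
     | beam.Create([('my-bucket', 'data/input.csv', '/tmp/input.csv')])
     | beam.ParDo(GcsCopyFn()))

Using beam.Create here gives you the per-element trigger described above; swap it for beam.Impulse() plus a mapping step if you only need the copy to run once.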
Upvotes: 1