Reputation: 2342
I currently have a `DoFn` that looks at a bucket and lists all the files within that bucket under a directory prefix. This `DoFn` returns a list instead of a `PCollection`. How would I convert this list into a `PCollection` that can be consumed by the `DoFn` `ConvertFileNames`?
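```python
import apache_beam as beam
from google.cloud import storage

# List all the files within a subdir
class ListBlobs(beam.DoFn):
    def setup(self):
        # Create the GCS client once per DoFn instance, not once per bundle
        self.storage_client = storage.Client()

    def process(self, prefix):
        bucket = self.storage_client.bucket('xxx')
        return list(bucket.list_blobs(prefix=prefix))

# Convert Blobs into filenames as patterns
class ConvertFileNames(beam.DoFn):
    def process(self, blob):
        # yield one string; a returned str would be emitted char by char
        yield 'gs://' + blob.bucket.name + '/' + blob.name
```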
Upvotes: 0
Views: 1080
Reputation: 5104
As mentioned in the Beam documentation, a `DoFn`'s `process` method returns an iterable of elements to place into the downstream `PCollection`. So, in your example, if I had a `PCollection` of prefixes, call it `prefix_pcoll`, then I could write
```python
blobs_pcoll = prefix_pcoll | beam.ParDo(ListBlobs())
```
and `blobs_pcoll` will contain every blob matching any of the prefixes, one element per blob (namely, the concatenation of `list(bucket.list_blobs(prefix=prefix))` over all prefixes). You could then write
```python
converted = blobs_pcoll | beam.ParDo(ConvertFileNames())
```
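Note that `ConvertFileNames.process` should `yield` its string (or return `[path]`): a bare returned string is itself an iterable, so Beam would emit it one character at a time. You could also write the conversion with `beam.Map`, which emits the single return value as one element: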
```python
converted = blobs_pcoll | beam.Map(
    lambda blob: 'gs://' + blob.bucket.name + '/' + blob.name)
```
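You may also want to look into `apache_beam.io.fileio.MatchAll`, which takes a `PCollection` of file patterns and outputs the metadata of every matching file, so it can replace `ListBlobs` entirely.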
Upvotes: 1