SDG

Reputation: 2342

Convert a list into a PCollection

I currently have a DoFn that lists all the files in a bucket under a given directory prefix. This DoFn returns a list instead of a PCollection. How would I convert this list into a PCollection that can be consumed by the DoFn ConvertFileNames?

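  import apache_beam as beam
  from google.cloud import storage

  # List all the files within a subdir
  class ListBlobs(beam.DoFn):
    def start_bundle(self):
      # Create the client once per bundle rather than once per element
      self.storage_client = storage.Client()

    def process(self, prefix):
      bucket = self.storage_client.bucket('xxx')
      return list(bucket.list_blobs(prefix=prefix))

  # Convert Blobs into filenames as patterns
  class ConvertFileNames(beam.DoFn):
    def process(self, blob):
      # process must produce an iterable of elements, so yield the path
      # instead of returning a bare string
      yield 'gs://' + blob.bucket.name + '/' + blob.name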

Upvotes: 0

Views: 1080

Answers (1)

robertwb

Reputation: 5104

As mentioned in the Beam documentation, a DoFn's process method returns an iterable of elements to place into the downstream PCollection, so the list you return is already what Beam needs and no conversion is required. In your example, if I had a PCollection of prefixes, call it prefix_pcoll, then I could write

blobs_pcoll = prefix_pcoll | beam.ParDo(ListBlobs())

and blobs_pcoll will contain every blob found under those prefixes, one blob per element (namely, the concatenation of the lists returned by ListBlobs.process over all prefixes). You could then write

converted = blobs_pcoll | beam.ParDo(ConvertFileNames())

You could also write

converted = blobs_pcoll | beam.Map(
    lambda blob: 'gs://' + blob.bucket.name + '/' + blob.name)
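
To see the flattening behaviour without touching GCS, here is a minimal sketch that swaps the bucket listing for an in-memory stand-in. The names list_names, to_gcs_path, run and the bucket name my-bucket are all made up for illustration. beam.FlatMap follows the same rule as a DoFn's process method: whatever iterable the function returns is unpacked into individual elements.

```python
def list_names(prefix):
    """Hypothetical stand-in for ListBlobs.process: one list per input prefix."""
    return [prefix + "a.txt", prefix + "b.txt"]


def to_gcs_path(bucket_name, blob_name):
    """Join a bucket name and an object name into a gs:// path."""
    return "gs://" + bucket_name + "/" + blob_name


def run():
    # Imported inside the function so the helpers above can be used
    # (and tested) without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as p:
        (
            p
            | beam.Create(["x/", "y/"])   # PCollection of two prefixes
            | beam.FlatMap(list_names)    # each returned list is unpacked into elements
            | beam.Map(lambda name: to_gcs_path("my-bucket", name))
            | beam.Map(print)
        )
```

Calling run() on the default local runner should print four paths, in no guaranteed order, which shows that the two lists were merged into a single PCollection of names.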

You may also want to look into apache_beam.io.fileio.MatchAll.
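
A rough sketch of what the MatchAll route might look like, assuming the GCS extras are installed (apache-beam[gcp]) and that turning each prefix into a glob pattern is acceptable for your layout. The helper prefix_to_pattern, the function run and its parameters are hypothetical names, not part of Beam. MatchAll takes a PCollection of file patterns and emits a metadata record per matching file, whose path attribute is the full gs:// path.

```python
def prefix_to_pattern(bucket_name, prefix):
    """Build a glob pattern for objects directly under a prefix (hypothetical helper)."""
    # Assumption: in Beam's glob syntax '*' is not expected to cross '/',
    # so adjust the pattern if nested objects should match as well.
    return "gs://" + bucket_name + "/" + prefix + "*"


def run(bucket_name, prefixes):
    # Imported inside the function so the helper above works without Beam installed.
    import apache_beam as beam
    from apache_beam.io import fileio

    with beam.Pipeline() as p:
        (
            p
            | beam.Create(prefixes)
            | beam.Map(lambda prefix: prefix_to_pattern(bucket_name, prefix))
            | fileio.MatchAll()                         # one metadata record per matched file
            | beam.Map(lambda metadata: metadata.path)  # keep only the gs:// path
            | beam.Map(print)
        )
```

This replaces both ListBlobs and ConvertFileNames, and no storage client has to be created by hand.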

Upvotes: 1
