user9773014

How to return dictionary as a PCollection?

I am given the URL of a Google cloud bucket. I have to:

  1. Use the URL to acquire a list of blobs in that bucket

  2. For each blob I make some GCS API calls to get information about the blob (blob.size, blob.name, etc.)

  3. For each blob I have to also read it, find something inside it and add it to the values obtained from the GCS API calls

  4. For each blob I have to write the values found in steps 2 and 3 to BigQuery

I have thousands of blobs, so this needs to be done with Apache Beam (which was recommended to me)

My idea of the pipeline is something like this:

GetUrlOfBucket and make PCollection

Using that PCollection obtain a list of blobs as a new PCollection

Create a PCollection with the metadata of those blobs

Perform a Transform that takes in the PCollection of metadata dictionaries, goes into each blob, scans for a value, and returns a new PCollection of dictionaries holding the metadata values plus this new value

Write this to BigQuery.

It's particularly hard for me to think about how to return a dictionary as a PCollection

What I've read:

https://beam.apache.org/documentation/programming-guide/#composite-transforms

https://medium.com/@rajeshhegde/data-pipeline-using-apache-beam-python-sdk-on-dataflow-6bb8550bf366

Any suggestions, specifically about how to take in that bucket name and return a PCollection of blobs, is greatly appreciated.
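
To make the blob-listing step concrete, one common pattern is a plain generator that expands a bucket name into one element per blob; Beam can wrap such a generator with `beam.FlatMap`. The sketch below is untested against GCS: the injected `client` mirrors the `google-cloud-storage` `list_blobs` API, and `FakeClient`/`FakeBlob` are purely illustrative stand-ins so the generator can be exercised without credentials.

```python
# Sketch: expand a bucket name into one element per blob.
# The client is injected so the generator can be exercised with a stub;
# in a real pipeline you would construct google.cloud.storage.Client()
# inside the function so it is created on the workers.
def list_blobs(bucket_name, client):
    """Yields one blob object per object in the bucket."""
    for blob in client.list_blobs(bucket_name):
        yield blob

# Purely illustrative stand-ins for the GCS client and blob objects.
class FakeBlob:
    def __init__(self, name, size):
        self.name = name
        self.size = size

class FakeClient:
    def list_blobs(self, bucket_name):
        return [FakeBlob('a.json', 10), FakeBlob('b.json', 20)]

names = [b.name for b in list_blobs('my-bucket', FakeClient())]
```

In a pipeline this would look roughly like `p | beam.Create([bucket_name]) | beam.FlatMap(list_blobs_fn)`, with the real client built inside the function rather than passed in, since clients are generally not picklable.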

Upvotes: 1

Views: 4251

Answers (1)

user9773014

I resolved this by reading more about Apache Beam and figuring out that I had to use ParDo to split the job across my resources. In the ParDo I call my DoFn, which takes in an element, does all the processing needed for it, and yields a dict. Refer to this post: Apache Beam: How To Simultaneously Create Many PCollections That Undergo Same PTransform?

class ExtractMetadata(beam.DoFn):

    def process(self, element):
        """
        Takes in a blob, reads its values and yields a dictionary of values
        """
        metadata = element.metadata
        if metadata is not None:
            event_count = int(metadata['count'])
        else:
            event_count = None

        event_type = self.determine_event_type(element.id)
        cluster = self.determine_cluster(element.id)
        customer = self.determine_customer(element)
        # date = datetime.strptime(element.time_created, '%a, %d %b %Y %H:%M:%S')
        # date = date.isoformat()
        dic = {
            'blob_name': element.name,
            'event_path': element.path,
            'size': int(element.size),
            'time_of_creation': element.time_created.isoformat(),
            'event_count': event_count,
            'event_type': event_type,
            'cluster': cluster,
            'customer': customer
        }
        yield dic
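
Because `process` is just a generator, the row-building logic can be checked without running a pipeline. The sketch below mirrors the dictionary above using a stub blob; the custom `determine_*` helpers are omitted, `blob_to_row` is a hypothetical name, and every field value here is made up.

```python
from datetime import datetime
from types import SimpleNamespace

# Stub standing in for a GCS blob (attribute names taken from the code above,
# values invented for illustration).
blob = SimpleNamespace(
    name='events/part-0001.json',
    path='/b/my-bucket/o/events%2Fpart-0001.json',
    size='2048',
    time_created=datetime(2018, 6, 1, 12, 0, 0),
    metadata={'count': '17'},
)

def blob_to_row(blob):
    """Builds the BigQuery row dict, mirroring ExtractMetadata.process
    minus the custom determine_* helpers."""
    metadata = blob.metadata
    event_count = int(metadata['count']) if metadata is not None else None
    return {
        'blob_name': blob.name,
        'event_path': blob.path,
        'size': int(blob.size),
        'time_of_creation': blob.time_created.isoformat(),
        'event_count': event_count,
    }

row = blob_to_row(blob)
# row['size'] is 2048, row['event_count'] is 17
```

Wiring it up is then just `beam.ParDo(ExtractMetadata())` followed by `beam.io.WriteToBigQuery(...)` with a schema whose fields match the dict keys.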

Upvotes: 2
