I am given the URL of a Google Cloud Storage bucket. I have to:
1. Use the URL to obtain a list of the blobs in that bucket
2. For each blob, make some GCS API calls to get information about it (blob.size, blob.name, etc.)
3. For each blob, also read it, find something inside it, and add that value to the ones obtained from the GCS API calls in step 2
4. For each blob, write the values found in steps 2 and 3 to BigQuery
I have thousands of blobs, so this needs to be done with Apache Beam (which was recommended to me). A minimal sketch of the per-blob work is shown below.
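For reference, a minimal sketch of the per-blob work (steps 2 and 3) with the plain google-cloud-storage client, before any Beam is involved; "my-bucket" and the scan step are placeholders:

from google.cloud import storage

client = storage.Client()
for blob in client.list_blobs("my-bucket"):  # step 1: list the blobs
    print(blob.name, blob.size)              # step 2: metadata from the GCS API
    contents = blob.download_as_bytes()      # step 3: read the blob (download_as_string() on older library versions)
    # ...scan `contents` for the value you need here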
My idea of the pipeline is something like this (a rough sketch follows the list):
1. GetUrlOfBucket and make a PCollection
2. Using that PCollection, obtain a list of blobs as a new PCollection
3. Create a PCollection with the metadata of those blobs
4. Apply a transform that takes in the PCollection of metadata dictionaries, goes into each blob, scans it for a value, and returns a new PCollection of dictionaries holding the metadata values plus this new value
5. Write this to BigQuery
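A rough sketch of that shape in the Beam Python SDK might look like the following. Since Beam has no built-in source that lists blobs from a bucket name, the listing happens inside a DoFn; the bucket name, the ScanBlob DoFn, and the BigQuery table are all placeholders:

import apache_beam as beam
from google.cloud import storage

class ListBlobs(beam.DoFn):
    # Expands one bucket name into one output element per blob in the bucket.
    def process(self, bucket_name):
        client = storage.Client()
        for blob in client.list_blobs(bucket_name):
            yield blob  # for very large jobs, yielding blob.name and re-fetching downstream may serialize more cleanly

class ScanBlob(beam.DoFn):
    # Hypothetical stand-in for the metadata/scan step (step 4 above).
    def process(self, blob):
        yield {'blob_name': blob.name, 'size': blob.size}  # plus the scanned value

with beam.Pipeline() as p:
    (p
     | 'BucketName' >> beam.Create(['my-bucket'])  # placeholder bucket name
     | 'ListBlobs' >> beam.ParDo(ListBlobs())
     | 'ScanBlobs' >> beam.ParDo(ScanBlob())
     | 'WriteToBQ' >> beam.io.WriteToBigQuery('project:dataset.table'))  # placeholder table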
It's particularly hard for me to think about how to return a dictionary as a PCollection
What I've read:
https://beam.apache.org/documentation/programming-guide/#composite-transforms
https://medium.com/@rajeshhegde/data-pipeline-using-apache-beam-python-sdk-on-dataflow-6bb8550bf366
Any suggestions, specifically about how to take in that bucket name and return a PCollection of blobs, are greatly appreciated.
Upvotes: 1
Views: 4251
I resolved this by reading more about Apache Beam and figuring out that I had to use a ParDo transform to split the work across my resources. Inside the ParDo I call my DoFn, which takes in an element, does all the processing needed for it, and yields a dict. Refer to this post: Apache Beam: How To Simultaneously Create Many PCollections That Undergo Same PTransform?
import apache_beam as beam

class ExtractMetadata(beam.DoFn):
    def process(self, element):
        """
        Takes in a blob, reads its metadata and contents, and yields a
        dictionary of values for one BigQuery row.
        """
        # Custom metadata set on the blob (may be absent)
        metadata = element.metadata
        if metadata is not None:
            event_count = int(metadata['count'])
        else:
            event_count = None
        # Helper methods (defined elsewhere in the class) that derive
        # values from the blob's id and contents
        event_type = self.determine_event_type(element.id)
        cluster = self.determine_cluster(element.id)
        customer = self.determine_customer(element)
        # date = datetime.strptime(element.time_created, '%a, %d %b %Y %H:%M:%S')
        # date = date.isoformat()
        dic = {
            'blob_name': element.name,
            'event_path': element.path,
            'size': int(element.size),
            'time_of_creation': element.time_created.isoformat(),
            'event_count': event_count,
            'event_type': event_type,
            'cluster': cluster,
            'customer': customer
        }
        yield dic
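For completeness, a sketch of how this DoFn might be wired into the pipeline; the upstream blobs PCollection, the table name, and the schema are placeholders you would adapt to your project:

(blobs  # a PCollection of blob objects from an upstream listing step
 | 'ExtractMetadata' >> beam.ParDo(ExtractMetadata())
 | 'WriteToBQ' >> beam.io.WriteToBigQuery(
       'my_project:my_dataset.blob_stats',  # placeholder table
       schema=('blob_name:STRING,event_path:STRING,size:INTEGER,'
               'time_of_creation:TIMESTAMP,event_count:INTEGER,'
               'event_type:STRING,cluster:STRING,customer:STRING'),
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))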
Upvotes: 2