Manisha

Reputation: 21

How to pass a dynamic PCollection as a side input in Python?

I have a use case where I need to append a file_path (as a new element) to another PCollection of dictionaries.

So I decided to pass that PCollection as the main input to my next transform and file_path (which is produced by a ParDo) as a side input.

Since the generated file_path is itself a PCollection, I am using it as:

beam.pvalue.AsSingleton(file_path)

Strangely, this step never executes: there is no output and no error. But when I create the PCollection explicitly with:

file_path = beam.Create(['some_path_value']) and then pass it as above with beam.pvalue.AsSingleton(file_path), it works perfectly.

But I don't want to create a PCollection explicitly. I need to use the output of the transform above, as is, as the side input.

I tried every form for "file_path": a list, a dictionary, a tuple.

I also tried different Beam functions.

import json

import apache_beam as beam

def collect(pcoll, path):
    # pcoll is one main-input element; path is the materialized side input.
    print(pcoll, "-----------")
    print(path, "-------------")
    return path

class Extract(beam.DoFn):
    def process(self, element, *args, **kwargs):
        # Build the GCS URL from the Pub/Sub notification payload.
        pub_sub_json = json.loads(element)
        gcs_url = "gs://" + pub_sub_json["bucket"] + "/" + pub_sub_json["name"]
        yield gcs_url

file_path = (p | "Read" >> beam.io.ReadFromPubSub(
                            subscription=options.session_subscription,
                            with_attributes=False)
             | "Extract" >> beam.ParDo(Extract()))

result = (some_other_pcollection | "display" >> beam.FlatMap(collect, path=beam.pvalue.AsList(file_path)))


Expected: execution should enter the collect function and print whatever the Extract step returns as file_path.

There are no errors or warnings, but execution never enters the collect function. This usually happens when no data arrives from the upstream transforms, but I have checked many times that file_path has data.

There is still no output. Output only appears when I use beam.Create(["pass_some_file_path"]) and use its result as the side input.

Upvotes: 1

Views: 1330

Answers (1)

Neeraj Avutu

Reputation: 11

The output of a transform must be in 2-tuple (key, value) format for it to be accepted as a side input with pvalue.AsDict(). It works well with pvalue.AsList() too.
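
To make those shapes concrete, here is a plain-Python illustration of what the function receiving the side input would see in each case. It only mimics the shapes and does not run a Beam pipeline; the bucket and object names are made up for the example.

```python
# Plain-Python stand-ins for the values a side input hands to the function.
# This only mimics the shapes; no Beam pipeline is involved.

# Hypothetical output of the Extract step, as (key, value) 2-tuples.
elements = [("url", "gs://example-bucket/a.json")]

# pvalue.AsDict builds a dict from the (key, value) 2-tuples.
as_dict_view = dict(elements)

# pvalue.AsList passes the elements through as a list, whatever their type.
as_list_view = list(elements)

# After GroupByKey, each element is (key, iterable of values) instead.
grouped = {}
for key, value in elements:
    grouped.setdefault(key, []).append(value)
grouped_as_list_view = list(grouped.items())

print(as_dict_view)
print(as_list_view)
print(grouped_as_list_view)
```

So with AsList after a GroupByKey, the function would get a list of (key, values) pairs rather than a bare path string, and it has to unpack accordingly.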

import json

import apache_beam as beam
from apache_beam import pvalue

def collect(pcoll, path):
    print(pcoll, "-----------")
    print(path, "-------------")
    return path

class Extract(beam.DoFn):
    def process(self, element, *args, **kwargs):
        pub_sub_json = json.loads(element)
        gcs_url = "gs://" + pub_sub_json["bucket"] + "/" + pub_sub_json["name"]
        # Emit a (key, value) 2-tuple on the "url" tagged output.
        yield pvalue.TaggedOutput("url", ("url", gcs_url))

file_path = (p | "Read" >> beam.io.ReadFromPubSub(
                            subscription=options.session_subscription,
                            with_attributes=False)
             # with_outputs("url") makes the tagged output reachable as file_path["url"].
             | "Extract" >> beam.ParDo(Extract()).with_outputs("url"))
file_path_new = (file_path["url"] | "Group by key" >> beam.GroupByKey())

result = (some_other_pcollection | "display" >> beam.FlatMap(collect, path=beam.pvalue.AsList(file_path_new)))
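
If it helps with debugging, the parsing done inside Extract can be checked on its own, outside any pipeline. This is a minimal sketch, assuming the Pub/Sub payload is a JSON object with "bucket" and "name" fields as in the code above; build_gcs_url and the sample payload are hypothetical names and values used only for this check.

```python
import json


def build_gcs_url(message):
    """Turn a Pub/Sub notification payload into a gs:// URL."""
    # With with_attributes=False the payload arrives as bytes;
    # json.loads accepts both bytes and str.
    payload = json.loads(message)
    return "gs://" + payload["bucket"] + "/" + payload["name"]


# Hypothetical payload for a local check; bucket and object names are made up.
sample = json.dumps(
    {"bucket": "example-bucket", "name": "incoming/data.json"}
).encode("utf-8")

print(build_gcs_url(sample))
```

Keeping the parsing in a plain function like this makes it possible to rule out the URL construction as the cause before looking at how the side input is wired up.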

Were you able to solve it in a better way?

Upvotes: 1
