I have a use case in which I need to append a file_path (as a new element) to another PCollection whose elements are dictionaries.
So I decided to pass that PCollection as the main input to my next transform, and file_path (which is generated by a ParDo) as a side input.
Since the generated file_path is a PCollection, I am using it as:
beam.pvalue.AsSingleton(file_path)
Strangely, this step never seems to execute: it produces no output and no error. But when I create a PCollection with:
file_path = beam.Create(['some_path_value']) and then pass it as above, beam.pvalue.AsSingleton(file_path), it works perfectly.
But I don't want to create a PCollection explicitly; I need to use the output of the transform above, as is, as the side input.
I tried every form for my file_path: a list, a dictionary, and a tuple.
I tried different Beam functions as well.
import json
import apache_beam as beam

def collect(pcoll, path):
    # pcoll is one main-input element; path is the side-input value.
    print(pcoll, "-----------")
    print(path, "-------------")
    return path

class Extract(beam.DoFn):
    def process(self, element, *args, **kwargs):
        # element is the raw Pub/Sub message body (bytes).
        pub_sub_json = json.loads(element)
        gcs_url = "gs://" + pub_sub_json["bucket"] + "/" + pub_sub_json["name"]
        yield gcs_url

file_path = (p
             | "Read" >> beam.io.ReadFromPubSub(
                 subscription=options.session_subscription,
                 with_attributes=False)
             | "Extract" >> beam.ParDo(Extract()))

result = (some_other_pcollection
          | "display" >> beam.FlatMap(collect, path=beam.pvalue.AsList(file_path)))
Expected: execution should enter the collect function and print whatever the Extract DoFn returns as file_path.
No errors or warnings appear, but execution never even enters the collect method. That mostly happens when no data arrives from the upstream transforms, but I checked many times and file_path does contain data.
Still there is no output. It only shows output when I use beam.Create(["pass_some_file_path"]) and pass its result as the side input.
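To confirm that Extract really emits URLs, independently of how they are wired in as a side input, its per-message logic can be moved into a plain function and checked without running a pipeline. This is a minimal sketch; the function name gcs_url_from_message and the sample payload are hypothetical:

```python
import json


def gcs_url_from_message(payload):
    """Build a gs:// URL from a notification payload containing "bucket" and "name"."""
    # With with_attributes=False, ReadFromPubSub yields the message body as bytes;
    # json.loads accepts both bytes and str.
    message = json.loads(payload)
    return "gs://" + message["bucket"] + "/" + message["name"]


if __name__ == "__main__":
    sample = b'{"bucket": "my-bucket", "name": "incoming/data.csv"}'
    print(gcs_url_from_message(sample))  # gs://my-bucket/incoming/data.csv
```

Extract.process could then simply yield gcs_url_from_message(element).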
Output from a transform should be in (key, value) 2-tuple format for it to be accepted as a side input with pvalue.AsDict(). It works well with pvalue.AsList() too.
import json
import apache_beam as beam
from apache_beam import pvalue

def collect(pcoll, path):
    print(pcoll, "-----------")
    print(path, "-------------")
    return path

class Extract(beam.DoFn):
    def process(self, element, *args, **kwargs):
        pub_sub_json = json.loads(element)
        gcs_url = "gs://" + pub_sub_json["bucket"] + "/" + pub_sub_json["name"]
        # Emit a (key, value) 2-tuple on the tagged output "url".
        yield pvalue.TaggedOutput("url", ("url", gcs_url))

# with_outputs("url") makes the tagged output addressable as file_path["url"].
file_path = (p
             | "Read" >> beam.io.ReadFromPubSub(
                 subscription=options.session_subscription,
                 with_attributes=False)
             | "Extract" >> beam.ParDo(Extract()).with_outputs("url"))

file_path_new = file_path["url"] | "Group by key" >> beam.GroupByKey()

result = (some_other_pcollection
          | "display" >> beam.FlatMap(collect, path=beam.pvalue.AsList(file_path_new)))
Were you able to solve it in a better way?