Reputation: 79
I have this code, which tags outputs based on some data in the input file:
import json
import apache_beam as beam
from apache_beam.pvalue import TaggedOutput

class filters(beam.DoFn):
    def process(self, element):
        data = json.loads(element)
        yield TaggedOutput(data['EventName'], element)
I need help with the next step of writing the resulting tagged outputs:
tagged = lines | beam.ParDo(filters()).with_outputs('How can I dynamically access these tags?')
So as you can see, when I do '.with_outputs()' I don't know how many tags there will be or what their names are, so I can't predict things like:
tag1 = tagged.tag1
Thank you for your help
UPDATE: this won't work because with_outputs() is empty:
tagged_data = lines | 'tagged data by key' >> beam.ParDo(filters()).with_outputs()

for tag in tagged_data:
    print('something')
output:
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args
but this will work:
tagged_data = lines | 'tagged data by key' >> beam.ParDo(filters()).with_outputs('tag1', 'tag2')

for tag in tagged_data:
    print('something')
output:
something
something
Upvotes: 0
Views: 1789
Reputation: 5104
Apache Beam pipeline execution is deferred: a DAG of operations to execute is built up, and nothing actually happens until you run your pipeline. (In Beam Python, the run is typically invoked implicitly at the end of a with beam.Pipeline(...) block.) PCollections don't actually contain data, just instructions for how the data should be computed.
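As a minimal sketch of that separation (the input path is just a placeholder):

import apache_beam as beam

with beam.Pipeline() as p:
    # These lines only add nodes to the DAG; nothing is read or printed yet.
    lines = p | beam.io.ReadFromText('input.txt')  # placeholder path
    lines | beam.Map(print)
# The pipeline actually runs here, when the 'with' block exits and
# p.run() is invoked implicitly.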
In particular, this means that when you write
tagged = lines | beam.ParDo(filters()).with_outputs(...)
tagged doesn't actually contain any data; rather, it contains references to the PCollections that will be produced (and further processing steps can be added to them). The data in lines has not actually been computed or read yet, so you can't (during pipeline construction) figure out what the set of outputs is.
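Concretely, if the tags are known up front you can declare them and then access each output as an attribute or by key (the tag names and write paths below are just placeholders):

import apache_beam as beam

with beam.Pipeline() as p:
    lines = p | beam.Create(['{"EventName": "tag1"}', '{"EventName": "tag2"}'])
    tagged = lines | beam.ParDo(filters()).with_outputs('tag1', 'tag2')
    # Each declared tag is its own PCollection.
    tagged.tag1 | 'write tag1' >> beam.io.WriteToText('/out/tag1')
    tagged['tag2'] | 'write tag2' >> beam.io.WriteToText('/out/tag2')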
It's not clear what your end goal is from the question, but if you're trying to partition outputs, you may want to look into dynamic destinations.
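For example, a sketch along these lines with fileio.WriteToFiles (input and output paths are placeholders); the destination is computed per record at execution time, so the set of "tags" never has to be known during construction:

import json
import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline() as p:
    lines = p | beam.io.ReadFromText('input.json')  # placeholder input
    lines | fileio.WriteToFiles(
        path='/some/output/dir',
        # Route each record by its EventName, evaluated per element at runtime.
        destination=lambda line: json.loads(line)['EventName'],
        sink=lambda dest: fileio.TextSink(),
        file_naming=fileio.destination_prefix_naming(suffix='.json'))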
Upvotes: 2
Reputation: 761
To achieve what you are trying to do, you would need to create your own DoFn. You can use this example as a base:
import apache_beam as beam
from apache_beam.io.textio import _TextSink  # note: _TextSink is a private Beam API

class WriteEachKeyToText(beam.DoFn):
    def __init__(self, file_path_prefix: str):
        super().__init__()
        self.file_path_prefix = file_path_prefix

    def process(self, kv):
        key, elements = kv
        sink = _TextSink(self.file_path_prefix, file_name_suffix=f"{key}.json")
        writer = sink.open_writer("prefix", self.file_path_prefix)
        for e in elements:  # values
            writer.write(e)
        writer.close()  # finalize the file; without this, data may be left unwritten
Then, you can use it like this:
output_path = "/some/path/"
tagged_data | beam.ParDo(WriteEachKeyToText(output_path))
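Note that this DoFn expects (key, iterable) pairs, so tagged_data needs to be keyed and grouped first. A sketch of the full wiring (the keying step assumes the EventName field from the question, and the input path is a placeholder):

import json
import apache_beam as beam

output_path = "/some/path/"

with beam.Pipeline() as p:
    tagged_data = (
        p
        | beam.io.ReadFromText('input.json')  # placeholder input
        | beam.Map(lambda line: (json.loads(line)['EventName'], line))
        | beam.GroupByKey())  # yields (key, iterable of elements)
    tagged_data | beam.ParDo(WriteEachKeyToText(output_path))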
Upvotes: 2