Reputation: 11307
I think I have a function which produces two outputs (please correct me if I'm wrong):
PCollection<String> words = ...;
final TupleTag<String> shortWordsTag = new TupleTag<String>(){};
PCollectionTuple results =
    words.apply(
        ParDo
            .of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext context) {
                    String word = context.element();
                    if (word.length() < 5) {
                        context.output(shortWordsTag, word);
                    } else {
                        context.output(word);
                    }
                }
            }));
Now I'd like to call another function, but apply it to only one of those outputs. Something like this:
results.apply(
    ParDo
        .of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext context) {
                String word = context.element();
                // do stuff, but should only have words with length < 5 here
            }
        })
);
I can see some examples that use withOutputTags, but this method seems to take more than one tag (a tag, and a list of tags), and I'm not sure how to use it for my scenario.
How can I make my results.apply call run only on the data that is output to the shortWordsTag tag?
Upvotes: 1
Views: 2085
Reputation: 8178
The correct way to work with multiple outputs in a single transform in Apache Beam is indeed to use PCollectionTuple and withOutputTags, as you mentioned.
In the Apache Beam documentation you can find some really good examples of how to set up a transform with several outputs, using a different tag for each of them:
Additionally, if you visit section 4.5.2 in the second link above, you will find an example of how to emit to multiple outputs in your DoFn. In short, and using the core code you shared, you will need to do the following:
PCollectionTuple results = [...].withOutputTags(MAIN_TAG, LIST_OF_ADDITIONAL_TAGS);
results.get(YOUR_DESIRED_TAG).apply(...);
Calling the get() method on a PCollectionTuple returns the PCollection associated with the TupleTag you pass to it.
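To make the two lines above concrete, here is a minimal sketch of how the pieces could fit together with your word-splitting DoFn. It assumes the Beam Java SDK and a runner such as the direct runner are on the classpath. The class names (SplitWords, SplitFn, ShoutFn), the isShort helper, the longWordsTag name, and the sample words are all made up for illustration and are not part of your code or of Beam itself.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class SplitWords {
  // Declared as anonymous subclasses ({}) so Beam can infer the element type.
  // longWordsTag is a hypothetical name for the main output's tag.
  static final TupleTag<String> longWordsTag = new TupleTag<String>() {};
  static final TupleTag<String> shortWordsTag = new TupleTag<String>() {};

  // Hypothetical helper holding the routing rule from the question.
  static boolean isShort(String word) {
    return word.length() < 5;
  }

  // Sends short words to the additional output and everything else to the main output.
  static class SplitFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext context) {
      String word = context.element();
      if (isShort(word)) {
        context.output(shortWordsTag, word);
      } else {
        context.output(word); // main output, identified by longWordsTag
      }
    }
  }

  // Placeholder for the "do stuff" step; it only ever sees the short words.
  static class ShoutFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext context) {
      context.output(context.element().toUpperCase());
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Sample input, assumed for illustration only.
    PCollection<String> words =
        pipeline.apply(Create.of("a", "beam", "pipeline", "transform"));

    // First argument: the tag for the main output.
    // Second argument: the list of tags for all additional outputs.
    PCollectionTuple results =
        words.apply(
            ParDo.of(new SplitFn())
                .withOutputTags(longWordsTag, TupleTagList.of(shortWordsTag)));

    // Pick out just the short words and apply the next transform to them.
    PCollection<String> shortWords = results.get(shortWordsTag);
    shortWords.apply(ParDo.of(new ShoutFn()));

    pipeline.run().waitUntilFinish();
  }
}
```

This also covers the "a tag, and a list of tags" part of your question: the single tag identifies the main output (whatever you emit with context.output(word)), and the TupleTagList holds every additional output, which here is only shortWordsTag. If you later need the long words too, results.get(longWordsTag) should give you that PCollection in the same way.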
Upvotes: 1