zoran119

Reputation: 11307

Applying a transform to one output tag

I think I have a function which produces two outputs (please correct me if I'm wrong):

PCollection<String> words = ...;

final TupleTag<String> shortWordsTag = new TupleTag<String>(){};

PCollectionTuple results =
     words.apply(
         ParDo
         .of(new DoFn<String, String>() {
             @ProcessElement
             public void processElement(ProcessContext context) {
                 String word = context.element();
                 if (word.length() < 5) {
                     context.output(shortWordsTag, word);
                 } else {
                     context.output(word);
                 }
             }
         }));

Now I'd like to call another function, but apply it to only one of those outputs. Something like this:

results.apply(
    ParDo
    .of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext context) {
            String word = context.element();
            // do stuff, but should only have words with length < 5 here
        }
    }));

I can see some examples that use withOutputTags but this method seems to take more than one tag (a tag, and a list of tags), and I'm not sure how to use it for my scenario.

How can I make results.apply run only on the data that is output to the shortWordsTag tag?

Upvotes: 1

Views: 2085

Answers (1)

dsesto

Reputation: 8178

The correct way to work with multiple outputs in a single transform in Apache Beam is indeed using PCollectionTuple and withOutputTags, as you mentioned.

In the Apache Beam documentation you can find some really good examples about how to set up a transform with several outputs, using different tags for every one of them:

Additionally, section 4.5.2 in the second link above has an example of how to emit to multiple outputs in your DoFn. In short, and using the core code you shared, you will need to do the following:

PCollectionTuple results = [...].withOutputTags(MAIN_TAG, LIST_OF_ADDITIONAL_TAGS);

results.get(YOUR_DESIRED_TAG).apply(...);

Calling get() on a PCollectionTuple returns the PCollection associated with the TupleTag you pass in, so any transform you apply to that result only sees the elements emitted to that tag.
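To make this concrete, here is a minimal sketch of the whole flow applied to the code from the question. The class name ShortWordsExample, the tag LONG_WORDS_TAG, the helper isShort, the step names, the sample input, and the upper-casing step are all hypothetical names and choices made for illustration; only shortWordsTag and the length check come from the question. It also assumes a runner (for example the direct runner) is on the classpath.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class ShortWordsExample {
    // The trailing {} creates anonymous subclasses so Beam can infer the element type.
    // LONG_WORDS_TAG is a hypothetical name for the main output tag.
    static final TupleTag<String> LONG_WORDS_TAG = new TupleTag<String>() {};
    static final TupleTag<String> SHORT_WORDS_TAG = new TupleTag<String>() {};

    // Same condition as in the question: fewer than 5 characters.
    static boolean isShort(String word) {
        return word.length() < 5;
    }

    // Emits short words to SHORT_WORDS_TAG and everything else to the main output.
    static class SplitByLengthFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext context) {
            String word = context.element();
            if (isShort(word)) {
                context.output(SHORT_WORDS_TAG, word);
            } else {
                context.output(word); // goes to the main output (LONG_WORDS_TAG)
            }
        }
    }

    static PCollection<String> processShortWords(PCollection<String> words) {
        // withOutputTags takes the main tag plus a list of additional tags.
        PCollectionTuple results = words.apply("SplitByLength",
            ParDo.of(new SplitByLengthFn())
                .withOutputTags(LONG_WORDS_TAG, TupleTagList.of(SHORT_WORDS_TAG)));

        // get(tag) selects one output; the next ParDo only sees short words.
        return results.get(SHORT_WORDS_TAG).apply("ProcessShortWords",
            ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext context) {
                    // Placeholder for "do stuff": upper-case each short word.
                    context.output(context.element().toUpperCase());
                }
            }));
    }

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        PCollection<String> words =
            pipeline.apply(Create.of("a", "tree", "pipeline", "transform"));
        processShortWords(words);
        pipeline.run().waitUntilFinish();
    }
}
```

This is why withOutputTags takes "a tag, and a list of tags": the first argument names the main output (what plain context.output(word) writes to), and the list names every additional output. With only one extra output, the list simply contains a single tag.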

Upvotes: 1
