Mitch Lillie

Reputation: 2407

Unknown producer for value SingletonPCollectionView

In the interest of providing a minimal example of my problem, I'm trying to implement a simple Beam job that takes in a String as a side input and applies it to a PCollection that is read from a CSV file in Cloud Storage. The result is then written to a .txt file in Cloud Storage.

So far I have tried experimenting with PipelineResult.waitUntilFinish (as in p.run().waitUntilFinish()), changing the placement of the two run() calls, and simplifying as much as possible by using a plain String as my side input, always with the same result. Searching Stack Overflow and Google only led me to the PR in the Beam repo that introduced the error message.

SideInputTest.java:

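package com.xpw;

import java.io.IOException;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputTest {

    // PROJECT_NAME, STAGING_LOCATION, INPUT_DATA and OUTPUT_DATA are
    // String constants whose definitions were omitted from this example.

    public static void main(String[] arg) throws IOException {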

        // Build a pipeline to read in string
        DataflowPipelineOptions options1 = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options1.setRunner(DataflowRunner.class);
        Pipeline p = Pipeline.create(options1);

        // Build really simple side input
        PCollectionView<String> sideInputView = p.apply(Create.of("foo"))
            .apply(View.<String>asSingleton());

        // Run p
        p.run();

        // Build main pipeline to read csv data
        DataflowPipelineOptions options2 = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options2.setProject(PROJECT_NAME);
        options2.setStagingLocation(STAGING_LOCATION);
        options2.setRunner(DataflowRunner.class);
        Pipeline p2 = Pipeline.create(options2);

        p2.apply(TextIO.Read.from(INPUT_DATA))
            .apply(ParDo.withSideInputs(sideInputView).of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String[] rowData = c.element().split(",");
                    String sideInput = c.sideInput(sideInputView);

                    c.output(rowData[0] + sideInput);
                }
            }))
            .apply(TextIO.Write
                .to(OUTPUT_DATA));

        p2.run();

    }
}

Full stack trace:

Caused by: java.lang.NullPointerException: Unknown producer for value SingletonPCollectionView{tag=Tag<org.apache.beam.sdk.util.PCollectionViews$SimplePCollectionView.<init>:435#3d93cb799b3970be>} while translating step ParDo(Anonymous)
    at org.apache.beam.runners.dataflow.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:1079)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.getProducer(DataflowPipelineTranslator.java:508)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateSideInputs(DataflowPipelineTranslator.java:926)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateInputs(DataflowPipelineTranslator.java:913)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.access$1100(DataflowPipelineTranslator.java:112)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$7.translateSingleHelper(DataflowPipelineTranslator.java:863)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$7.translate(DataflowPipelineTranslator.java:856)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$7.translate(DataflowPipelineTranslator.java:853)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:415)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:486)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$400(TransformHierarchy.java:231)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:206)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:321)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:365)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:154)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:514)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:151)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
    at com.xpw.SideInputTest.main(SideInputTest.java:63)

I'm currently using the org.apache.beam packages at version 0.6.0.

Upvotes: 0

Views: 414

Answers (1)

jkff

Reputation: 17913

This code is taking a PCollectionView created in one pipeline (p.apply(Create.of("foo")).apply(View.<String>asSingleton());) and using it in another pipeline (p2).

PCollections and PCollectionViews belong to a particular pipeline, and reusing them in a different pipeline is not supported.
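To make the ownership rule concrete, here is a deliberately simplified toy model in plain Java. It is not Beam's implementation: `ToyPipeline`, `View` and `ProducerLookupDemo` are hypothetical names. It only loosely mirrors the `getProducer` frame in the stack trace above, where the Dataflow translator appears to look up the step that produced a side-input view and fails a `checkNotNull` when that view was never produced by the pipeline being translated.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical toy model, NOT Beam's real classes: each pipeline records only
// the producers of values it created itself, so a value from another pipeline
// has no known producer.
public class ProducerLookupDemo {

    /** Stand-in for a PCollectionView; identity-based equality, like an opaque tag. */
    public static final class View {
        final String tag;
        public View(String tag) { this.tag = tag; }
        @Override public String toString() { return "View{" + tag + "}"; }
    }

    /** Stand-in for a pipeline that remembers which step produced each view. */
    public static final class ToyPipeline {
        private final Map<View, String> producers = new HashMap<>();

        public View createView(String tag, String producingStep) {
            View v = new View(tag);
            producers.put(v, producingStep);
            return v;
        }

        /** Mimics the translator's lookup: throws NPE if this pipeline never produced v. */
        public String getProducer(View v) {
            return Objects.requireNonNull(producers.get(v), "Unknown producer for value " + v);
        }
    }

    public static void main(String[] args) {
        ToyPipeline p = new ToyPipeline();
        ToyPipeline p2 = new ToyPipeline();
        View view = p.createView("foo", "View.AsSingleton");

        System.out.println(p.getProducer(view)); // prints View.AsSingleton
        try {
            p2.getProducer(view); // view belongs to p, not p2
        } catch (NullPointerException e) {
            System.out.println("p2: " + e.getMessage()); // prints p2: Unknown producer for value View{foo}
        }
    }
}
```

The only point of the model is that the lookup table is per pipeline; the real translator does much more, but the failure mode in the question has the same shape.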

You can create an analogous PCollectionView in p2.

I'm also confused about what your pipeline p is trying to accomplish: the only transform it has creates the view, so no data is being processed in it. I think you should get rid of p entirely and just use p2.
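Following that advice, a minimal sketch of the question's job rewritten as a single pipeline might look like this. It assumes the Beam 0.6.0 API used in the question (later Beam releases changed some of these calls, for example to `TextIO.read()` and `TextIO.write()`), and the four constant values are hypothetical placeholders to be replaced with your own project and paths.

```java
import java.io.IOException;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputTest {

    // Hypothetical placeholder values; substitute your own project and paths.
    private static final String PROJECT_NAME = "my-project";
    private static final String STAGING_LOCATION = "gs://my-bucket/staging";
    private static final String INPUT_DATA = "gs://my-bucket/input.csv";
    private static final String OUTPUT_DATA = "gs://my-bucket/output";

    public static void main(String[] args) throws IOException {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject(PROJECT_NAME);
        options.setStagingLocation(STAGING_LOCATION);
        options.setRunner(DataflowRunner.class);

        // One pipeline only: the side input and the main input both belong to p2.
        Pipeline p2 = Pipeline.create(options);

        // Build the side input view in the same pipeline that consumes it.
        final PCollectionView<String> sideInputView = p2
            .apply(Create.of("foo"))
            .apply(View.<String>asSingleton());

        p2.apply(TextIO.Read.from(INPUT_DATA))
            .apply(ParDo.withSideInputs(sideInputView).of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    // Append the side input value to the first CSV column.
                    String[] rowData = c.element().split(",");
                    String sideInput = c.sideInput(sideInputView);
                    c.output(rowData[0] + sideInput);
                }
            }))
            .apply(TextIO.Write.to(OUTPUT_DATA));

        // Run once; waitUntilFinish() just blocks main until the job ends.
        p2.run().waitUntilFinish();
    }
}
```

Because the view and the ParDo that reads it now live in the same pipeline, the translator should be able to find the view's producer. The waitUntilFinish() call is optional and unrelated to the original error.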

Upvotes: 2
