Reputation: 2407
To give a minimal example of my problem: I'm trying to implement a simple Beam job that takes a String as a side input and applies it to a PCollection read from a CSV file in Cloud Storage. The result is then written to a .txt file in Cloud Storage.
So far, I have tried experimenting with PipelineResult.waitUntilFinish (as in p.run().waitUntilFinish()), altering the placement of the two run() calls, and simplifying as much as possible by just using a string as my side input, always with the same result. Searching Stack Overflow and Google only led me to the PR on the Beam repo that implemented the error message.
SideInputTest.java:
public class SideInputTest {
    public static void main(String[] arg) throws IOException {
        // Build a pipeline to read in string
        DataflowPipelineOptions options1 = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options1.setRunner(DataflowRunner.class);
        Pipeline p = Pipeline.create(options1);

        // Build really simple side input
        PCollectionView<String> sideInputView = p.apply(Create.of("foo"))
            .apply(View.<String>asSingleton());

        // Run p
        p.run();

        // Build main pipeline to read csv data
        DataflowPipelineOptions options2 = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options2.setProject(PROJECT_NAME);
        options2.setStagingLocation(STAGING_LOCATION);
        options2.setRunner(DataflowRunner.class);
        Pipeline p2 = Pipeline.create(options2);

        p2.apply(TextIO.Read.from(INPUT_DATA))
            .apply(ParDo.withSideInputs(sideInputView).of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String[] rowData = c.element().split(",");
                    String sideInput = c.sideInput(sideInputView);
                    c.output(rowData[0] + sideInput);
                }
            }))
            .apply(TextIO.Write
                .to(OUTPUT_DATA));

        p2.run();
    }
}
Full stack trace:
Caused by: java.lang.NullPointerException: Unknown producer for value SingletonPCollectionView{tag=Tag<org.apache.beam.sdk.util.PCollectionViews$SimplePCollectionView.<init>:435#3d93cb799b3970be>} while translating step ParDo(Anonymous)
at org.apache.beam.runners.dataflow.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:1079)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.getProducer(DataflowPipelineTranslator.java:508)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateSideInputs(DataflowPipelineTranslator.java:926)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateInputs(DataflowPipelineTranslator.java:913)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.access$1100(DataflowPipelineTranslator.java:112)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$7.translateSingleHelper(DataflowPipelineTranslator.java:863)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$7.translate(DataflowPipelineTranslator.java:856)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$7.translate(DataflowPipelineTranslator.java:853)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:415)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:486)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$400(TransformHierarchy.java:231)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:206)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:321)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:365)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:154)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:514)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:151)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
at com.xpw.SideInputTest.main(SideInputTest.java:63)
Currently using the org.apache.beam packages at version 0.6.0.
Upvotes: 0
Views: 414
Reputation: 17913
This code is taking a PCollectionView created in one pipeline (p.apply(Create.of("foo")).apply(View.<String>asSingleton())) and using it in another pipeline (p2).
PCollections and PCollectionViews belong to a particular pipeline, and reusing them in a different pipeline is not supported.
You can create an analogous PCollectionView in p2.
I'm also confused as to what your pipeline p is trying to accomplish: the only transform it has is the one creating the view, so there's no data being processed in it. I think you should get rid of p entirely and just use p2.
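To make the ownership rule concrete, here is a small toy model in plain Java. It is only a sketch: ToyPipeline and ToyView are hypothetical stand-ins invented for illustration, not Beam classes, and the lookup is an assumption about why the translator reports "Unknown producer", based on the getProducer and checkNotNull frames in the stack trace. Each toy pipeline records which of its own steps produced each view, so a view created in another pipeline has no producer to look up.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: these classes are hypothetical stand-ins, not Beam's API.
final class ToyPipelineDemo {

    /** A value handle, standing in for a PCollectionView. */
    static final class ToyView {
        final String label;

        ToyView(String label) {
            this.label = label;
        }
    }

    /** A pipeline that remembers which of its steps produced each view. */
    static final class ToyPipeline {
        private final String name;
        private final Map<ToyView, String> producers = new HashMap<>();

        ToyPipeline(String name) {
            this.name = name;
        }

        /** Creates a view and records this pipeline's step as its producer. */
        ToyView createView(String label) {
            ToyView view = new ToyView(label);
            producers.put(view, "Create(" + label + ")");
            return view;
        }

        /** "Translates" a step that consumes the view as a side input. */
        String translateStepUsing(ToyView view) {
            String producer = producers.get(view);
            if (producer == null) {
                throw new NullPointerException(
                    "Unknown producer for value " + view.label + " in pipeline " + name);
            }
            return name + ": side input produced by " + producer;
        }
    }

    public static void main(String[] args) {
        ToyPipeline p = new ToyPipeline("p");
        ToyPipeline p2 = new ToyPipeline("p2");

        // Mistake from the question: the view is built in p but consumed in p2.
        ToyView foreignView = p.createView("foo");
        try {
            p2.translateStepUsing(foreignView);
        } catch (NullPointerException e) {
            System.out.println("Translation failed: " + e.getMessage());
        }

        // Fix from the answer: build the view in the pipeline that uses it.
        ToyView ownView = p2.createView("foo");
        System.out.println(p2.translateStepUsing(ownView));
    }
}
```

In the question's code, the corresponding change would be to apply Create.of("foo") and View.<String>asSingleton() to p2 rather than p, and to drop p together with its run() call.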
Upvotes: 2