Reputation: 48346
The pipeline of my Beam job throws the following exception:
java.lang.RuntimeException: java.lang.RuntimeException: Exception while fetching side input:
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:790)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Exception while fetching side input:
org.apache.beam.runners.dataflow.worker.StateFetcher.fetchSideInput(StateFetcher.java:196)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:313)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.access$500(StreamingModeExecutionContext.java:70)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:660)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:723)
org.apache.beam.runners.dataflow.worker.StreamingSideInputFetcher.storeIfBlocked(StreamingSideInputFetcher.java:184)
org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.processElement(StreamingSideInputDoFnRunner.java:72)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:790)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4871)
org.apache.beam.runners.dataflow.worker.StateFetcher.fetchSideInput(StateFetcher.java:192)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:313)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.access$500(StreamingModeExecutionContext.java:70)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:660)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:723)
org.apache.beam.runners.dataflow.worker.StreamingSideInputFetcher.storeIfBlocked(StreamingSideInputFetcher.java:184)
org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.processElement(StreamingSideInputDoFnRunner.java:72)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:790)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
org.apache.beam.sdk.values.PCollectionViews$SingletonViewFn.apply(PCollectionViews.java:231)
org.apache.beam.sdk.values.PCollectionViews$SingletonViewFn.apply(PCollectionViews.java:160)
org.apache.beam.runners.dataflow.worker.StateFetcher.lambda$fetchSideInput$1(StateFetcher.java:162)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4876)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4871)
org.apache.beam.runners.dataflow.worker.StateFetcher.fetchSideInput(StateFetcher.java:192)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:313)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.access$500(StreamingModeExecutionContext.java:70)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:660)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:723)
org.apache.beam.runners.dataflow.worker.StreamingSideInputFetcher.storeIfBlocked(StreamingSideInputFetcher.java:184)
org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.processElement(StreamingSideInputDoFnRunner.java:72)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:790)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Code snippets are below. The Beam job reads data from GCS through a subscription and writes to BigQuery after processing.
PCollection<KV<String, Long>> p1 = interToCount.apply("Daily Window",
        Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(10))))
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.ZERO));

PCollectionView<Long> totalInters = p1.apply("All Keys", Keys.create())
        .apply("Count All Keys",
                Combine.globally(Count.<String>combineFn())
                        .withoutDefaults()
                        .asSingletonView());

PCollection<KV<String, Long>> p2 = p1.apply("Unit Keys", Keys.create())
        .apply("Count Per Unit Keys", Count.perElement());

PCollection<KV<String, Double>> unitRate = p2.apply("Compute Unit Rate",
        ParDo.of(new ComputeUnitRate(totalInters))
                .withSideInputs(totalInters));

unitRate.apply("Convert to BQ Row", ParDo.of(new ConvertStrToBQRow()))
        .apply("Write BQ", BigQueryIO.writeTableRows().to(INTERSHOWTABLE)
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
....
static class ComputeUnitRate extends DoFn<KV<String, Long>, KV<String, Double>> {
    private PCollectionView<Long> totalInters;

    ComputeUnitRate(PCollectionView<Long> totalInters) {
        this.totalInters = totalInters;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        String unit = c.element().getKey();
        Long unitCount = c.element().getValue();
        Long totalCount = c.sideInput(this.totalInters);
        Double showRate = unitCount.doubleValue() / totalCount.doubleValue();
        c.output(KV.of(unit, showRate));
    }
}
It seems this error comes from the "Compute Unit Rate" step, but I cannot pinpoint it from the exception stack alone.
How can I fix it? Or is there something I am missing here?
Beam version: 2.16.0
Upvotes: 0
Views: 3511
Reputation: 1
I had the same problem, and setting higher numWorkers and maxNumWorkers resolved it.
You can either call options.setNumWorkers(DEFAULT_NB_WORKER) in code, or use mvn to start the DataflowRunner with --numWorkers=xx --maxNumWorkers=xx.
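For reference, a minimal sketch of setting the worker counts programmatically on the Dataflow pipeline options (DEFAULT_NB_WORKER above is the answerer's own constant; the concrete numbers here are placeholders):

// Sketch only: parse the standard args and set worker counts on DataflowPipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(DataflowPipelineOptions.class);
options.setNumWorkers(5);        // initial number of workers (placeholder value)
options.setMaxNumWorkers(10);    // autoscaling ceiling (placeholder value)
Pipeline pipeline = Pipeline.create(options);

The equivalent command-line flags are --numWorkers=5 --maxNumWorkers=10 when launching with the DataflowRunner.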
Upvotes: 0
Reputation: 48346
Per this doc:
"This is a known issue in the Beam model. Trigger firings should automatically update downstream results, but instead they are treated as new elements. The design for retractions will alleviate this problem."
You can use View.asMultimap(), which means each trigger firing for a key will add a new element to that key's set of values.
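As a hedged illustration (my own sketch, not part of the original answer, and assuming the usual Beam imports), the View.asMultimap() route could key every firing under a single constant key so the view retains all firings as values; the actual workaround below takes a slightly different route with View.asMap():

PCollectionView<Map<String, Iterable<Long>>> totalIntersMulti = p1
        .apply("All Keys", Keys.create())
        .apply("Count All Keys", Combine.globally(Count.<String>combineFn()).withoutDefaults())
        .apply("Key By Constant", ParDo.of(new DoFn<Long, KV<String, Long>>() {
            @ProcessElement
            public void process(ProcessContext ctx) {
                // Every firing is emitted under the same key; the multimap view
                // then holds all fired values for that key.
                ctx.output(KV.of("total", ctx.element()));
            }
        }))
        .apply("Multimap View", View.<String, Long>asMultimap());

Downstream, c.sideInput(totalIntersMulti).get("total") would return an Iterable<Long> of all firings, from which the latest (with accumulating panes, the largest) value can be picked.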
After changing the PCollectionView<Long> to a PCollectionView<Map<Long, Long>> as below:
PCollectionView<Map<Long, Long>> totalInters = p1.apply("All Keys", Keys.create())
        .apply("Count All Keys", Combine.globally(Count.<String>combineFn()).withoutDefaults())
        .apply("Map view multiMap", ParDo.of(new DoFn<Long, KV<Long, Long>>() {
            @ProcessElement
            public void process(ProcessContext ctx) {
                ctx.output(KV.of(ctx.pane().getIndex(), ctx.element().longValue()));
            }
        }))
        .apply(View.asMap());
The pipeline works well now.
I think this is one possible workaround for the issue for now; there may be better solutions.
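For completeness, a minimal sketch (my assumption, not from the original answer) of how the downstream DoFn could consume the Map<Long, Long> view, still wired in with .withSideInputs(totalInters), picking the count from the highest pane index since accumulating panes make the newest firing the most complete:

static class ComputeUnitRateFromMap extends DoFn<KV<String, Long>, KV<String, Double>> {
    private final PCollectionView<Map<Long, Long>> totalInters;

    ComputeUnitRateFromMap(PCollectionView<Map<Long, Long>> totalInters) {
        this.totalInters = totalInters;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        Map<Long, Long> firings = c.sideInput(this.totalInters);
        // Pick the value from the highest pane index, i.e. the latest firing.
        long latestPane = -1L;
        long totalCount = 0L;
        for (Map.Entry<Long, Long> e : firings.entrySet()) {
            if (e.getKey() > latestPane) {
                latestPane = e.getKey();
                totalCount = e.getValue();
            }
        }
        String unit = c.element().getKey();
        long unitCount = c.element().getValue();
        double showRate = totalCount == 0L ? 0.0 : (double) unitCount / totalCount;
        c.output(KV.of(unit, showRate));
    }
}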
Upvotes: 1