Valentin

Reputation: 365

sideInput consistency across multiple workers

I am working on a pipeline where I need to broadcast control data to every instance of a DoFn transformation. Ideally I would like to get all of that control data, not only the last state. I reduced the example to a very simple one: two CountingInput sources, one for the side input and one for the main input, keeping only the ticks up to 30 for the side input and logging what the sideInput returns.

    PCollection<Long> iDs =
            p.apply(CountingInput.unbounded().withRate(1, Duration.millis(200)))
                    .apply(ParDo.of(new DoFn<Long, Long>() {
                        @Override
                        public void processElement(ProcessContext c) {
                           Long cnt = c.element();
                           if (cnt <= 30) {
                               logger.info("ID=" + cnt);
                               c.output(cnt);
                           }
                        }
                    }));

    PCollectionView<List<Long>> iDsView = iDs
            .apply(Window.<Long>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .discardingFiredPanes()
            )
            .apply(View.asList());

    p.apply(CountingInput.unbounded().withRate(1, Duration.millis(1000)))
            .apply(ParDo
                    .withSideInputs(iDsView)
                    .of(new DoFn<Long, String>() {
                        @Override
                        public void processElement(ProcessContext c) {
                            Long in = c.element();
                            List<Long> si = c.sideInput(iDsView);

                            StringBuilder sb = new StringBuilder();
                            si.forEach(x -> sb.append(",").append(x));
                            logger.info("invocation=" + in
                                    + " class=" + this.toString()
                                    + " sideInput=[" + sb.toString().substring(1) + "]");
                        }
                    }));

With InProcessPipelineRunner I eventually see output like

INFO: invocation=10 class=com.sandbox.dw.WriteLogsToBQ$2@221a6d4a sideInput=[30]
Jan 08, 2017 12:38:44 AM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: invocation=11 class=com.sandbox.dw.WriteLogsToBQ$2@449a2a7a sideInput=[30]
Jan 08, 2017 12:38:45 AM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: invocation=12 class=com.sandbox.dw.WriteLogsToBQ$2@4a289e60 sideInput=[30]

But when I run it with --runner=BlockingDataflowPipelineRunner --numWorkers=4, I see quite inconsistent sideInput values among the 4 workers, and they stayed the same for the several minutes I ran the pipeline:

    00:47:16.586
    invocation=138 class=com.sandbox.dw.WriteLogsToBQ$2@312aa182 sideInput=[0]
    00:47:15.709
    invocation=137 class=com.sandbox.dw.WriteLogsToBQ$2@2d0b6481 sideInput=[3,6,9,12,18,21,24,30]
    00:47:14.445
    invocation=136 class=com.sandbox.dw.WriteLogsToBQ$2@5153b895 sideInput=[0]
    00:47:11.760
    invocation=134 class=com.sandbox.dw.WriteLogsToBQ$2@65683230 sideInput=[3,6,9,12,18,21,24,30]
    00:47:11.231
    invocation=132 class=com.sandbox.dw.WriteLogsToBQ$2@5ee8917a sideInput=[0]
    00:47:10.775
    invocation=133 class=com.sandbox.dw.WriteLogsToBQ$2@16000b0 sideInput=[3,6,9,12,18,21,24,30]
    00:47:09.477
    invocation=123 class=com.sandbox.dw.WriteLogsToBQ$2@6ffe3f47 sideInput=[15]
    00:47:08.977
    invocation=130 class=com.sandbox.dw.WriteLogsToBQ$2@458bc76b sideInput=[3,6,9,12,18,21,24,30]
    00:47:07.505
    invocation=129 class=com.sandbox.dw.WriteLogsToBQ$2@2c6fcbcf sideInput=[0]
    00:47:07.200
    invocation=128 class=com.sandbox.dw.WriteLogsToBQ$2@1bf63883 sideInput=[3,6,9,12,18,21,24,30]
    00:47:06.033
    invocation=127 class=com.sandbox.dw.WriteLogsToBQ$2@5fd02daf sideInput=[3,6,9,12,18,21,24,30]
    00:47:05.573
    invocation=119 class=com.sandbox.dw.WriteLogsToBQ$2@7ba4a88b sideInput=[15]
    00:47:04.502
    invocation=126 class=com.sandbox.dw.WriteLogsToBQ$2@a7d0a48 sideInput=[0]

I also noticed that the DoFn instance was recreated for each input element. Could anyone suggest the best way to guarantee that Pub/Sub data is broadcast to every transform that reads the sideInput?


Here is a simplified example that shows my concern:

    PCollectionView<List<Long>> iDsView =
            p.apply(CountingInput.unbounded().withRate(1, Duration.millis(1000)))
                    .apply(Window.<Long>into(new GlobalWindows())
                            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                            .discardingFiredPanes()
                    )
                    .apply(Max.longsGlobally())
                    .apply(ParDo.of(new DoFn<Long, Long>() {
                        @Override
                        public void processElement(ProcessContext c) {
                            Long elem = c.element();
                            logger.info("MaxElement=" + elem);
                            c.output(elem);
                        }
                    }))
                    .apply(View.asList());

    p.apply(CountingInput.unbounded().withRate(1, Duration.millis(300)))
            .apply(ParDo
                    .withSideInputs(iDsView)
                    .of(new DoFn<Long, Long>() {
                        @Override
                        public void processElement(ProcessContext c) {
                            Long in = c.element();
                            List<Long> si = c.sideInput(iDsView);
                            StringBuilder sb = new StringBuilder();
                            si.forEach(x -> sb.append(",").append(x));
                            logger.info("MainInput=" + in
                                    + " sideInput=[" + sb.toString().substring(1) + "]");
                        }
                    }));

It works well on the local runner, but the side input is never updated with BlockingDataflowPipelineRunner, even with a single worker. In the logs I can see the trigger firing and the combine function emitting results, but the side input returns the same value for every main input element.

local runner logs:

    INFO: MaxElement=6
    Jan 12, 2017 9:22:52 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
    INFO: MainInput=21 sideInput=[6]
    Jan 12, 2017 9:22:52 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
    INFO: MainInput=22 sideInput=[6]
    Jan 12, 2017 9:22:53 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
    INFO: MainInput=23 sideInput=[6]
    Jan 12, 2017 9:22:53 PM com.sandbox.dw.WriteLogsToBQ$1 processElement
    INFO: MaxElement=7
    Jan 12, 2017 9:22:53 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
    INFO: MainInput=24 sideInput=[7]
    Jan 12, 2017 9:22:53 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
    INFO: MainInput=25 sideInput=[7]
    Jan 12, 2017 9:22:53 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
    INFO: MainInput=26 sideInput=[7]
    Jan 12, 2017 9:22:54 PM com.sandbox.dw.WriteLogsToBQ$1 processElement
    INFO: MaxElement=8
    Jan 12, 2017 9:22:54 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
    INFO: MainInput=27 sideInput=[8]
    Jan 12, 2017 9:22:54 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
    INFO: MainInput=28 sideInput=[8]
    Jan 12, 2017 9:22:54 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
    INFO: MainInput=29 sideInput=[8]
    Jan 12, 2017 9:22:55 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
    INFO: MainInput=30 sideInput=[8]
    Jan 12, 2017 9:22:55 PM com.sandbox.dw.WriteLogsToBQ$1 processElement
    INFO: MaxElement=9
    Jan 12, 2017 9:22:55 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
    INFO: MainInput=31 sideInput=[9]
    Jan 12, 2017 9:22:55 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
    INFO: MainInput=32 sideInput=[9]
    Jan 12, 2017 9:22:56 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
    INFO: MainInput=33 sideInput=[9]
    Jan 12, 2017 9:22:56 PM com.sandbox.dw.WriteLogsToBQ$1 processElement

Dataflow logs:

    MaxElement=61
    21:26:00.225
    MainInput=207 sideInput=[0]
    21:26:00.676
    MainInput=208 sideInput=[0]
    21:26:00.924
    MainInput=209 sideInput=[0]
    21:26:01.258
    MainInput=210 sideInput=[0]
    21:26:01.260
    MaxElement=62
    21:26:01.518
    MainInput=211 sideInput=[0]
    21:26:01.748
    MainInput=212 sideInput=[0]
    21:26:02.071
    MainInput=213 sideInput=[0]
    21:26:02.313
    MainInput=214 sideInput=[0]
    21:26:02.466
    MaxElement=63
    21:26:02.677
    MainInput=215 sideInput=[0]
    21:26:02.994
    MaxElement=64
    21:26:03.113
    MainInput=216 sideInput=[0]
    21:26:03.335
    MainInput=217 sideInput=[0]
    21:26:03.614
    MainInput=218 sideInput=[0]
    21:26:04.132
    MainInput=219 sideInput=[0]
    21:26:04.142
    MaxElement=65
    21:26:04.193
    MainInput=220 sideInput=[0]
    21:26:04.538
    MainInput=221 sideInput=[0]
    21:26:04.706
    MainInput=222 sideInput=[0]
    21:26:05.250
    MaxElement=66
    21:26:05.531
    MainInput=224 sideInput=[0]

My concern is that the side input shows stale results. I would expect the side input cache to be invalidated when the trigger fires.

Upvotes: 0

Views: 1068

Answers (3)

Kenn Knowles

Reputation: 6023

In Beam, which will form the basis for Dataflow 2.x, you can gain tighter control over slowly changing side input dimensions using state.

For space, I will assume that the Either type exists and has a default coder. Then if you have some types Key, MainInput, and SideInput and whatever custom code you like to update your supplementary state, a stateful DoFn to implement what you want might look like this:

new DoFn<KV<Key, Either<MainInput, SideInputUpdate>>, KV<Key, Output>>() {

  @StateId("side")
  private final StateSpec<ValueState<SideInput>> detailsSpec =
      StateSpecs.value(SideInput.getCoder());

  @ProcessElement
  public void processElement(
      ProcessContext ctx,
      @StateId("side") ValueState<SideInput> sideState) {

    SideInput side = sideState.read();

    if (ctx.element().getValue().isRight()) {
      SideInputUpdate update = ctx.element().getValue().getRight();
      sideState.write(side.applyUpdate(update));
    } else {
      MainInput element = ctx.element().getValue().getLeft();
      // do whatever you want to do with the element
    }
  }
}

To use this DoFn, you'll inject your main and side inputs into a PCollection<Either<MainInput, SideInputUpdate>> like this:

PCollection<MainInput> mainInput = ...
PCollection<SideInputUpdate> sideInputUpdates = ...

PCollection<Either<MainInput, SideInputUpdate>> injectedMain = 
    mainInput.apply(MapElements.via(... in left ...));
PCollection<Either<MainInput, SideInputUpdate>> injectedSide = 
    sideInputUpdates.apply(MapElements.via(... in right ...));

PCollection<KV<Key, Either<MainInput, SideInputUpdate>>> keyed =
    PCollectionList.of(injectedMain).and(injectedSide)
        .apply(Flatten.pCollections())
        .apply(WithKeys.of(...));

The results are still nondeterministic - your side input updates and main input elements have no ordering guarantees. But the latency will be lower and fairly predictable, as the state will be updated before any further processing on that key and window.
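
Since the code above assumes an Either type, here is a minimal sketch of what one could look like, together with a plain-Java simulation of the per-key logic. Nothing in it is Beam API: `Either` and `StatefulJoinSketch` are hypothetical names used only for illustration, and the local variable `side` stands in for the `ValueState` cell of a single key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical two-case union: left = main input element, right = side input update. */
final class Either<L, R> {
    private final L left;
    private final R right;
    private final boolean isRight;

    private Either(L left, R right, boolean isRight) {
        this.left = left;
        this.right = right;
        this.isRight = isRight;
    }

    static <L, R> Either<L, R> left(L value) {
        return new Either<>(value, null, false);
    }

    static <L, R> Either<L, R> right(R value) {
        return new Either<>(null, value, true);
    }

    boolean isLeft() { return !isRight; }

    boolean isRight() { return isRight; }

    L getLeft() {
        if (isRight) throw new NoSuchElementException("getLeft() called on a right value");
        return left;
    }

    R getRight() {
        if (!isRight) throw new NoSuchElementException("getRight() called on a left value");
        return right;
    }
}

/** Simulates the stateful logic for one key, outside of any runner. */
final class StatefulJoinSketch {
    /** Pairs every main element with the most recent update that arrived before it. */
    static List<String> run(List<Either<String, Long>> arrivals) {
        Long side = null; // stand-in for the per-key state cell; null until the first update
        List<String> out = new ArrayList<>();
        for (Either<String, Long> e : arrivals) {
            if (e.isRight()) {
                side = e.getRight();              // side input update: overwrite the state
            } else {
                out.add(e.getLeft() + "@" + side); // main element: read the current state
            }
        }
        return out;
    }
}
```

Feeding it the arrival order main "a", update 1, main "b" pairs "a" with no state and "b" with 1, which is the nondeterminism described above: the result depends entirely on the order in which the two streams happen to interleave.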

For more about this new capability, see this blog post on the Beam blog.

Upvotes: 0

Kenn Knowles

Reputation: 6023

You are going to run into problems with the consistency between main input elements and multiple triggerings on the side inputs. The consistency model is very loose:

  • Each triggering will cause all workers to eventually be updated with the element that was output, but with no deadline or synchronization rules.
  • In the meantime, another triggering may occur before they read it, so a particular output element may never be seen.

For the triggering itself, keep in mind that it is also nondeterministic:

  • Values will be output at natural commit points in processing, even if the trigger's predicate is satisfied by fewer elements than a full bundle. This allows efficient predictable behavior in the presence of merging windows, retries, network delays, and bundling variations.
  • This is why the trigger is specified as firing after the element count is at least 1, not exactly 1. The entirety of a bundle of processed elements will be triggered together, even if the trigger's predicate was satisfied by just one. So it is possible to see any subset of 0 through 30.

So your pipeline layout is most naturally applicable to cases where the side input represents a nondeterministic convergence towards some final value (which may be infinite, so the convergence may continue forever). Instead, I think you'll want a proper join via CoGroupByKey or a direct side channel subscription as suggested in Sam's answer.

Depending on the actual details of your use case, there may be other more esoteric solutions using custom WindowFn. The main input will block until the side input has at least one triggered element in the matching window.

And, finally, a quick note about accumulation mode:

  • Based on your statement that you "would like to get all of that control data, not only the last state" it sounds like you would want to replace discardingFiredPanes() with accumulatingFiredPanes(), but this will cause the side input to grow without bound.
  • If you used windowing to limit the lifetime of your side inputs, then accumulatingFiredPanes() would work better - all workers will eventually receive all control messages and the accumulated messages for a window would be released when the window expires. But there is no guarantee that a main input element actually arrives late enough to see the final value! So it still is probably not the right approach.
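
To make the difference between the two accumulation modes concrete, here is a toy model in plain Java. It is not Beam code: `PaneModes` is a hypothetical name, and the model assumes the trigger fires exactly once per bundle, which a real runner does not guarantee.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of pane contents under the two accumulation modes (not Beam API). */
final class PaneModes {
    /**
     * Returns the contents of each fired pane, given the bundles of elements
     * that arrived between firings. Assumes one firing per bundle.
     */
    static List<List<Long>> panes(List<List<Long>> bundles, boolean accumulating) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        for (List<Long> bundle : bundles) {
            buffer.addAll(bundle);
            fired.add(new ArrayList<>(buffer)); // the pane is a snapshot of the buffer
            if (!accumulating) {
                buffer.clear();                 // discarding mode forgets what was fired
            }
        }
        return fired;
    }
}
```

With bundles [0], [1, 2], [3], the discarding panes are [0], [1, 2], [3], while the accumulating panes are [0], [0, 1, 2], [0, 1, 2, 3]. A side input reader only ever sees one of these panes at a time, which is why discarding mode can show a single element and accumulating mode grows without bound in the global window.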

Upvotes: 2

Sam McVeety

Reputation: 3214

Depending on your consistency needs, you could have a per-process static subscriber to the Pub/Sub topic that broadcasts the control messages. This would be an object that, on initialization, creates a random subscription to the topic and starts listening on it. Whenever it receives a message, it makes the contents available to the DoFn's processing code.

Note that there's no great way to ensure that these subscriptions are cleaned up, so you'll want some means of periodic cleanup if you are starting and stopping the pipeline frequently.
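
A rough sketch of the in-process half of this idea is below. It deliberately leaves out the Pub/Sub client: `ControlDataHolder` is a hypothetical class, and `onMessage` is assumed to be called from whatever listener callback your subscription uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Per-JVM holder for control messages: a subscriber thread writes, DoFn code reads. */
final class ControlDataHolder {
    // Static, so every DoFn instance in this worker process shares the same list.
    private static final List<String> MESSAGES = new CopyOnWriteArrayList<>();

    private ControlDataHolder() {}

    /** Assumed to be invoked by the subscription's listener for each received message. */
    static void onMessage(String payload) {
        MESSAGES.add(payload);
    }

    /** Returns a copy of everything received so far, safe to iterate in processing code. */
    static List<String> snapshot() {
        return new ArrayList<>(MESSAGES);
    }
}
```

Inside processElement you would call `ControlDataHolder.snapshot()` instead of `c.sideInput(...)`. Each worker process fills its own list independently, so different workers can still be at different points in the control stream at any given moment.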

Upvotes: 1
