Reputation: 365
I am working on a pipeline where I need to broadcast control data to every instance of a DoFn transformation. Ideally I would like to get all of the control data, not only the last state. I have simplified the example to a very basic one: two CountingInput sources, one for the side input and one for the main input, with the side branch keeping only ticks up to 30 and the main branch reading the side input.
PCollection<Long> iDs =
p.apply(CountingInput.unbounded().withRate(1, Duration.millis(200)))
.apply(ParDo.of(new DoFn<Long, Long>() {
@Override
public void processElement(ProcessContext c) {
Long cnt = c.element();
if (cnt <= 30) {
logger.info("ID=" + cnt);
c.output(cnt);
}
}
}));
PCollectionView<List<Long>> iDsView = iDs
.apply(Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes()
)
.apply(View.asList());
p.apply(CountingInput.unbounded().withRate(1, Duration.millis(1000)))
.apply(ParDo
.withSideInputs(iDsView)
.of(new DoFn<Long, String>() {
@Override
public void processElement(ProcessContext c) {
Long in = c.element();
List<Long> si = c.sideInput(iDsView);
StringBuilder sb = new StringBuilder();
si.forEach(x -> sb.append(",").append(x));
logger.info("invocation=" + in
+ " class=" + this.toString()
+ " sideInput=[" + sb.toString().substring(1) + "]");
}
}));
With InProcessPipelineRunner I eventually see output like
INFO: invocation=10 class=com.sandbox.dw.WriteLogsToBQ$2@221a6d4a sideInput=[30]
Jan 08, 2017 12:38:44 AM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: invocation=11 class=com.sandbox.dw.WriteLogsToBQ$2@449a2a7a sideInput=[30]
Jan 08, 2017 12:38:45 AM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: invocation=12 class=com.sandbox.dw.WriteLogsToBQ$2@4a289e60 sideInput=[30]
But when I run it with --runner=BlockingDataflowPipelineRunner --numWorkers=4, I see quite inconsistent side input values across the 4 workers, and they stayed the same for the several minutes I ran the pipeline:
00:47:16.586
invocation=138 class=com.sandbox.dw.WriteLogsToBQ$2@312aa182 sideInput=[0]
00:47:15.709
invocation=137 class=com.sandbox.dw.WriteLogsToBQ$2@2d0b6481 sideInput=[3,6,9,12,18,21,24,30]
00:47:14.445
invocation=136 class=com.sandbox.dw.WriteLogsToBQ$2@5153b895 sideInput=[0]
00:47:11.760
invocation=134 class=com.sandbox.dw.WriteLogsToBQ$2@65683230 sideInput=[3,6,9,12,18,21,24,30]
00:47:11.231
invocation=132 class=com.sandbox.dw.WriteLogsToBQ$2@5ee8917a sideInput=[0]
00:47:10.775
invocation=133 class=com.sandbox.dw.WriteLogsToBQ$2@16000b0 sideInput=[3,6,9,12,18,21,24,30]
00:47:09.477
invocation=123 class=com.sandbox.dw.WriteLogsToBQ$2@6ffe3f47 sideInput=[15]
00:47:08.977
invocation=130 class=com.sandbox.dw.WriteLogsToBQ$2@458bc76b sideInput=[3,6,9,12,18,21,24,30]
00:47:07.505
invocation=129 class=com.sandbox.dw.WriteLogsToBQ$2@2c6fcbcf sideInput=[0]
00:47:07.200
invocation=128 class=com.sandbox.dw.WriteLogsToBQ$2@1bf63883 sideInput=[3,6,9,12,18,21,24,30]
00:47:06.033
invocation=127 class=com.sandbox.dw.WriteLogsToBQ$2@5fd02daf sideInput=[3,6,9,12,18,21,24,30]
00:47:05.573
invocation=119 class=com.sandbox.dw.WriteLogsToBQ$2@7ba4a88b sideInput=[15]
00:47:04.502
invocation=126 class=com.sandbox.dw.WriteLogsToBQ$2@a7d0a48 sideInput=[0]
I also noticed that the DoFn instance was recreated for each input element. Could anyone suggest the best way to guarantee that Pub/Sub data is broadcast to every transform that uses the side input?
Here is a simplified example to illustrate my concerns:
PCollectionView<List<Long>> iDsView =
p.apply(CountingInput.unbounded().withRate(1, Duration.millis(1000)))
.apply(Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes()
)
.apply(Max.longsGlobally())
.apply(ParDo.of(new DoFn<Long, Long>() {
@Override
public void processElement(ProcessContext c) {
Long elem = c.element();
logger.info("MaxElement=" + elem);
c.output(elem);
}
}))
.apply(View.asList());
p.apply(CountingInput.unbounded().withRate(1, Duration.millis(300)))
.apply(ParDo
.withSideInputs(iDsView)
.of(new DoFn<Long, Long>() {
@Override
public void processElement(ProcessContext c) {
Long in = c.element();
List<Long> si = c.sideInput(iDsView);
StringBuilder sb = new StringBuilder();
si.forEach(x -> sb.append(",").append(x));
logger.info("MainInput=" + in
+ " sideInput=[" + sb.toString().substring(1) + "]");
}
}));
It works well on the local runner, but with BlockingDataflowPipelineRunner the side input is never updated, even with a single worker. I can see in the logs that the trigger fires and the combine function emits results, but the side input returns the same value for every element.
Local runner logs:
INFO: MaxElement=6
Jan 12, 2017 9:22:52 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: MainInput=21 sideInput=[6]
Jan 12, 2017 9:22:52 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: MainInput=22 sideInput=[6]
Jan 12, 2017 9:22:53 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: MainInput=23 sideInput=[6]
Jan 12, 2017 9:22:53 PM com.sandbox.dw.WriteLogsToBQ$1 processElement
INFO: MaxElement=7
Jan 12, 2017 9:22:53 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: MainInput=24 sideInput=[7]
Jan 12, 2017 9:22:53 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: MainInput=25 sideInput=[7]
Jan 12, 2017 9:22:53 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: MainInput=26 sideInput=[7]
Jan 12, 2017 9:22:54 PM com.sandbox.dw.WriteLogsToBQ$1 processElement
INFO: MaxElement=8
Jan 12, 2017 9:22:54 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: MainInput=27 sideInput=[8]
Jan 12, 2017 9:22:54 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: MainInput=28 sideInput=[8]
Jan 12, 2017 9:22:54 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: MainInput=29 sideInput=[8]
Jan 12, 2017 9:22:55 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: MainInput=30 sideInput=[8]
Jan 12, 2017 9:22:55 PM com.sandbox.dw.WriteLogsToBQ$1 processElement
INFO: MaxElement=9
Jan 12, 2017 9:22:55 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: MainInput=31 sideInput=[9]
Jan 12, 2017 9:22:55 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: MainInput=32 sideInput=[9]
Jan 12, 2017 9:22:56 PM com.sandbox.dw.WriteLogsToBQ$2 processElement
INFO: MainInput=33 sideInput=[9]
Jan 12, 2017 9:22:56 PM com.sandbox.dw.WriteLogsToBQ$1 processElement
Dataflow logs:
MaxElement=61
21:26:00.225
MainInput=207 sideInput=[0]
21:26:00.676
MainInput=208 sideInput=[0]
21:26:00.924
MainInput=209 sideInput=[0]
21:26:01.258
MainInput=210 sideInput=[0]
21:26:01.260
MaxElement=62
21:26:01.518
MainInput=211 sideInput=[0]
21:26:01.748
MainInput=212 sideInput=[0]
21:26:02.071
MainInput=213 sideInput=[0]
21:26:02.313
MainInput=214 sideInput=[0]
21:26:02.466
MaxElement=63
21:26:02.677
MainInput=215 sideInput=[0]
21:26:02.994
MaxElement=64
21:26:03.113
MainInput=216 sideInput=[0]
21:26:03.335
MainInput=217 sideInput=[0]
21:26:03.614
MainInput=218 sideInput=[0]
21:26:04.132
MainInput=219 sideInput=[0]
21:26:04.142
MaxElement=65
21:26:04.193
MainInput=220 sideInput=[0]
21:26:04.538
MainInput=221 sideInput=[0]
21:26:04.706
MainInput=222 sideInput=[0]
21:26:05.250
MaxElement=66
21:26:05.531
MainInput=224 sideInput=[0]
My concern is that the side input shows stale results; I would expect the side input cache to be invalidated when the trigger fires.
Upvotes: 0
Views: 1068
Reputation: 6023
In Beam, which will form the basis for Dataflow 2.x, you can gain tighter control over slowly changing side input dimensions using state.
For space, I will assume that the Either type exists and has a default coder. Then if you have some types Key, MainInput, and SideInput and whatever custom code you like to update your supplementary state, a stateful DoFn to implement what you want might look like this:
new DoFn<KV<Key, Either<MainInput, SideInputUpdate>>, KV<Key, Output>>() {
  // Per-key, per-window cell holding the current side input value.
  @StateId("side")
  private final StateSpec<ValueState<SideInput>> sideSpec =
      StateSpecs.value(SideInput.getCoder());

  @ProcessElement
  public void processElement(
      ProcessContext ctx,
      @StateId("side") ValueState<SideInput> sideState) {
    // read() returns null until the first update has been written for this key.
    SideInput side = sideState.read();
    if (ctx.element().getValue().isRight()) {
      // Side input update: fold it into the stored value.
      SideInputUpdate update = ctx.element().getValue().getRight();
      sideState.write(side.applyUpdate(update));
    } else {
      // Main input element: process it against the current side value.
      MainInput element = ctx.element().getValue().getLeft();
      // do whatever you want to do with the element
    }
  }
}
To use this DoFn, you'll inject your main and side inputs into a PCollection<Either<MainInput, SideInputUpdate>> like this:
PCollection<MainInput> mainInput = ...
PCollection<SideInputUpdate> sideInputUpdates = ...
PCollection<Either<MainInput, SideInputUpdate>> injectedMain =
    mainInput.apply(MapElements.via(... in left ...));
PCollection<Either<MainInput, SideInputUpdate>> injectedSide =
    sideInputUpdates.apply(MapElements.via(... in right ...));
// Merge both streams, then key them so state is partitioned per key.
PCollection<KV<Key, Either<MainInput, SideInputUpdate>>> keyedInput =
    PCollectionList.of(injectedMain).and(injectedSide)
        .apply(Flatten.pCollections())
        .apply(WithKeys.of(...));
The results are still nondeterministic - your side input updates and main input elements have no ordering guarantees. But the latency will be lower and fairly predictable, as the state will be updated before any further processing on that key and window.
For more about this new capability, see this blog post on the Beam blog.
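To make the assumed Either type and the per-key state behaviour above more concrete, here is a minimal plain-Java sketch. It is not Beam code: EitherStateDemo, Either, mainElement, update, and process are hypothetical names, and a HashMap stands in for the runner-managed ValueState. It only illustrates that a main element sees whichever update for its key happened to arrive before it.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation; all names here are hypothetical, not Beam classes.
final class EitherStateDemo {

    /** Minimal two-case container: left = main element, right = side update. */
    static final class Either<L, R> {
        private final L left;
        private final R right;
        private final boolean isRight;

        private Either(L left, R right, boolean isRight) {
            this.left = left;
            this.right = right;
            this.isRight = isRight;
        }

        static <L, R> Either<L, R> left(L value) {
            return new Either<L, R>(value, null, false);
        }

        static <L, R> Either<L, R> right(R value) {
            return new Either<L, R>(null, value, true);
        }

        boolean isRight() {
            return isRight;
        }

        L getLeft() {
            if (isRight) throw new IllegalStateException("holds a right value");
            return left;
        }

        R getRight() {
            if (!isRight) throw new IllegalStateException("holds a left value");
            return right;
        }
    }

    static Map.Entry<String, Either<Long, String>> mainElement(String key, long value) {
        return new SimpleEntry<String, Either<Long, String>>(key, Either.<Long, String>left(value));
    }

    static Map.Entry<String, Either<Long, String>> update(String key, String newSide) {
        return new SimpleEntry<String, Either<Long, String>>(key, Either.<Long, String>right(newSide));
    }

    /** Processes elements in arrival order, keeping one side value per key. */
    static List<String> process(List<Map.Entry<String, Either<Long, String>>> input) {
        Map<String, String> stateByKey = new HashMap<>(); // stands in for ValueState
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Either<Long, String>> e : input) {
            Either<Long, String> v = e.getValue();
            if (v.isRight()) {
                stateByKey.put(e.getKey(), v.getRight());
            } else {
                String side = stateByKey.getOrDefault(e.getKey(), "none");
                out.add(e.getKey() + ":" + v.getLeft() + ":" + side);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Either<Long, String>>> input = new ArrayList<>();
        input.add(mainElement("k", 1L)); // arrives before any update
        input.add(update("k", "v1"));
        input.add(mainElement("k", 2L)); // sees the update
        System.out.println(process(input)); // prints [k:1:none, k:2:v1]
    }
}
```

The first element is processed with no side value because its update had not arrived yet, which is the ordering nondeterminism mentioned above.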
Upvotes: 0
Reputation: 6023
You are going to run into problems with the consistency between main input elements and multiple triggerings on the side inputs. The consistency model is very loose:
For the triggering itself, keep in mind that it is also nondeterministic:
So your pipeline layout is most naturally applicable to cases where the side input represents a nondeterministic convergence towards some final value (which may be infinite, so the convergence may continue forever). Instead, I think you'll want a proper join via CoGroupByKey or a direct side channel subscription as suggested in Sam's answer.
Depending on the actual details of your use case, there may be other more esoteric solutions using custom WindowFn. The main input will block until the side input has at least one triggered element in the matching window.
And, finally, a quick note about accumulation mode:
You could replace discardingFiredPanes() with accumulatingFiredPanes(), but this will cause the side input to grow without bound.
With windows that eventually expire, accumulatingFiredPanes() would work better - all workers will eventually receive all control messages and the accumulated messages for a window would be released when the window expires. But there is no guarantee that a main input element actually arrives late enough to see the final value! So it still is probably not the right approach.
Upvotes: 2
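As an illustration of the accumulation-mode note in the answer above, the following plain-Java sketch simulates a trigger that fires after every element and shows what each fired pane would contain in the two modes. It does not use the Beam API; PaneModeDemo and panes are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of pane contents; not Beam code.
final class PaneModeDemo {

    /**
     * Fires once per element. In discarding mode the buffer is cleared after
     * each firing; in accumulating mode it keeps everything seen so far.
     */
    static List<List<Long>> panes(List<Long> elements, boolean accumulating) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        for (Long e : elements) {
            buffer.add(e);
            fired.add(new ArrayList<>(buffer)); // snapshot of this pane
            if (!accumulating) {
                buffer.clear();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 3L);
        System.out.println("discarding:   " + panes(ids, false)); // [[1], [2], [3]]
        System.out.println("accumulating: " + panes(ids, true));  // [[1], [1, 2], [1, 2, 3]]
    }
}
```

In discarding mode each pane holds only the newest element, which matches the single-element side inputs in the question's logs; in accumulating mode the panes grow with every firing.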
Reputation: 3214
Depending on your consistency needs, you could have a per-process static subscriber to the Pub/Sub topic that broadcasts the control messages. This would be an object that, on initialization, creates a random subscription to the topic and starts listening to it. Whenever it receives a message, it makes the contents available to the DoFn's processing code.
Note that there's no great way to ensure that these subscriptions are cleaned up, so you'll want some means of periodic cleanup if you are starting and stopping the pipeline frequently.
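A minimal sketch of the per-process holder described above might look like the following. It deliberately leaves out the actual Pub/Sub client: ControlChannel, onMessage, and snapshot are hypothetical names, and the place where a real subscription would be created is only marked with a comment. The point is the shape: one lazily created static instance per JVM that every DoFn instance in that process can read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical per-process holder for control messages; no Pub/Sub client is used here.
final class ControlChannel {

    // Holder idiom: the instance is created once per JVM, on first access.
    private static final class Holder {
        static final ControlChannel INSTANCE = new ControlChannel();
    }

    private final List<String> messages = new CopyOnWriteArrayList<>();

    private ControlChannel() {
        // Assumption: a real implementation would create a uniquely named
        // subscription to the control topic here and register onMessage
        // as the callback for incoming messages.
    }

    static ControlChannel get() {
        return Holder.INSTANCE;
    }

    /** Called by the subscriber thread for every control message received. */
    void onMessage(String payload) {
        messages.add(payload);
    }

    /** Returns a copy of all control messages seen so far in this process. */
    List<String> snapshot() {
        return new ArrayList<>(messages);
    }

    public static void main(String[] args) {
        // Simulate two incoming control messages, then read them the way
        // a DoFn's processing code would.
        ControlChannel.get().onMessage("pause");
        ControlChannel.get().onMessage("resume");
        System.out.println(ControlChannel.get().snapshot());
    }
}
```

Inside a DoFn, processElement would call ControlChannel.get().snapshot() instead of reading a side input. Each worker process builds its own copy of the message history, so workers that start later only see messages delivered after their subscription was created.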
Upvotes: 1