masoud

Reputation: 1

Apache Beam windowing / triggering on Dataflow does not emit as expected using elementCountAtLeast

I am trying to get Apache Beam to emit after a certain number of elements have arrived. The Apache Beam version is 2.18.0 and I am consuming from Pub/Sub, but the pattern in which data arrives is known beforehand: elements arrive in batches of the same size at scheduled times.

Let's say 5 elements arrive every 10 seconds. I want to put all elements of each batch into the same window so I can process them together. I am using elementCountAtLeast because I know the batch size, which is 5. I therefore expect the pane to emit as soon as the 5th element of a batch arrives. In case an element is missing, I want to emit whatever is in the current pane 3 seconds after the first element of the current batch arrived, and I use pastFirstElementInPane for this purpose.

input.apply("To KV", ParDo.of(new ToKV()))
     .apply("Windowin", Window.<KV<String, List<Order>>>into(new GlobalWindows())
             .withTimestampCombiner(TimestampCombiner.EARLIEST)
             .triggering(Repeatedly.forever(AfterFirst.of(
                     AfterPane.elementCountAtLeast(5),
                     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(3)))))
             .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
             .discardingFiredPanes())

Then I combine the elements of a batch into a list using a combine function (I can use GroupByKey instead and the result is the same):

.apply("combine per key", Combine.perKey(toList()))

My problem is that I always receive the combiner's output with a 3-second delay. So it looks like the pane always fires via pastFirstElementInPane, whereas it should fire earlier via elementCountAtLeast. Can anybody spot a problem in my code, or is this actually a bug in Apache Beam / Dataflow?

UPDATE 1: I made elementCountAtLeast the only trigger to see how it behaves in the absence of any other strategy.

input.apply("To KV", ParDo.of(new ToKV()))
     .apply("Windowin", Window.<KV<String, List<Order>>>into(new GlobalWindows())
             .withTimestampCombiner(TimestampCombiner.EARLIEST)
             .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
             .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
             .discardingFiredPanes())
     .apply("combine per key", Combine.perKey(toList()))

Now the pipeline behaves very strangely: it only emits once there are 5 elements for each key. Remember that every batch has 5 elements with distinct keys, and the next batch has the same keys with different values. So rather than emitting right after each batch, it waits for 5 batches to arrive, so that it has 5 elements per key (25 elements in total), and only then emits. This looks like a bug in the framework to me.

Upvotes: 0

Views: 1290

Answers (1)

danielm

Reputation: 3010

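Windowing and triggering are defined per key in Beam. A pane is the data for a particular key, window, and trigger firing. In this case, that means your trigger is waiting for 5 elements or 3 seconds on each key independently. Since each batch carries 5 distinct keys, every key receives only one element per batch, so the count condition is not met before the 3-second timer fires; with the count trigger alone, each key needs 5 batches to reach 5 elements.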

Separate keys in a GroupByKey or Combine.perKey are processed completely independently (and generally on different workers). This property is what allows pipelines to scale beyond a single machine.
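To make the per-key counting concrete, here is a minimal plain-Java model of it. It is not Beam code: the class PerKeyCountTrigger and the method firstFiringBatch are hypothetical names invented for this sketch, and it only imitates a repeated count trigger in discarding mode with a threshold of 5, under the assumption from the question that each batch has 5 elements.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not the Beam API) of a repeated count trigger whose state is kept per key. */
public class PerKeyCountTrigger {
    private final int threshold;
    private final Map<String, List<String>> buffers = new HashMap<>();

    public PerKeyCountTrigger(int threshold) {
        this.threshold = threshold;
    }

    /** Buffers the value under its key; returns the fired pane, or null while that key is below the threshold. */
    public List<String> add(String key, String value) {
        List<String> pane = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        pane.add(value);
        if (pane.size() < threshold) {
            return null;
        }
        buffers.remove(key); // discarding mode: the next element for this key starts a fresh pane
        return pane;
    }

    /** Feeds batches of 5 elements; returns the 1-based batch in which the first pane fires, or -1 if none does. */
    public static int firstFiringBatch(boolean distinctKeys, int batches) {
        PerKeyCountTrigger trigger = new PerKeyCountTrigger(5);
        for (int batch = 1; batch <= batches; batch++) {
            for (int i = 0; i < 5; i++) {
                // Either 5 distinct keys per batch (as in the question) or one shared key.
                String key = distinctKeys ? "key-" + i : "all";
                if (trigger.add(key, "b" + batch + "-e" + i) != null) {
                    return batch;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(firstFiringBatch(true, 10));  // prints 5: each key needs 5 batches
        System.out.println(firstFiringBatch(false, 10)); // prints 1: the shared key sees all 5 elements
    }
}
```

With distinct keys the first pane fires only in the 5th batch, which matches the 25-element behaviour described in UPDATE 1. If one pane per batch is what you want, one option (an assumption on my part, not something tested against your pipeline) is to give all elements of a batch the same key before the Window transform, for example with the WithKeys transform. The trade-off is that a single key is handled as one unit, so that step no longer spreads across workers.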

Upvotes: 1
