asp


Beam CoGroupByKey with fixed window and event time based trigger generates random elements

I have a Beam pipeline that uses CoGroupByKey to combine two PCollections. The first reads from a Pub/Sub subscription. The second is derived from that same PCollection but enriches the data by looking up additional information from a table with JdbcIO.readAll. So there is no way data could appear in the second PCollection without also being in the first.

There is a fixed window of 10 seconds with an event-time-based trigger like this:

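```java
// Fires when the watermark passes the end of the 10-second window,
// early 40 s (processing time) after the first element in a pane,
// and again for every late element.
Repeatedly.forever(
    AfterWatermark.pastEndOfWindow().withEarlyFirings(
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(40))
    ).withLateFirings(AfterPane.elementCountAtLeast(1))
);
```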

The issue is that when I stop the pipeline using Drain mode, it seems to randomly generate elements for the second PCollection even though no messages have arrived on the input Pub/Sub topic. This also happens occasionally while the pipeline is running, though not consistently; when draining, I can reproduce it every time.

The difference between input and output is shown below:

[Image: CoGroupByKey step input vs. output]



Answers (1)

robertwb


You are using non-deterministic triggering, which means the output is sensitive to the exact order in which events arrive. Another way to look at this: CoGBK does not wait for both sides to arrive; the trigger starts ticking as soon as either side comes in.
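As a rough illustration of "CoGBK does not wait for both sides", here is a plain-Java model of what one CoGroupByKey pane contains. It is not Beam's API (`CoGbkModel`, `Joined` and `coGroup` are hypothetical names): for each key it simply collects whatever had arrived from each input when the trigger fired, so a key whose enriched copy arrived in this pane but whose original did not comes out with an empty list on the Pub/Sub side.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class CoGbkModel {
    // Hypothetical stand-in for a CoGbkResult: the values each input contributed for one key.
    record Joined(List<String> pubsub, List<String> enriched) {}

    // Groups the elements present in one pane by key, one list per input.
    // A key seen on only one side gets an empty list for the other side.
    static SortedMap<String, Joined> coGroup(List<Map.Entry<String, String>> pubsub,
                                             List<Map.Entry<String, String>> enriched) {
        SortedMap<String, Joined> out = new TreeMap<>();
        for (Map.Entry<String, String> kv : pubsub) {
            out.computeIfAbsent(kv.getKey(), k -> new Joined(new ArrayList<>(), new ArrayList<>()))
               .pubsub().add(kv.getValue());
        }
        for (Map.Entry<String, String> kv : enriched) {
            out.computeIfAbsent(kv.getKey(), k -> new Joined(new ArrayList<>(), new ArrayList<>()))
               .enriched().add(kv.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        // A pane in which a2 had already been emitted earlier, but its enriched copy a2' had not.
        List<Map.Entry<String, String>> pubsub = List.of(Map.entry("k1", "a1"));
        List<Map.Entry<String, String>> enriched = List.of(Map.entry("k1", "a1'"), Map.entry("k2", "a2'"));
        System.out.println(coGroup(pubsub, enriched));
    }
}
```

For key `k2` the model yields an empty Pub/Sub list next to `[a2']`, which is the kind of "extra" element that shows up in the question.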

For example, let's call your PCollections A and A', and assume they each have two elements: a1 and a2 in A, and a1' and a2' in A', where a1' is derived from a1 and a2' from a2.

Suppose a1 and a1' come into the CoGBK, 39 seconds pass, and then a2 comes in (on the same key); another 2 seconds pass, and then a2' comes in. The CoGBK will output ([a1, a2], [a1']) when the 40-second mark hits, and then, when the window closes, it will emit ([], [a2']). Even if everything is on the same key, this could happen occasionally if there is more than a 40-second wall-time delay through the longer path, and it will almost certainly happen for any late data, since each side will fire separately.
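To make that timeline concrete, here is a small plain-Java simulation. It is not Beam code (`TriggerTimeline`, `Arrival`, `Pane` and `simulate` are hypothetical names) and it is a simplification under stated assumptions: one key in one window, discarding panes (which is what the ([], [a2']) output implies), an early firing 40 s of processing time after the first element of a pane, and a final pane when the watermark passes the end of the window, assumed here to happen at the 60 s mark. Late firings and empty panes are not modelled.

```java
import java.util.ArrayList;
import java.util.List;

public class TriggerTimeline {
    // One element reaching the CoGBK: processing time in seconds, which input, value.
    record Arrival(int time, boolean enriched, String value) {}

    // One emitted pane: when it fired and what each input contributed.
    record Pane(String firing, List<String> pubsub, List<String> enriched) {}

    // Simplified model (assumption): discarding panes, an early pane `delay` seconds after
    // the first element of the current pane, and the remainder at `windowClose`.
    static List<Pane> simulate(List<Arrival> arrivals, int delay, int windowClose) {
        List<Pane> out = new ArrayList<>();
        List<String> pubsub = new ArrayList<>();
        List<String> enriched = new ArrayList<>();
        Integer deadline = null; // pending early-firing time, or null while the pane is empty
        for (Arrival a : arrivals) { // assumed sorted by time, all before windowClose
            if (deadline != null && deadline <= a.time()) {
                // The early timer expired before this element arrived: fire and start a new pane.
                out.add(new Pane("early@" + deadline, pubsub, enriched));
                pubsub = new ArrayList<>();
                enriched = new ArrayList<>();
                deadline = null;
            }
            if (deadline == null) {
                deadline = a.time() + delay; // the timer starts on whichever side arrives first
            }
            (a.enriched() ? enriched : pubsub).add(a.value());
        }
        if (deadline != null && deadline < windowClose) {
            out.add(new Pane("early@" + deadline, pubsub, enriched));
            pubsub = new ArrayList<>();
            enriched = new ArrayList<>();
        }
        if (!pubsub.isEmpty() || !enriched.isEmpty()) {
            out.add(new Pane("on-time@" + windowClose, pubsub, enriched));
        }
        return out;
    }

    public static void main(String[] args) {
        // a1 and a1' at 0 s, a2 at 39 s, a2' at 41 s.
        List<Arrival> arrivals = List.of(
            new Arrival(0, false, "a1"), new Arrival(0, true, "a1'"),
            new Arrival(39, false, "a2"), new Arrival(41, true, "a2'"));
        for (Pane p : simulate(arrivals, 40, 60)) {
            System.out.println(p);
        }
    }
}
```

Running `main` prints two panes: an early pane at 40 s with ([a1, a2], [a1']) and an on-time pane at 60 s with ([], [a2']).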

Draining makes things worse; for example, I think all processing-time triggers fire immediately.

