Reputation: 2104
I am using fixed windows in my pipeline with this configuration:
Window.<KV<String, DeviceData>>into(
        FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
    .triggering(
        AfterWatermark.pastEndOfWindow()
            .withLateFirings(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(10))))
    .withAllowedLateness(Duration.standardHours(3))
    .accumulatingFiredPanes();
Is my assumption correct that the trigger fires once when the window closes and then, if late data arrives, again every 10 minutes until 3 hours have passed since the window closed?
How can I handle importing data for periods that lie much further in the past (for example, months)? In that case I would receive no triggers at all.
Upvotes: 1
Views: 336
Reputation: 1428
It seems that you are missing Repeatedly.forever, which is what lets the trigger fire multiple times as in your description.
For data further in the past, you need to set .withAllowedLateness() to the maximum lateness that you want to handle.
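To make the allowed-lateness point concrete, here is a rough plain-Java sketch of the arithmetic involved. It deliberately does not use the Beam API: the class LatenessSketch and its methods are hypothetical names, the 60-second window size and the timestamps are made up, and it assumes the simplified rule that a window expires once the watermark passes the window end plus the allowed lateness, after which elements for that window are dropped.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java illustration of fixed-window arithmetic; this is NOT Beam API.
final class LatenessSketch {

    // Start of the epoch-aligned fixed window that contains eventTime.
    static Instant windowStart(Instant eventTime, Duration windowSize) {
        long size = windowSize.toMillis();
        long ts = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, size));
    }

    // Assumed rule: the window is expired (and its elements dropped) once
    // the watermark is past windowEnd + allowedLateness.
    static boolean isDropped(Instant eventTime, Instant watermark,
                             Duration windowSize, Duration allowedLateness) {
        Instant windowEnd = windowStart(eventTime, windowSize).plus(windowSize);
        return watermark.isAfter(windowEnd.plus(allowedLateness));
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(60);               // hypothetical window size
        Instant watermark = Instant.parse("2020-06-01T12:00:00Z");

        Instant twoHoursOld = watermark.minus(Duration.ofHours(2));
        Instant threeMonthsOld = watermark.minus(Duration.ofDays(90));

        // Within the 3-hour allowance from the question.
        System.out.println(isDropped(twoHoursOld, watermark, size, Duration.ofHours(3)));
        // Months-old data with the same 3-hour allowance.
        System.out.println(isDropped(threeMonthsOld, watermark, size, Duration.ofHours(3)));
        // Months-old data with a much larger allowance.
        System.out.println(isDropped(threeMonthsOld, watermark, size, Duration.ofDays(120)));
    }
}
```

Under these assumptions, the months-old element is only kept in the last case, which is why the allowed lateness has to cover the oldest data you plan to import. Note that a larger allowance also means window state is retained for longer.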
Upvotes: 2