Reputation: 21
I need to discard duplicate messages within a time window. Messages are coming in continuously. Below is the relevant part of the code.
kStream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.reduce((k,m) -> m)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.foreach((k, v) -> doSomeProcess(k,v));
What am I doing wrong here? I am not seeing any calls to the method doSomeProcess, even though messages are coming in.
Upvotes: 1
Views: 48
Reputation: 21
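It turned out that "This feature requires adding a "grace period" parameter for windows".
Source: https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
Without an explicit grace period, TimeWindows.of falls back to a default of roughly 24 hours, so untilWindowCloses holds every result until stream time has advanced that far past the end of the window. Setting a short grace period lets the window close soon after it ends:
....
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ofSeconds(5)))
....
This fixed the issue.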
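To make the timing concrete, here is a small plain-Java sketch of the arithmetic as I understand it: a window is treated as closed once stream time reaches window end plus grace. The class and method names (WindowCloseSketch, windowStart, closeTime, isClosed) are hypothetical helpers for illustration, not part of the Kafka Streams API, and the 24-hour value is only an approximation of the default grace period.

```java
import java.time.Duration;

// Hypothetical illustration only; none of these names exist in Kafka Streams.
public class WindowCloseSketch {

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    public static long windowStart(long timestampMs, Duration size) {
        return timestampMs - (timestampMs % size.toMillis());
    }

    // Stream time at which the window containing the record is considered closed:
    // window end plus the grace period.
    public static long closeTime(long timestampMs, Duration size, Duration grace) {
        return windowStart(timestampMs, size) + size.toMillis() + grace.toMillis();
    }

    // True once stream time has reached the close time of the record's window.
    public static boolean isClosed(long streamTimeMs, long timestampMs,
                                   Duration size, Duration grace) {
        return streamTimeMs >= closeTime(timestampMs, size, grace);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(15);
        long recordTs = 7_000L; // falls into the window [0, 15000)

        // With a 5 second grace period the window closes at stream time 20000 ms.
        System.out.println(closeTime(recordTs, size, Duration.ofSeconds(5)));

        // With a grace period of about 24 hours (assumed approximation of the default)
        // the same window stays open for roughly a day.
        System.out.println(closeTime(recordTs, size, Duration.ofHours(24)));
    }
}
```

Note that stream time only advances when new records arrive, so even with the 5 second grace period a window's result is emitted only after a later record pushes stream time past the close time.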
Upvotes: 1