guptas

Reputation: 21

Kafka Streams: Discard message during a window

I need to discard duplicate messages within a time window. Messages are coming in continuously. Below is the relevant part of the code.

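 kStream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
        // reduce receives (value so far, new value); returning the new value keeps the latest message per key and window
        .reduce((current, latest) -> latest)
        // hold back results until the window closes, then emit one final record per key and window
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach((k, v) -> doSomeProcess(k, v));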

What am I doing wrong here? I do not see any calls to the method doSomeProcess, even though messages are coming in.

Upvotes: 1

Views: 48

Answers (1)

guptas

Reputation: 21

It turned out that "This feature requires adding a "grace period" parameter for windows", from https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables. I added a grace period to the window definition:

 ....
 .windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ofSeconds(5)))
 ....

This fixed the issue.
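For anyone wondering why the grace period matters: as far as I understand it, `untilWindowCloses` only emits a window's result once stream time has passed the window end plus the grace period, and `TimeWindows.of(...)` without an explicit `grace(...)` falls back to a 24 hour grace period in Kafka Streams versions before 3.0. The plain Java sketch below is not Kafka Streams code. It only illustrates that arithmetic. The class and method names (`WindowCloseSketch`, `closeTimeMs`, `isClosed`) are made up for this example, and the exact closing condition is my assumption about how suppression behaves.

```java
import java.time.Duration;

public class WindowCloseSketch {

    // Assumption: a window [start, start + size) counts as closed once
    // stream time reaches start + size + grace.
    public static long closeTimeMs(long windowStartMs, long sizeMs, long graceMs) {
        return windowStartMs + sizeMs + graceMs;
    }

    public static boolean isClosed(long windowStartMs, long sizeMs, long graceMs, long streamTimeMs) {
        return streamTimeMs >= closeTimeMs(windowStartMs, sizeMs, graceMs);
    }

    public static void main(String[] args) {
        long sizeMs = Duration.ofSeconds(15).toMillis();
        // Stream time is driven by record timestamps; here it is 30 s after the window start.
        long streamTimeMs = Duration.ofSeconds(30).toMillis();

        // Assumed implicit 24 h grace: the window starting at 0 is still open.
        long implicitGraceMs = Duration.ofHours(24).toMillis();
        System.out.println(isClosed(0L, sizeMs, implicitGraceMs, streamTimeMs)); // false

        // Explicit 5 s grace: the same window closed at 20 s, so its result can be emitted.
        long explicitGraceMs = Duration.ofSeconds(5).toMillis();
        System.out.println(isClosed(0L, sizeMs, explicitGraceMs, streamTimeMs)); // true
    }
}
```

With the implicit grace period the 15 second window would not close for roughly a day of stream time, which matches never seeing doSomeProcess called. Note that stream time only advances when new records arrive, so the last window is held back until a later record pushes stream time past its close time.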

Upvotes: 1
