Sounak Saha

Reputation: 933

One Minute aggregate window giving unexpected result in Kafka 2.4.0

I am using the code from https://github.com/gwenshap/kafka-streams-stockstats with the latest windowedBy():

final TimeWindows window = TimeWindows.of(Duration.ofMinutes(1));
......
 .windowedBy(window)

Then printing the stream

 stats.foreach((key, value) -> logger.info("Key >>>>> "+ key + 
                " Value => "+value.countTrades));

I am getting the output below for the same key within the same 1-minute window. I am expecting a single record per key per 1-minute window. What am I missing here?

> Line 817: [2021-03-29 15:40:21,444] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 19
> Line 823: [2021-03-29 15:40:52,111] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 43
> Line 837: [2021-03-29 15:41:24,076] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 55

Note: I have also tried suppress().

Upvotes: 0

Views: 294

Answers (2)

Sounak Saha

Reputation: 933

Below are the specific code changes to the above git repo that produce a single result per key per time window.

    KStream<String, Trade> source = builder.stream(Constants.STOCK_TOPIC);

    // 1-minute tumbling windows with a 5-second grace period for late records
    final TimeWindows window = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(5));

    KStream<String, TradeStats> stats = source
            .groupByKey()
            .windowedBy(window)
            // aggregate all trades for a key within the window into one TradeStats
            .<TradeStats>aggregate(TradeStats::new, (k, v, tradestats) -> tradestats.add(v),
                    Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("trade-aggregates")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(new TradeStatsSerde()))
            // emit only the final result per key once the window (plus grace) has closed
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()).withName("trade-suppress"))
            .toStream()
            // strip the window from the key so downstream sees the plain symbol
            .map((final Windowed<String> k, final TradeStats v) -> new KeyValue<>(k.key(), v), Named.as("trade-map"));

    stats.foreach((key, value) -> logger.info("Key >>>>> " + key + " Value => " + value.countTrades));
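
For context, the snippet assumes the TradeStats aggregate class from the linked repo, which exposes a countTrades field and an add(Trade) method. A minimal sketch of that shape (the real class in gwenshap/kafka-streams-stockstats tracks more fields, such as price sums and minimums; the names here are only illustrative) could look like:

    public class TradeStats {
        // number of trades seen for this key in the current window
        public int countTrades = 0;

        // Called once per record by the aggregator; returning `this` matches the
        // (k, v, tradestats) -> tradestats.add(v) lambda used in aggregate().
        public TradeStats add(Trade trade) {
            countTrades++;
            return this;
        }
    }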
           

Upvotes: 0

Ganesh

Reputation: 91

suppress() is the way forward. The article https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/ explains how to use it. The only caveat in using suppress is that, in a subsequent window, you must receive a new event with the SAME key that you use in groupBy() before the suppressed result for the previous window is emitted.
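
To make that caveat concrete, here is a rough sketch (the topic name "stocks", the key "AES", and the JSON value format are assumptions for illustration, not taken from the repo) of producing one more record after the window has passed, so that stream time advances beyond the window end plus grace period and suppress emits the final result:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class AdvanceStreamTime {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // This record lands in the *next* 1-minute window. Its timestamp advances
                // stream time past the previous window end + grace period, which is what
                // allows suppress(untilWindowCloses(...)) to emit the final result for "AES".
                producer.send(new ProducerRecord<>("stocks", "AES",
                        "{\"ticker\":\"AES\",\"price\":100,\"size\":1}"));
            }
        }
    }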

Also, have a look at Matthias J. Sax's comment on this other Stack Overflow question: Kafka Stream Suppress session-windowed-aggregation

Upvotes: 1
