Reputation: 933
I using the code from https://github.com/gwenshap/kafka-streams-stockstats with latest windowedBy()
final TimeWindows window = TimeWindows.of(Duration.ofMinutes(1));
......
.windowedBy(window)
Then printing the stream
stats.foreach((key, value) -> logger.info("Key >>>>> "+ key +
" Value => "+value.countTrades));
I am getting the below output for the same key and 1 Minute windowed. I am expecting a single record with this key per 1 min windowed. What I am missing here?
> Line 817: [2021-03-29 15:40:21,444] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 19 > Line 823: [2021-03-29 15:40:52,111] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 43 > Line 837: [2021-03-29 15:41:24,076] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 55
Note: I have tried to suppress() also.
Upvotes: 0
Views: 294
Reputation: 933
Below would be the specific code chnages in the above git repo to get results as a single value for a specific key on a Timewindow.
KStream<String, Trade> source = builder.stream(Constants.STOCK_TOPIC);
final TimeWindows window = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(5));
KStream<String, TradeStats> stats =
source
.groupByKey()
.windowedBy(window)
.<TradeStats>aggregate(() -> new TradeStats(),(k, v, tradestats) -> tradestats.add(v),
Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("trade-aggregates")
.withValueSerde(new TradeStatsSerde()).withKeySerde(Serdes.String()))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()).withName("trade-suppress"))
.toStream()
.map((final Windowed<String> k, final TradeStats v) -> new KeyValue<>(k.key(), v), Named.as("trade-map"));;
stats.foreach((key, value) -> logger.info("Key >>>>> "+ key + " Value => "+value.countTrades));
Upvotes: 0
Reputation: 91
Suppress() is the way forward. The article (https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/) explains how one can use. The only caveat in using suppress is that in the next subsequent window, one must receive a new event with the SAME key that you use in the groupBy().
Also please have a look into the comment from the other stackoverflow from Matthias J.Sax: Kafka Stream Suppress session-windowed-aggregation
Upvotes: 1