Reputation: 7025
My Kafka Streams aggregation reads a compacted topic and does this:
(0_10, ..)
(0_11, ..)
--->
(0, [10])
(0, [10, 11])
I would like to know how to control the aggregation time window, so that it doesn't emit a message for every incoming message, but waits and aggregates some of them. Imagine the Stream App consumes these messages:
(0_10, ..)
(1_11, ..)
(0_13, ..)
and if the 3 previous messages arrive within a short time window, I would expect to see this:
(0,[10])
(0, [10, 13])
(1, [11])
I cannot figure out how to tell my Kafka Streams application how long to wait for more records before emitting a new aggregate value.
My code is very simple:
builder
    .table(keySerde, valueSerde, sourceTopic)
    .groupBy(StreamBuilder::groupByMapper)
    .aggregate(
        StreamBuilder::aggregateInitializer,
        StreamBuilder::aggregateAdder,
        StreamBuilder::aggregateSubtractor)
    .to(...);
Currently, it sometimes aggregates in batches, but I'm not sure how to tweak that:
{"Aggregate":[100]}
{"Aggregate":[100,300,301,302]}
{"Aggregate":[100,300,301,302,404]}
Upvotes: 4
Views: 3376
Reputation: 38113
I would like to know how to control the aggregation time window, so that it doesn't emit a message for every incoming message, but waits and aggregates some of them.
This is not possible with Kafka Streams' windowing. Generally speaking, Kafka Streams windows never "close" or "end": there is no concept of telling a window to produce a single final result once it closes. This is by design, to accommodate late-arriving records. Instead, you see an updated result each time a record arrives at the aggregation. How frequently Kafka Streams emits those updates depends on caching (see below). For more, see: How to send final kafka-streams aggregation result of a time windowed KTable?
Currently, it sometimes aggregates in batches, but I'm not sure how to tweak that:
What you're seeing there is most likely the result of caching in the stores that back the KTables. A KTable only forwards records downstream when its changelog is flushed and its offsets are committed; this is done to maintain consistency in case its state needs to be restored. If you increase your Kafka Streams application's commit interval, cache flushes will be less frequent, and consequently you will see fewer updates forwarded from KTables (changelogs, aggregations, etc.). But that's not related to windowing.
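As a sketch, both the commit interval and the record cache size can be tuned through StreamsConfig; the application id and bootstrap servers below are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CacheTuning {
    public static Properties streamsProps() {
        // Placeholder application id and broker address.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregation-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Commit (and flush caches, forwarding KTable updates) at most
        // every 30 seconds instead of the 30-second-ish default.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);

        // Give the record caches room to buffer updates; setting this to 0
        // would disable caching and forward every single update downstream.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        return props;
    }
}
```

Note that this only changes how often updates are emitted; it does not give you a "final" result per window.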
With all that said, if you want to do a windowed aggregation of a changelog stream, you can convert the KTable to a KStream using KTable#toStream(). Then you can specify windows in your aggregation step.
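A minimal sketch of that approach, assuming the newer DSL (windowedBy, Kafka 2.x); topic names, serdes, and the aggregate logic are placeholders:

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.WindowedSerdes;

public class WindowedAggregation {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.table("source-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .toStream()                                        // KTable -> KStream
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofMinutes(1))) // 1-minute tumbling windows
               .aggregate(
                   () -> "",                                      // initializer
                   (key, value, agg) -> agg + "," + value,        // adder
                   Materialized.with(Serdes.String(), Serdes.String()))
               .toStream()                                        // Windowed<String> keys
               .to("sink-topic",
                   Produced.with(
                       WindowedSerdes.timeWindowedSerdeFrom(String.class),
                       Serdes.String()));

        return builder.build();
    }
}
```

Per the caveat above, this still emits incremental updates per window as records arrive, not one final result when the window ends.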
Upvotes: 5