Reputation: 21
Trying to merge multiple Kafka Streams, aggregate, and produce to a new topic. However, within the same window, the code produces as many aggregated records as the total number of input records across the input streams. I would expect the aggregation to produce only 1 output at the end of the window. What am I doing wrong in the code below?
val streams = requestStreams.merge(successStreams).merge(errorStreams)
.groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
.aggregate({ null }, StreamAggregators.notificationMetricAggregator, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("ag-store")
.withValueSerde(serdesConfig.notificationMetricSerde()))
.toStream()
streams.to(notificationStreamsConfig.metricsTopic, Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String::class.java, 10), serdesConfig.notificationMetricSerde()))
Upvotes: 0
Views: 1966
Reputation: 62350
Kafka Streams uses a continuous update processing model by default. Note that the result of an aggregation is a KTable. This result table contains a row for each window, and each time a new record is processed, the corresponding window (i.e., row in the table) is updated. If you call KTable#toStream(), you get the table's changelog stream, which contains a record for each update to the table.
If you want to get only a single result per window, you can use the suppress() operator to get a second KTable: suppress() takes the first KTable's changelog stream, waits until a window is closed, and only then inserts the final result into its output KTable. If you use suppress(), you should set the grace period for the upstream windowed aggregation (default is 24h) to a lower value, i.e., TimeWindows.of(...).grace(...).
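As a rough sketch, applied to the snippet from your question (assuming the same input streams, serdes, and aggregator; the 1-minute grace period is just an illustrative value, pick one that matches how late your data can arrive):

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.*
import org.apache.kafka.streams.state.WindowStore
import java.time.Duration

val streams = requestStreams.merge(successStreams).merge(errorStreams)
    .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
    // lower the grace period so windows close (and suppress emits) sooner than the 24h default
    .windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(Duration.ofMinutes(1)))
    .aggregate(
        { null },
        StreamAggregators.notificationMetricAggregator,
        Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("ag-store")
            .withValueSerde(serdesConfig.notificationMetricSerde())
    )
    // emit only one final result per window, once the window has closed
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
// then write `streams` to the output topic with Produced.with(...) as in the question

With this, the downstream topic receives a single record per key and window instead of one record per update.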
For more details check out this blog post: https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers
Upvotes: 1