user1028741

Reputation: 2825

How to optimize aggregation so that aggregation per consumer is done first?

I have a kafka topic named input with multiple partitions.

Let's say a message looks like this:

{
    "key": 123456, 
    "otherKey": 444, 
    ... 
}

Records are partitioned by "key" (and so the same key will always end up processed by the same Kafka consumer).

Now I would like to count the number of events for each "otherKey" per minute. This, to my understanding, can easily be done using KStreams like this:

input.groupBy((k, v) -> v.getOtherKey())
     .windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
     .count()
     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
     .toStream()
     .to("output");

With groupBy, Kafka Streams repartitions the data through an internal Kafka topic, writing one record to that topic for every record in the input topic.

This seems wasteful to me. Each consumer could pre-count the messages for its own partitions per "otherKey" and publish to the internal topic only once per minute per "otherKey".

Is there a way to do this using Kafka Streams?

Upvotes: 3

Views: 142

Answers (1)

Matthias J. Sax

Reputation: 62350

Your observation about the behavior is correct and your idea to optimize the execution is also correct.

However, this optimization is currently not implemented. The reason is that suppress() is a fairly new operator, and the optimization you describe did not make sense before suppress() was introduced.

If you really want this optimization, though, you can build it yourself using the Processor API.
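
For illustration, here is a minimal sketch of such a pre-aggregation using a Transformer with a scheduled punctuation that flushes partial counts once per minute. The Message class, its getOtherKey() accessor, the store name "pre-agg-store", and messageSerde are assumptions based on the JSON in your question; the rest uses the regular Processor API hooks (transform(), schedule(), state stores).

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

// Assumed value type matching the JSON from the question.
class Message {
    int key;
    int otherKey;
    int getOtherKey() { return otherKey; }
}

// Counts records per "otherKey" locally, before any repartitioning happens.
class PreAggregator implements Transformer<Integer, Message, KeyValue<Integer, Long>> {
    private KeyValueStore<Integer, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        store = (KeyValueStore<Integer, Long>) context.getStateStore("pre-agg-store");
        // Once per minute, forward all partial counts downstream and reset them.
        context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (final KeyValueIterator<Integer, Long> it = store.all()) {
                while (it.hasNext()) {
                    final KeyValue<Integer, Long> partial = it.next();
                    context.forward(partial.key, partial.value);
                    store.delete(partial.key);
                }
            }
        });
    }

    @Override
    public KeyValue<Integer, Long> transform(final Integer key, final Message value) {
        final Long count = store.get(value.getOtherKey());
        store.put(value.getOtherKey(), count == null ? 1L : count + 1L);
        return null; // emit nothing per record; the punctuator emits the partial counts
    }

    @Override
    public void close() {}
}

Wiring it up, the downstream aggregation sums the (much smaller number of) partial counts instead of counting raw records:

final StreamsBuilder builder = new StreamsBuilder();

builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("pre-agg-store"),
        Serdes.Integer(),
        Serdes.Long()));

builder.stream("input", Consumed.with(Serdes.Integer(), messageSerde)) // messageSerde: assumed serde for Message
       .transform(() -> new PreAggregator(), "pre-agg-store")
       .groupByKey(Grouped.with(Serdes.Integer(), Serdes.Long()))
       .windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
       .reduce(Long::sum)
       .toStream()
       .to("output"); // writing the Windowed<Integer> key needs a windowed serde, omitted here

Note that the punctuation uses wall-clock time, so the forwarded partial counts carry the flush timestamp rather than the original event timestamps; a production version would bucket the store by (otherKey, window start) to preserve event-time semantics.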

Upvotes: 1
