Reputation: 2825
I have a Kafka topic named input with multiple partitions.
Let's say a message looks like this:
{
"key": 123456,
"otherKey": 444,
...
}
Records are partitioned by "key" (and so the same key will always end up processed by the same Kafka consumer).
Now I would like to count the number of events for each "otherKey" per minute. To my understanding, this can easily be done using KStreams like this:
input.groupBy((k, v) -> v.getOtherKey())
     .windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
     .count()
     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
     .toStream()
     .to("output");
With groupBy, Kafka Streams repartitions the data into an internal Kafka topic, writing one event for every event in the input topic.
This seems wasteful to me. Each consumer could instead count locally, per "otherKey", over only its own partitions, and publish to the internal topic just once per minute per "otherKey".
Is there a way to do this using Kafka Streams?
Upvotes: 3
Views: 142
Reputation: 62350
Your observation about the behavior is correct, and your idea for optimizing the execution is also valid.
However, this optimization is not currently implemented. The reason is that suppress()
is a fairly new operator, and the optimization you describe did not make sense before suppress()
was introduced.
If you really want this optimization, though, you can build it yourself using the Processor API.
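The core of such a Processor API solution can be sketched as follows. This is a minimal illustration in plain Java, without Kafka dependencies: each stream task would keep local counts per otherKey and flush the accumulated snapshot once per minute, forwarding only the flush results to the repartition topic. In a real Processor, add() would be called from process() and flush() from a punctuator scheduled via context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, ...); the class and method names here are illustrative, not from any library.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the per-task pre-aggregation state a custom
// Processor could keep. Counts are accumulated locally per otherKey;
// only the snapshot returned by flush() would be sent downstream,
// so at most one record per otherKey per minute hits the topic.
public class PreAggregator {
    private final Map<Integer, Long> counts = new HashMap<>();

    // Called once per incoming record (Processor#process in Kafka Streams).
    public void add(int otherKey) {
        counts.merge(otherKey, 1L, Long::sum);
    }

    // Called once per minute (from a scheduled punctuator); returns the
    // accumulated counts and resets local state for the next window.
    public Map<Integer, Long> flush() {
        Map<Integer, Long> snapshot = new HashMap<>(counts);
        counts.clear();
        return snapshot;
    }

    public static void main(String[] args) {
        PreAggregator agg = new PreAggregator();
        agg.add(444);
        agg.add(444);
        agg.add(555);
        // Prints the accumulated counts for this window, then resets.
        System.out.println(agg.flush());
        System.out.println(agg.flush().isEmpty()); // state was cleared
    }
}
```

Note that with this approach you give up the fault-tolerance guarantees of the DSL aggregation unless you back the counts with a state store; the un-flushed local counts are lost on a crash otherwise.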
Upvotes: 1