Reputation: 404
This is, in part, a follow-up to "Aggregation over a specific partition in Apache Kafka Streams".
Let's suppose I have a topic named "events" with 3 partitions on which I send string -> integer data like so:
(Bob, 3) on partition 1
(Sally, 4) on partition 2
(Bob, 2) on partition 3
...
I would like to aggregate the values (in this example, just a simple sum) across all partitions to end up with a KTable that looks something like:
(Sally, 4)
(Bob, 5)
As mentioned in the answer to the question I linked to above, it's not possible to directly do this kind of cross-partition aggregation. However, the answerer mentioned that it was possible if the messages have the same keys (which is true in this case). How might this be accomplished?
I would also like to be able to query these aggregate values from a "global" state store that is replicated across each instance of the Kafka Streams application.
My first thought was to use a GlobalKTable (which I believe, according to this page, should be what I need). However, the changelog topic for this state store has the same number of partitions as the original "events" topic, and simply does the aggregation on a per-partition basis rather than across all partitions.
This is a slimmed down version of my application - not really sure where to go from here:
final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-aggregator");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomDoubleSerde.class.getName());
streamsConfig.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 0);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Double> eventStream = builder.stream(INCOMING_EVENTS_TOPIC);
KTable<String, Double> aggregatedMetrics = eventStream
    .groupByKey()
    .aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);
aggregatedMetrics.toStream().print(Printed.<String, Double>toSysOut());
aggregatedMetrics.toStream().to(METRIC_CHANGES_TOPIC);
// Register the global table before building the topology so that it is
// part of the application that gets started.
builder.globalTable(METRIC_CHANGES_TOPIC, Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(METRICS_STORE_NAME));
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    streams.close();
}));
Upvotes: 6
Views: 5557
Reputation: 62285
Kafka Streams assumes that input topics are partitioned by key, i.e., that all records with the same key are in the same partition. This assumption does not hold in your case, so you need to tell Kafka Streams about it.
In your particular case, you would replace groupByKey() with groupBy():
KTable<String, Double> aggregatedMetrics = eventStream
    .groupBy((k, v) -> k)
    .aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);
The lambda is a dummy that does not modify the key; however, it is a hint to Kafka Streams to re-partition the data based on key before doing the aggregation.
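To see why the re-partitioning step matters, here is a minimal plain-Java sketch that simulates it with the sample data from the question. It does not use the Kafka Streams API at all: the class RepartitionSketch, its Event type, and its helper methods are hypothetical names made up for illustration, and String.hashCode() is only a stand-in for the hash of the serialized key that a real producer would use to pick a partition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RepartitionSketch {

    /** A single keyed record, e.g. ("Bob", 3.0). */
    static final class Event {
        final String key;
        final double value;

        Event(String key, double value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Sample data from the question; list index i stands for partition i + 1. */
    static List<List<Event>> sampleInput() {
        List<List<Event>> partitions = new ArrayList<>();
        partitions.add(List.of(new Event("Bob", 3.0)));
        partitions.add(List.of(new Event("Sally", 4.0)));
        partitions.add(List.of(new Event("Bob", 2.0)));
        return partitions;
    }

    /** Sums values per key, producing one independent result map per partition. */
    static List<Map<String, Double>> aggregatePerPartition(List<List<Event>> partitions) {
        List<Map<String, Double>> results = new ArrayList<>();
        for (List<Event> partition : partitions) {
            Map<String, Double> sums = new LinkedHashMap<>();
            for (Event e : partition) {
                sums.merge(e.key, e.value, Double::sum);
            }
            results.add(sums);
        }
        return results;
    }

    /** Moves every record to a partition chosen from its key, so equal keys end up together. */
    static List<List<Event>> repartitionByKey(List<List<Event>> partitions, int numPartitions) {
        List<List<Event>> out = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            out.add(new ArrayList<>());
        }
        for (List<Event> partition : partitions) {
            for (Event e : partition) {
                // Assumption: hashCode() stands in for the real partitioner's hash function.
                int target = Math.floorMod(e.key.hashCode(), numPartitions);
                out.get(target).add(e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Event>> input = sampleInput();
        System.out.println("without re-partitioning: " + aggregatePerPartition(input));
        System.out.println("after re-partitioning:   "
                + aggregatePerPartition(repartitionByKey(input, 3)));
    }
}
```

Without the re-partitioning step, "Bob" shows up with a partial sum in two different partitions; afterwards, all "Bob" records sit in one partition and are summed together.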
About GlobalKTable: this is a special kind of table that is not the result of an aggregation, but is only populated from a changelog topic. It seems your code is doing the right thing already: write the aggregation result into a topic and re-read the topic as a GlobalKTable.
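As a rough illustration of what "populated from a changelog topic" means, the following plain-Java sketch replays every partition of a simulated changelog into a local map, the way each application instance would build its own full copy of the table. It is not Kafka Streams code: GlobalTableSketch, Update, and the sample changelog contents (including which partition each key lands in) are assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GlobalTableSketch {

    /** One changelog record: the latest value for a key, or null to delete the key. */
    static final class Update {
        final String key;
        final Double value;

        Update(String key, Double value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical changelog written by the aggregation: Bob's running sum, then Sally's. */
    static List<List<Update>> sampleChangelog() {
        List<List<Update>> partitions = new ArrayList<>();
        partitions.add(List.of(new Update("Bob", 3.0), new Update("Bob", 5.0)));
        partitions.add(List.of(new Update("Sally", 4.0)));
        partitions.add(List.of());
        return partitions;
    }

    /** Replays all partitions; a later update for a key overwrites the earlier one. */
    static Map<String, Double> materialize(List<List<Update>> changelogPartitions) {
        Map<String, Double> table = new HashMap<>();
        for (List<Update> partition : changelogPartitions) {
            for (Update u : partition) {
                if (u.value == null) {
                    table.remove(u.key); // a null value acts as a delete marker
                } else {
                    table.put(u.key, u.value);
                }
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Two "instances" each read the whole changelog and end up with the same table.
        Map<String, Double> instanceA = materialize(sampleChangelog());
        Map<String, Double> instanceB = materialize(sampleChangelog());
        System.out.println("instance A: " + instanceA);
        System.out.println("instance B: " + instanceB);
    }
}
```

Because every instance reads all partitions rather than only the ones assigned to it, each instance can answer a lookup for any key from its local copy.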
Upvotes: 7