Reputation: 81
I have some values in a GenericRecord, and I need to count the distinct values using Kafka Streams. The result should look something like {"Distinct values count":123}. Please help me; I'm new to Kafka Streams.
Upvotes: 4
Views: 3600
Reputation: 38113
You could do something like:
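import java.util.HashMap;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder topology = new StreamsBuilder();
// The String key type is an assumption; use your topic's actual key type.
KTable<Integer, HashMap<String, Long>> aggregate = topology.<String, GenericRecord>stream("input")
    .groupBy((k, v) -> 0 /* map all records to the same, arbitrary key */)
    .aggregate(() -> new HashMap<String, Long>(),
        (k, v, a) -> {
            // GenericRecord.get() returns Object, so convert the field to a String map key
            a.merge(String.valueOf(v.get("state")), 1L, Long::sum);
            return a;
        });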
(I omitted the Serdes in this case (Consumed in stream(), Serialized in groupBy(), Materialized in aggregate()); you'll need to provide those yourself.)
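The resulting KTable holds a single entry (under the key 0) whose value is a map from each distinct state value to its respective count. The number of keys in that map is the number of distinct values.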
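To make the last step concrete, here is a minimal plain-Java sketch of the same per-record update and of how the distinct count could be read off the map. It has no Kafka dependency, so it only illustrates the logic; the class name DistinctCountSketch, the method names, and the sample state values are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class DistinctCountSketch {

    // Same update the aggregator performs: bump the counter for one observed field value.
    static Map<String, Long> add(Map<String, Long> counts, Object fieldValue) {
        counts.merge(String.valueOf(fieldValue), 1L, Long::sum);
        return counts;
    }

    // The number of distinct values seen so far is the number of keys in the map.
    static String distinctCountJson(Map<String, Long> counts) {
        return "{\"Distinct values count\":" + counts.size() + "}";
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        // Made-up sample values standing in for v.get("state")
        for (String state : Arrays.asList("CA", "NY", "CA", "TX", "NY")) {
            add(counts, state);
        }
        System.out.println(distinctCountJson(counts)); // prints {"Distinct values count":3}
    }
}
```

In the topology itself, the same idea would probably be a mapValues() call on the aggregated KTable that turns the map into its size, again with a suitable Serde for the result.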
Upvotes: 2