mi.mo

Reputation: 81

Counting distinct values with Kafka Streams

I have some values in a GenericRecord, and I need to count the distinct values with Kafka Streams. The result must be something like {"Distinct values count":123}. Please help me; I'm new to Kafka Streams.

Upvotes: 4

Views: 3600

Answers (1)

Dmitry Minkovsky

Reputation: 38113

You could do something like:

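StreamsBuilder topology = new StreamsBuilder();

// Assumes String keys and Avro GenericRecord values on the "input" topic; adjust to your data.
KTable<Integer, HashMap<String, Long>> aggregate = topology.<String, GenericRecord>stream("input")
  .groupBy((k, v) -> 0 /* map all records to the same, arbitrary key */)
  .aggregate(() -> new HashMap<String, Long>(),
             (k, v, a) -> {
                // GenericRecord.get() returns Object, so convert it to a String map key
                String state = String.valueOf(v.get("state"));
                Long count = a.get(state);
                if (count == null) {
                    count = 0L;
                }
                a.put(state, ++count);
                return a;
             });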

(I omitted the Serdes here: Consumed in stream(), Serialized in groupBy() (replaced by Grouped as of Kafka 2.1), and Materialized in aggregate(). You'll need to provide those yourself.)

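The resulting KTable will hold a single entry (under the arbitrary key 0) whose value is a map of distinct state values to their respective counts. The number of distinct values is the size of that map.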
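To get from that map to the single number asked for in the question, here is a minimal plain-Java sketch of the same aggregator logic with no Kafka dependencies. The class name DistinctCountSketch, its method names, and the sample state values are hypothetical and only serve to illustrate the idea; the JSON string is built by hand to match the format in the question.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class for illustration; not part of Kafka Streams.
class DistinctCountSketch {

    // Same idea as the aggregator lambda: bump the counter for this value.
    static Map<String, Long> aggregate(Map<String, Long> agg, String state) {
        agg.merge(state, 1L, Long::sum);
        return agg;
    }

    // The number of distinct values is the number of keys in the map.
    static String toJson(Map<String, Long> agg) {
        return "{\"Distinct values count\":" + agg.size() + "}";
    }

    public static void main(String[] args) {
        Map<String, Long> agg = new HashMap<>();
        // Made-up sample values standing in for v.get("state")
        for (String state : new String[] {"CA", "NY", "CA", "TX"}) {
            aggregate(agg, state);
        }
        System.out.println(toJson(agg)); // prints {"Distinct values count":3}
    }
}
```

In the actual topology, the equivalent step would presumably be something like aggregate.mapValues(m -> m.size()) on the KTable, followed by whatever serialization the output topic needs.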

Upvotes: 2
