practicemakesperfect

Reputation: 387

Kafka Streams: total word count in a message

https://kafka.apache.org/10/documentation/streams/quickstart

I have a question about counting words within a message using Kafka Streams. Essentially, I'd like to count the total number of words, rather than the number of occurrences of each distinct word. So, instead of

all     1
streams 1
lead    1
to      1
kafka   1

I need

totalWordCount   5

or something similar.

I tried a variety of changes to this part of the code:

KTable<String, Long> wordCounts = textLines
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .groupBy((key, value) -> value)
    .count();

such as adding .selectKey((key, value) -> "totalWordCount") in an attempt to change each key (all, streams, etc.) to totalWordCount, thinking the count would then accumulate under that single key. I've also tried to edit my code using this to achieve the total word count.

I have not succeeded, and after some more reading I now think I have been approaching this incorrectly. It seems that I need 3 topics (I've been working with only 2) and 2 producers, where the second producer takes the output of the first (the per-word counts) and adds up the numbers to output the total number of words. I'm not entirely sure how to approach that. Any help or guidance is greatly appreciated. Thanks.

Upvotes: 0

Views: 1843

Answers (2)

Arpan Sharma

Reputation: 415

@Configuration
@EnableKafkaStreams
public class FirstStreamApp {

    @Bean
    public KStream<String, String> process(StreamsBuilder builder) {
        // Read the input topic with String keys and values.
        KStream<String, String> inputStream =
                builder.stream("streamIn", Consumed.with(Serdes.String(), Serdes.String()));

        // Upper-case every value and forward it to "outTopic".
        KStream<String, String> upperCaseStream = inputStream.mapValues(value -> value.toUpperCase());
        upperCaseStream.to("outTopic", Produced.with(Serdes.String(), Serdes.String()));

        // Split each value into words, key each record by its word, and count per word.
        KTable<String, Long> wordCounts = upperCaseStream
                .flatMapValues(v -> Arrays.asList(v.split(" ")))
                .selectKey((k, v) -> v)
                .groupByKey()
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream().to("wordCountTopic", Produced.with(Serdes.String(), Serdes.Long()));

        return upperCaseStream;
    }

}

Upvotes: 0

Matthias J. Sax

Reputation: 62350

Where did you put the selectKey()? The idea is basically correct, but note that groupBy() sets the key, too, so a key set before groupBy((key, value) -> value) is overwritten with the word again.

KTable<String, Long> wordCounts = textLines
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .groupBy((key, value) -> "totalWordCount")
    .count();

Or, using groupByKey() so the key is not changed again before the aggregation:

KTable<String, Long> wordCounts = textLines
    .selectKey((key, value) -> "totalWordCount")
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .groupByKey()
    .count();
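
To see why the grouping key decides what gets counted, here is a rough plain-Java analogy using collections instead of Kafka Streams. It is only a sketch: the class name TotalWordCountSketch and the helpers words() and countBy() are made up for illustration, and dropping empty tokens is an extra assumption that the snippets above do not make.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of Kafka Streams.
public class TotalWordCountSketch {

    // Lower-case the line and split on non-word characters, as in the snippets above.
    static List<String> words(String line) {
        return Arrays.stream(line.toLowerCase().split("\\W+"))
                .filter(w -> !w.isEmpty()) // assumption: ignore the empty token a leading separator yields
                .collect(Collectors.toList());
    }

    // Group the words by whatever key keyFn assigns, then count the members of each group.
    static Map<String, Long> countBy(String line, Function<String, String> keyFn) {
        return words(line).stream()
                .collect(Collectors.groupingBy(keyFn, Collectors.counting()));
    }

    public static void main(String[] args) {
        String line = "all streams lead to kafka";

        // Key = the word itself: one entry per distinct word, each with count 1.
        System.out.println(countBy(line, w -> w));

        // Key = a constant: every word lands in the same group, so the count is the total.
        System.out.println(countBy(line, w -> "totalWordCount")); // prints {totalWordCount=5}
    }
}
```

The constant key plays the same role as "totalWordCount" in the two topologies above: all records end up in one group, so a single counter accumulates the total.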

Upvotes: 2
