Reputation: 387
https://kafka.apache.org/10/documentation/streams/quickstart
I have a question about counting words within a message using Kafka Streams. Essentially, I'd like to count the total number of words rather than the occurrences of each individual word. So, instead of
all 1
streams 1
lead 1
to 1
kafka 1
I need
totalWordCount 5
or something similar.
I tried a variety of changes to this part of the code:
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.count();
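For reference, here is a plain-Java simulation of what that quickstart topology computes (no Kafka dependency; PerWordCountSketch and countPerWord are made-up names for illustration). Because groupBy((key, value) -> value) makes each word its own key, every distinct word gets its own counter, which is exactly the per-word output shown above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java simulation (no Kafka dependency) of the quickstart topology:
// flatMapValues(split on \W+) -> groupBy(word) -> count()
class PerWordCountSketch {
    static Map<String, Long> countPerWord(String... lines) {
        // LinkedHashMap keeps first-seen order, like the output listing above
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                // groupBy((key, value) -> value): the word itself becomes the key
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints {all=1, streams=1, lead=1, to=1, kafka=1}
        System.out.println(countPerWord("all streams lead to kafka"));
    }
}
```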
such as adding .selectKey((key, value) -> "totalWordCount")
in an attempt to change each key (all, streams, etc.) to totalWordCount, thinking the count on that single key would then increment by itself.
I've also tried editing my code based on this to achieve the total word count.
I have not succeeded, and after some more reading I now suspect I have been approaching this incorrectly. It seems that I might need 3 topics (I've been working with only 2) and 2 producers, where the second producer takes the output of the first one (the count for each word) and adds those numbers up to output the total number of words. However, I'm not sure how to approach it. Any help/guidance is greatly appreciated. Thanks.
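One caveat with adding up the per-word counts: a per-word count table emits a new record each time a word's count changes, so simply summing every record on its output topic counts earlier values again. Below is a plain-Java sketch (no Kafka dependency; ChangelogTotalSketch and onUpdate are hypothetical names) of keeping a correct total by subtracting a word's previous count before adding its new one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation: each update is (word, newCount), the way a
// per-word count table would emit it to its output topic.
class ChangelogTotalSketch {
    private final Map<String, Long> latest = new HashMap<>();
    private long total = 0;    // correct total number of words
    private long naiveSum = 0; // result of summing every update record

    void onUpdate(String word, long newCount) {
        long old = latest.getOrDefault(word, 0L);
        latest.put(word, newCount);
        total += newCount - old; // remove the old value, add the new one
        naiveSum += newCount;    // wrong: earlier values are counted again
    }

    long total() { return total; }
    long naiveSum() { return naiveSum; }

    public static void main(String[] args) {
        ChangelogTotalSketch s = new ChangelogTotalSketch();
        s.onUpdate("all", 1);
        s.onUpdate("kafka", 1);
        s.onUpdate("kafka", 2); // "kafka" seen a second time
        System.out.println(s.total() + " vs naive " + s.naiveSum()); // 3 vs naive 4
    }
}
```

Within Kafka Streams, the same subtract-then-add idea is what re-grouping a KTable does with an adder and a subtractor, roughly wordCounts.groupBy((word, count) -> KeyValue.pair("totalWordCount", count)).reduce(Long::sum, (agg, old) -> agg - old) with suitable serdes for the repartition step. That would not need a separate producer, but treat it as an untested sketch.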
Upvotes: 0
Views: 1843
Reputation: 415
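import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@Configuration
@EnableKafkaStreams
public class FirstStreamApp {
    @Bean
    public KStream<String, String> process(StreamsBuilder builder) {
        KStream<String, String> inputStream = builder.stream("streamIn", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> upperCaseStream = inputStream.mapValues(value -> value.toUpperCase());
        upperCaseStream.to("outTopic", Produced.with(Serdes.String(), Serdes.String()));
        // Split on spaces, re-key each record by its word, then count per word (one count per distinct word)
        KTable<String, Long> wordCounts = upperCaseStream
                .flatMapValues(v -> Arrays.asList(v.split(" ")))
                .selectKey((k, v) -> v)
                .groupByKey()
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream().to("wordCountTopic", Produced.with(Serdes.String(), Serdes.Long()));
        return upperCaseStream;
    }
}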
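A plain-Java simulation of what the topology above computes (no Kafka dependency; UpperCaseWordCountSketch is a made-up name): it still produces one count per distinct upper-cased word rather than a single total, and because it splits on a single space, punctuation stays attached to words.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of: mapValues(toUpperCase) -> flatMapValues(split(" "))
// -> selectKey(word) -> groupByKey() -> count()
class UpperCaseWordCountSketch {
    static Map<String, Long> count(String... lines) {
        Map<String, Long> counts = new TreeMap<>(); // sorted for stable printing
        for (String line : lines) {
            for (String word : line.toUpperCase().split(" ")) {
                counts.merge(word, 1L, Long::sum); // key = the word itself
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints {KAFKA=1, KAFKA,=1, STREAMS=1}: "Kafka," and "kafka" are separate keys
        System.out.println(count("Kafka, streams kafka"));
    }
}
```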
Upvotes: 0
Reputation: 62350
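Where did you put the selectKey()? The idea is basically correct, but note that groupBy() sets the key, too, so a key chosen earlier with selectKey() is replaced by whatever groupBy() returns (the word, in your code).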
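To see why the key set by selectKey() does not survive, here is a plain-Java simulation (no Kafka dependency; GroupByKeySketch and groupByAndCount are made-up names). A "record" is a key/value pair, and grouping derives a new key from it, so a constant key set earlier is discarded unless the grouping function itself returns the constant.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Plain-Java simulation: groupBy(mapper) derives the grouping key from
// (key, value), ignoring whatever key the record already had.
class GroupByKeySketch {
    static Map<String, Long> groupByAndCount(List<Map.Entry<String, String>> records,
                                             BiFunction<String, String, String> keyMapper) {
        Map<String, Long> counts = new TreeMap<>();
        for (Map.Entry<String, String> r : records) {
            counts.merge(keyMapper.apply(r.getKey(), r.getValue()), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> words = new ArrayList<>();
        // As if selectKey((key, value) -> "totalWordCount") had already run
        for (String w : "all streams lead to kafka".split("\\W+")) {
            words.add(new SimpleEntry<>("totalWordCount", w));
        }
        // groupBy((key, value) -> value): prints {all=1, kafka=1, lead=1, streams=1, to=1}
        System.out.println(groupByAndCount(words, (k, v) -> v));
        // groupBy((key, value) -> "totalWordCount"): prints {totalWordCount=5}
        System.out.println(groupByAndCount(words, (k, v) -> "totalWordCount"));
    }
}
```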
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> "totalWordCount")
.count();
or (using groupByKey() so that the key is not changed again before the aggregation):
KTable<String, Long> wordCounts = textLines
.selectKey((key, value) -> "totalWordCount")
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupByKey()
.count();
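Both variants above keep a cumulative total across all messages, since every word from every message lands on the same key. A plain-Java simulation of that behaviour (no Kafka dependency; WordTotalsSketch, onMessage, and wordsIn are made-up names), plus a hypothetical per-message count in case "within a message" means each message on its own:

```java
import java.util.Arrays;

// Plain-Java simulation. onMessage() mimics groupBy(... -> "totalWordCount").count():
// all words share one key, so the value is a running total over all messages.
// wordsIn() is a hypothetical per-message alternative with no aggregation.
class WordTotalsSketch {
    private long total = 0;

    // Updated value for the single "totalWordCount" key after one message
    long onMessage(String message) {
        total += message.toLowerCase().split("\\W+").length; // same split as the answer
        return total;
    }

    // Word count of one message; empty tokens are dropped because split("\\W+")
    // yields a leading "" when a message starts with a non-word character
    static long wordsIn(String message) {
        return Arrays.stream(message.toLowerCase().split("\\W+"))
                .filter(w -> !w.isEmpty())
                .count();
    }

    public static void main(String[] args) {
        WordTotalsSketch t = new WordTotalsSketch();
        System.out.println(t.onMessage("all streams lead to kafka")); // 5
        System.out.println(t.onMessage("hello kafka streams"));       // 8 (running total)
        System.out.println(wordsIn("hello kafka streams"));           // 3 (this message only)
        System.out.println(wordsIn("  all streams lead to kafka"));   // 5; unfiltered split gives 6
    }
}
```

In the topology itself, the empty-token issue could be handled with a .filter((key, word) -> !word.isEmpty()) after flatMapValues, and a per-message count would be a stateless mapValues(...) instead of an aggregation. Also keep in mind that a single constant key sends every record to one partition, so one task does all of the counting, and with record caching enabled not every intermediate total is necessarily forwarded downstream.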
Upvotes: 2