Jun

How to count unique words in a stream?

Is there a way to count the number of unique words in a stream with Flink Streaming? The result would be a stream of numbers that keeps increasing.

Answers (1)

Till Rohrmann

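You can solve the problem by storing every word you have already seen. With that knowledge you can filter out all duplicate words. If you key the stream by the word itself, Flink keeps this state separately for each word, so the filter only has to remember whether the current word occurred before. The remaining first occurrences can then be counted by a stateful map operator with parallelism 1 that keeps a running total. The following code snippet does exactly that.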

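// Flink Scala DataStream API: brings in StreamExecutionEnvironment, the keyed
// stream operators and the implicit TypeInformation for the state types
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val inputStream = env.fromElements("foo", "bar", "foobar", "bar", "barfoo", "foobar", "foo", "fo")

// Filter out words we have already seen. The stream is keyed by the word
// itself, so the state is scoped to a single word and a Boolean "seen" flag
// is enough: the first occurrence passes, every later one is dropped.
val uniqueWords = inputStream
  .keyBy(word => word)
  .filterWithState[Boolean] { (_, seen) =>
    seen match {
      case None    => (true, Some(true)) // first occurrence: emit and remember it
      case Some(_) => (false, seen)      // duplicate: drop it, keep the state
    }
  }

// Count the incoming (first-seen) words. Keying by a constant sends every
// record to the same key, so one Int counter holds the running total.
val numberUniqueWords = uniqueWords
  .keyBy(_ => 0)
  .mapWithState[Int, Int] { (_, counter) =>
    val next = counter.getOrElse(0) + 1
    (next, Some(next))
  }
  .setParallelism(1)

numberUniqueWords.print()

env.execute()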
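
To check the logic without starting a Flink job, the two stateful operators can be simulated in plain Scala. This is only a sketch, not Flink code: the object name `UniqueWordCountSketch` and the method `runningUniqueCounts` are hypothetical, a local `mutable.Set` stands in for the per-word keyed state of `filterWithState`, and a local `Option[Int]` stands in for the single counter state of `mapWithState`.

```scala
import scala.collection.mutable

// Hypothetical plain-Scala simulation (no Flink dependency) of the pipeline:
// deduplicate words, then emit a running count of the first-seen words.
object UniqueWordCountSketch {

  /** Returns the values the counting operator would emit, one per
    * first-seen word, in arrival order. */
  def runningUniqueCounts(words: Seq[String]): Seq[Int] = {
    // Stand-in for the filter's keyed state: one entry per distinct word.
    val seen = mutable.Set.empty[String]
    // Stand-in for the counting operator's single Option[Int] state.
    var counter: Option[Int] = None
    val out = mutable.ArrayBuffer.empty[Int]

    for (word <- words) {
      // filterWithState: add returns true only if the word was not seen yet.
      if (seen.add(word)) {
        // mapWithState: increment the counter and emit the new value.
        val next = counter.getOrElse(0) + 1
        counter = Some(next)
        out += next
      }
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    val input = Seq("foo", "bar", "foobar", "bar", "barfoo", "foobar", "foo", "fo")
    runningUniqueCounts(input).foreach(println)
  }
}
```

For the sample input there are five distinct words, so the counter emits 1, 2, 3, 4, 5. In the real job the keyed filter may run in parallel, so which word is assigned which number can vary between runs, but because all counting happens under one key the emitted totals still increase by one per new word.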
