Reputation: 13835
I am learning Spark Streaming and trying to find hash tags in some production logs.
In some examples, I found the following code:
val words = statuses.flatMap(line => line.split(" "))
val tags = words.filter(w => w.startsWith("#"))
val tagKeyValues = tags.map(tag => (tag, 1))
val tagCounts = tagKeyValues.reduceByKeyAndWindow( (x,y) => x + y, (x,y) => x - y, Seconds(300), Seconds(1))
The code works fine, but I did not understand how this reduceByKeyAndWindow works here. Why are we decrementing the values in its second argument?
Upvotes: 0
Views: 1308
Reputation: 4540
The inverse reduce function is used to optimise sliding-window performance. With a window duration of 300s and a slide duration of 1s, the new reduced value can be computed from the previous reduced value by subtracting the 1s of old data that falls out of the window and adding the 1s of new data that enters it, instead of re-reducing all 300s of data from scratch. There is also a version of reduceByKeyAndWindow without the inverse function, which can be used when the reduce function is not invertible.
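To make the trick concrete, here is a minimal pure-Scala sketch (no Spark required, all names are my own) that compares the naive per-slide recomputation with the incremental update that `reduceByKeyAndWindow` performs when given an inverse function. It uses a toy 3-batch window over per-batch counts for a single tag:

```scala
// Sketch of the incremental sliding-window update used by
// reduceByKeyAndWindow when an inverse reduce function is supplied.
object IncrementalWindow {
  def main(args: Array[String]): Unit = {
    val batches = Seq(3, 1, 4, 1, 5, 9, 2, 6) // per-batch counts for one tag
    val window  = 3                           // window length, in batches

    // Naive approach: re-reduce the whole window on every slide.
    val naive = batches.indices.map { i =>
      batches.slice(math.max(0, i - window + 1), i + 1).sum
    }

    // Incremental approach: add the entering batch, subtract the leaving one.
    var sum = 0
    val incremental = batches.indices.map { i =>
      sum += batches(i)                           // reduce:  (x, y) => x + y
      if (i >= window) sum -= batches(i - window) // inverse: (x, y) => x - y
      sum
    }

    assert(naive == incremental) // both strategies agree
    println(incremental)         // prints Vector(3, 4, 8, 6, 10, 15, 16, 17)
  }
}
```

The incremental version does O(1) work per slide regardless of window length, whereas the naive version does O(window) work; with a 300s window sliding every 1s, that difference is substantial.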
The algorithm implementation is verbosely commented and easy to follow: https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala#L79
Upvotes: 4