Reputation: 157
I have an RDD of strings, tokens, that consists of words and numbers, and I'm trying to compute the counts of single words, word-word pairs, and number-word pairs simultaneously in Apache Spark. I have functions isWord(token) and isNumber(token) that return whether a token is a word or a number.
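For reference, simplified stand-ins for these two helpers could look like this (the real checks may differ):
// Simplified, regex-based stand-ins for the real helpers (for illustration only)
def isWord(token: String): Boolean = token.matches("[a-zA-Z]+")
def isNumber(token: String): Boolean = token.matches("[0-9]+")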
This is an example of what the result should look like:
(states,1)
(united,10)
(alameda,3)
((united,states), 1)
((states,located), 3)
((located,alameda), 11)
((19,century),1)
((100,inhabitants),1)
((2020,music),1)
I came up with the code below to solve the problem:
// sliding() on an RDD comes from MLlib's RDDFunctions
import org.apache.spark.mllib.rdd.RDDFunctions._

val tokens = textFile.flatMap(line => findTokens(line))

// Find the word counts
val wordCounts = tokens.filter(token => isWord(token))
  .map(token => (token, 1))
  .reduceByKey(_ + _)

// Find the counts of all adjacent pairs
val pairCounts = tokens.sliding(2)
  .map(pair => (pair(0), pair(1)))
  .map(pair => (pair, 1))
  .reduceByKey(_ + _)

// Keep only the word-word counts
val wordWordCounts = pairCounts.filter(pair => isWord(pair._1._1) && isWord(pair._1._2))

// Keep only the number-word counts
val numberWordCounts = pairCounts.filter(pair => isNumber(pair._1._1) && isWord(pair._1._2))
The code works well, but I'm not sure if this is the most efficient and elegant way of doing this in Spark. Does it make sense to do this in a single pass?
I'm new to Spark and Scala, but here is the kind of thing I had in mind:
val (wordCounts, wordWordCounts, numberWordCounts) = tokens
  .sliding(2)
  .map { case Array(prevToken, currToken) =>
    val wordCount = if (isWord(prevToken)) Seq((prevToken, 1)) else Seq.empty
    val wordWordCount = if (isWord(prevToken) && isWord(currToken)) Seq(((prevToken, currToken), 1)) else Seq.empty
    val numberWordCount = if (isNumber(prevToken) && isWord(currToken)) Seq(((prevToken, currToken), 1)) else Seq.empty
    (wordCount, wordWordCount, numberWordCount)
  }

val wordCountsRDD = sc.parallelize(wordCounts.reduceByKey(_ + _))
val wordWordCountsRDD = sc.parallelize(wordWordCounts.reduceByKey(_ + _))
val numberWordCountsRDD = sc.parallelize(numberWordCounts.reduceByKey(_ + _))
This code is not functional yet; I've been trying to make it work, but I'm not sure whether creating multiple Seqs inside the map function is good practice. Does it make sense to attempt something like this?
If I need to create multiple RDDs, should I use transformations on the original RDD and compute the counts separately for each key type? Is there a better way of doing this? Thanks!
Upvotes: 0
Views: 75
Reputation: 198
It is not recommended to create multiple sequences inside the map function like that. Note also that the destructuring in your second snippet can't work as written, because the map produces a single RDD of tuples rather than three separate RDDs. Instead, you could use flatMap to emit the relevant tuples from each input; that way the code is easier to understand, debug, and test. Here's an example of how it can be done:
// As above, sliding() on an RDD requires importing org.apache.spark.mllib.rdd.RDDFunctions._

// Single-word counts
val wordCountsRDD = tokens.flatMap(token => {
  if (isWord(token)) Seq((token, 1)) else Seq.empty
}).reduceByKey(_ + _)

// Word-word pair counts
val wordWordCountsRDD = tokens.sliding(2).flatMap(pair => {
  if (isWord(pair(0)) && isWord(pair(1))) Seq(((pair(0), pair(1)), 1)) else Seq.empty
}).reduceByKey(_ + _)

// Number-word pair counts
val numberWordCountsRDD = tokens.sliding(2).flatMap(pair => {
  if (isNumber(pair(0)) && isWord(pair(1))) Seq(((pair(0), pair(1)), 1)) else Seq.empty
}).reduceByKey(_ + _)
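If you really do want a single pass over the pairs, one option (just a sketch) is to emit a tagged key for every count from a single flatMap over the sliding windows, reduce once, and then split the result by tag:

// One pass: tag each emitted key with its kind.
// "W" = single word, "WW" = word-word pair, "NW" = number-word pair.
val tagged = tokens.sliding(2).flatMap { case Array(prev, curr) =>
  val word       = if (isWord(prev)) Seq((("W", prev, ""), 1)) else Seq.empty
  val wordWord   = if (isWord(prev) && isWord(curr)) Seq((("WW", prev, curr), 1)) else Seq.empty
  val numberWord = if (isNumber(prev) && isWord(curr)) Seq((("NW", prev, curr), 1)) else Seq.empty
  word ++ wordWord ++ numberWord
}.reduceByKey(_ + _)
  .cache() // cached because it is consumed three times below

val wordCountsRDD       = tagged.collect { case (("W", w, _), n)  => (w, n) }
val wordWordCountsRDD   = tagged.collect { case (("WW", a, b), n) => ((a, b), n) }
val numberWordCountsRDD = tagged.collect { case (("NW", a, b), n) => ((a, b), n) }

One caveat with any sliding-based approach, including your own attempt: the last token only ever appears as the second element of a window, so it never gets counted as a single word. The three separate flatMaps above don't have that problem for the single-word counts.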
Upvotes: 0