lowlypalace

Reputation: 157

How to compute word counts and pair counts in a single scan in Spark

I have an array of string tokens, tokens, that consists of words and numbers, and I'm trying to compute the counts of single words, word-word pairs, and number-word pairs simultaneously in Apache Spark. I have functions isWord(token) and isNumber(token) that return whether a token is a word or a number.
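
For illustration, simple versions of these helpers might look like the sketch below; the exact definitions don't matter for the question:

    // Simplified example helpers -- the real definitions aren't shown here.
    def isNumber(token: String): Boolean = token.nonEmpty && token.forall(_.isDigit)
    def isWord(token: String): Boolean = token.nonEmpty && token.forall(_.isLetter)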

This is an example of what a result should look like:

(states,1)
(united,10)
(alameda,3)
((united,states),1)
((states,located),3)
((located,alameda),11)
((19,century),1)
((100,inhabitants),1)
((2020,music),1)

I came up with the code below to solve the problem:


    // sliding(n) on an RDD requires this implicit from MLlib
    import org.apache.spark.mllib.rdd.RDDFunctions._

    val tokens = textFile.flatMap(line => findTokens(line))

    // Find the word counts
    val wordCounts = tokens.filter(token => isWord(token))
                           .map(token => (token, 1))
                           .reduceByKey(_ + _)

    // Find the counts of all pairs
    val pairCounts = tokens.sliding(2)
                           .map(pair => (pair(0), pair(1)))
                           .map(pair => (pair, 1))
                           .reduceByKey(_ + _)

    // Find the word-word counts
    val wordWordCounts = pairCounts.filter(pair => isWord(pair._1._1) && isWord(pair._1._2))

    // Find the number-word counts
    val numberWordCounts = pairCounts.filter(pair => isNumber(pair._1._1) && isWord(pair._1._2))

The code works well, but I'm not sure if this is the most efficient and elegant way of doing this in Spark. Does it make sense to do this in a single pass?

I'm new to Spark and Scala, but here is something I was thinking of:

    val (wordCounts, wordWordCounts, numberWordCounts) = tokens
      .sliding(2)
      .map { case Array(prevToken, currToken) =>

        val wordCount = if (isWord(prevToken)) Seq((prevToken, 1)) else Seq.empty
        val wordWordCount = if (isWord(prevToken) && isWord(currToken)) Seq(((prevToken, currToken), 1)) else Seq.empty
        val numberWordCount = if (isNumber(prevToken) && isWord(currToken)) Seq(((prevToken, currToken), 1)) else Seq.empty

        (wordCount, wordWordCount, numberWordCount)
      }

    val wordCountsRDD = sc.parallelize(wordCounts.reduceByKey(_ + _))
    val wordWordCountsRDD = sc.parallelize(wordWordCounts.reduceByKey(_ + _))
    val numberWordCountsRDD = sc.parallelize(numberWordCounts.reduceByKey(_ + _))

This code is not functional yet, and I've been trying to make it work. I'm not sure whether it's good practice to create multiple sequences inside the map function. Does it make sense to attempt something like this?

If I need to create multiple RDDs, should I use transformation functions on the original RDD and compute the counts separately for each key type? Is there a better way of doing this? Thanks!

Upvotes: 0

Views: 75

Answers (1)

afjcjsbx

Reputation: 198

It is not recommended to create multiple sequences inside the map function. Instead, you could use flatMap to emit either a counted tuple or nothing for each input token or token pair, keeping each count as its own transformation. This way the code will be easier to understand, debug, and test. Here's an example of how it can be done:

    val wordCountsRDD = tokens.flatMap(token => {
      if (isWord(token)) Seq((token, 1)) else Seq.empty
    }).reduceByKey(_ + _)

    val wordWordCountsRDD = tokens.sliding(2).flatMap(pair => {
      if (isWord(pair(0)) && isWord(pair(1))) Seq(((pair(0), pair(1)), 1)) else Seq.empty
    }).reduceByKey(_ + _)

    val numberWordCountsRDD = tokens.sliding(2).flatMap(pair => {
      if (isNumber(pair(0)) && isWord(pair(1))) Seq(((pair(0), pair(1)), 1)) else Seq.empty
    }).reduceByKey(_ + _)
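
Two practical notes on this approach. As in the question's code, sliding is not defined on plain RDDs, so it needs the implicit conversion from MLlib's RDDFunctions. And because tokens now feeds three separate count jobs, caching it avoids re-reading and re-tokenizing the input each time. A minimal sketch:

    // sliding(n) on an RDD comes from this MLlib implicit.
    import org.apache.spark.mllib.rdd.RDDFunctions._

    // tokens is traversed three times (once per count job); caching it
    // means the upstream textFile/flatMap work runs only once.
    tokens.cache()

If you truly need a single scan, you could emit tagged tuples from one sliding pass and split the result with filter afterwards, but for most workloads caching plus separate transformations is simpler.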

Upvotes: 0
