VRK

Reputation: 13

Average word length in Spark

I have an array of words, each paired with the aggregated length of all its occurrences.

Ex: If my sentence is

"I have a cat. The cat looks very cute"

My array looks like

Array((I,1), (have,4), (a,1), (cat,6), (The,3), (looks,5), (very,4), (cute,4))

Now I want to compute the average length of each word, i.e. the aggregated length divided by the number of occurrences.

I tried this in Scala:

val avglen = arr.reduceByKey( (x,y) => (x, y.toDouble / x.size.toDouble) )

I get the following error at x.size:

error: value size is not a member of Int

Where am I going wrong?

Upvotes: 1

Views: 4835

Answers (3)

Alberto Bonsanto

Reputation: 18042

After your comment I think I got it:

val words = sc.parallelize(Array(("i", 1), ("have", 4), 
                                 ("a", 1), ("cat", 6), 
                                 ("the", 3), ("looks", 5), 
                                 ("very", 4), ("cute", 4)))

val avgs = words.map { case (word, count) => (word, count / word.length.toDouble) }

println("My averages are: ")
avgs.take(100).foreach(println)
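For context on the error in the question: the function passed to reduceByKey has type (V, V) => V, so both x and y are Int values belonging to the same key, and the key itself is never passed in. A map over the pairs, as above, sees both the word and its number. The following is a minimal sketch using plain Scala collections instead of an RDD (no Spark needed), and it assumes the second element of each pair is the aggregated length described in the question. The names pairs and occurrences are illustrative only.

```scala
// A plain Seq stands in for the RDD; the same map call works on an RDD[(String, Int)].
val pairs: Seq[(String, Int)] = Seq(
  ("I", 1), ("have", 4), ("a", 1), ("cat", 6),
  ("The", 3), ("looks", 5), ("very", 4), ("cute", 4)
)

// Assumption: each element is (word, total characters over all occurrences).
// Dividing that total by the word's length recovers the number of occurrences.
val occurrences: Map[String, Double] =
  pairs.map { case (word, totalLength) =>
    (word, totalLength / word.length.toDouble)
  }.toMap
```

Under that assumption, "cat" (aggregated length 6, word length 3) maps to 2.0 and every other word maps to 1.0.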


Suppose you have a paragraph made up of those words and you want to calculate the mean length of the words in the paragraph.

In two steps, with a map-reduce approach on Spark 1.5.1:

val words = sc.parallelize(Array(("i", 1), ("have", 4), 
                                 ("a", 1), ("cat", 6), 
                                 ("the", 3), ("looks", 5), 
                                 ("very", 4), ("cute", 4)))

val wordCount = words.map { case (word, count) => count}.reduce((a, b) => a + b)
val wordLength = words.map { case (word, count) => word.length * count}.reduce((a, b) => a + b)

println("The avg length is: " +  wordLength / wordCount.toDouble)

I ran this code in an .ipynb notebook connected to a spark-kernel.

Upvotes: 0

Rohan Aletty

Reputation: 2442

This is a slightly confusing question. If your data is already in an Array[(String, Int)] collection (presumably after a collect() to the driver), then you need not use any RDD transformations. In fact, there's a nifty trick you can run with fold*() to grab the average over a collection:

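val average =
  // numerator: total number of characters
  arr.foldLeft(0.0) { case (sum: Double, (_, count: Int)) => sum + count } /
  // denominator: total number of word occurrences
  arr.foldLeft(0.0) { case (sum: Double, (word: String, count: Int)) => sum + count / word.length }

It is a bit long-winded, but it aggregates the total number of characters in the numerator and the number of words in the denominator. The integer division count / word.length is exact here because each aggregated length is a multiple of the word's length. Run on your example, I see the following: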

scala> val arr = Array(("I",1), ("have",4), ("a",1), ("cat",6), ("The", 3), ("looks", 5), ("very" ,4), ("cute",4))
arr: Array[(String, Int)] = Array((I,1), (have,4), (a,1), (cat,6), (The,3), (looks,5), (very,4), (cute,4))

scala> val average = ...
average: Double = 3.111111111111111
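The two folds can also be combined into a single traversal by carrying both totals in a tuple. This is a sketch rather than part of the original answer: it assumes the same arr as in the example, and the name totals is illustrative.

```scala
val arr: Array[(String, Int)] = Array(
  ("I", 1), ("have", 4), ("a", 1), ("cat", 6),
  ("The", 3), ("looks", 5), ("very", 4), ("cute", 4)
)

// Accumulate (total characters, total word occurrences) in one pass.
// aggLength / word.length is integer division; it is exact only because
// each aggregated length is assumed to be a multiple of the word's length.
val totals: (Int, Int) = arr.foldLeft((0, 0)) {
  case ((chars, words), (word, aggLength)) =>
    (chars + aggLength, words + aggLength / word.length)
}

val average: Double = totals._1.toDouble / totals._2
```

For the example array the totals are 28 characters over 9 word occurrences, which gives the same 3.111... as the two-fold version.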

If you have your (String, Int) tuples distributed across an RDD[(String, Int)], you can use accumulators to solve this problem quite easily:

val chars = sc.accumulator(0.0)
val words = sc.accumulator(0.0)
wordsRDD.foreach { case (word: String, count: Int) =>
  chars += count; words += count / word.length
}

val average = chars.value / words.value

When running on the above example (placed in an RDD), I see the following:

scala> val arr = Array(("I",1), ("have",4), ("a",1), ("cat",6), ("The", 3), ("looks", 5), ("very" ,4), ("cute",4))
arr: Array[(String, Int)] = Array((I,1), (have,4), (a,1), (cat,6), (The,3), (looks,5), (very,4), (cute,4))

scala> val wordsRDD = sc.parallelize(arr)
wordsRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:14

scala> val chars = sc.accumulator(0.0)
chars: org.apache.spark.Accumulator[Double] = 0.0

scala> val words = sc.accumulator(0.0)
words: org.apache.spark.Accumulator[Double] = 0.0

scala> wordsRDD.foreach { case (word: String, count: Int) =>
     |   chars += count; words += count / word.length
     | }
...
scala>     val average = chars.value / words.value
average: Double = 3.111111111111111

Upvotes: 0

Eugene Zhulenev

Reputation: 9734

If I understand the problem correctly:

import org.apache.spark.rdd.RDD

val rdd: RDD[(String, Int)] = ???
// Divide each word's length by the number paired with it
val ave: RDD[(String, Double)] =
     rdd.map { case (name, numOccurrences) =>
       (name, name.length.toDouble / numOccurrences)
     }

Upvotes: 0
