qed

Reputation: 23114

groupBy cannot handle large RDDs

Here is the code:

val words = sc.textFile("/Users/kaiyin/IdeaProjects/learnSpark/src/main/resources/eng_words.txt" )
words.take(1000000).foreach(println _)
words.take(150000).groupBy((x: String) => x.head).map {
  case (c, iter)  => (c, iter.toList.size)
}.foreach {
  println _
}

eng_words.txt is a text file containing about 1 million English words, one per line. Once I take more than 150000 elements, groupBy crashes with this error:

java.util.NoSuchElementException: next on empty iterator
  at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
  at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
  at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
  at scala.collection.IterableLike$class.head(IterableLike.scala:107)
  at scala.collection.immutable.StringOps.scala$collection$IndexedSeqOptimized$$super$head(StringOps.scala:30)
  at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
  at scala.collection.immutable.StringOps.head(StringOps.scala:30)
  at $anon$1$$anonfun$run$1.apply(<console>:23)
  at $anon$1$$anonfun$run$1.apply(<console>:23)
  at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:332)
  at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:331)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:331)
  at scala.collection.mutable.ArrayOps$ofRef.groupBy(ArrayOps.scala:186)
  at $anon$1.run(<console>:23)
  at Helper.HasRun$class.newRun(HasRun.scala:21)
  at $anon$1.newRun(<console>:19)
  ... 55 elided

What went wrong?

Upvotes: 0

Views: 257

Answers (1)

zero323

Reputation: 330183

In this particular case, it most likely cannot handle an empty string. Nevertheless, don't use groupBy, don't call toList, and don't blindly trust that the input is well formatted.

  • head fails on an empty line with the error you see.

  • groupBy, like groupByKey, requires all records for each key to fit into executor memory.
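To see the first point without Spark, here is a minimal sketch on plain Scala collections. The object name FirstLetterDemo, the helper countByFirstLetter and the sample data are hypothetical and only for illustration; the empty string stands in for a blank line in the input file.

```scala
object FirstLetterDemo {
  // Hypothetical sample input; the empty string mimics a blank line in the file.
  val sample: Seq[String] = Seq("apple", "avocado", "", "banana")

  // Counts words by first character. headOption returns None for an empty
  // string, so flatMap silently drops blank lines instead of throwing.
  def countByFirstLetter(lines: Seq[String]): Map[Char, Int] =
    lines
      .flatMap(_.headOption)
      .foldLeft(Map.empty[Char, Int]) { (acc, c) =>
        acc.updated(c, acc.getOrElse(c, 0) + 1)
      }

  def main(args: Array[String]): Unit = {
    // head on an empty string throws, as in the stack trace from the question.
    val failed = scala.util.Try("".head).isFailure
    println(s"head on empty string fails: $failed")
    println(countByFirstLetter(sample))
  }
}
```

The fold keeps only one running count per key rather than a full list of words per key, which is roughly the idea behind preferring a reduce over groupBy followed by toList.size.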

What you have here is yet another word count:

words
  // Make sure that it won't fail on empty string with
  // java.util.NoSuchElementException: next on empty iterator
  .flatMap(_.headOption) 
  // Map to pairs and reduce to avoid excessive shuffling and limit memory usage
  .map((_, 1))
  .reduceByKey(_ + _)

Upvotes: 4
