Reputation: 43
In the following code (written in Scala), I print partition.size twice but get two different results.
Code:
val a = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
val rdd = sc.parallelize(a)
rdd.foreachPartition { partition =>
  println("1. partition.size: " + partition.size) // 1. partition.size: 2
  println("2. partition.size: " + partition.size) // 2. partition.size: 0
}
Results:
1. partition.size: 2
2. partition.size: 0
According to the Spark API docs, partition is an Iterator:
def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
Applies a function f to each partition of this RDD.
So why is the size of the partition 0 the second time?
Upvotes: 1
Views: 1165
Reputation: 5173
After digging I found the answer. It's not an issue with Spark or with your program; it works as designed. The closure passed to foreachPartition takes an Iterator[T] as input, which is a plain Scala iterator. When the size method is invoked on a Scala iterator, it computes the size by traversing to the end of the underlying sequence. See the documentation for Scala's Iterator:
http://www.scala-lang.org/docu/files/collections-api/collections_43.html
The number of elements returned by it. Note: it will be at its end after this operation!
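You can see the same behaviour with a plain Scala iterator, with no Spark involved at all. A minimal sketch you could run in the Scala REPL:

// A plain Scala iterator is consumed by size, just like the partition iterator:
val it = Iterator(1, 2, 3)
println(it.size)    // 3  -- traverses to the end, exhausting the iterator
println(it.size)    // 0  -- nothing left to traverse
println(it.hasNext) // false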
When you ask an Iterator for its size, it traverses each element in the sequence it points to, moving one position to the right each time. When it has no more elements to traverse (iterator.hasNext == false), it returns the size. But by then it has exhausted all the elements, so when the size is retrieved a second time it returns 0.
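If you really need the size more than once inside foreachPartition, one option (a sketch, not something from the question) is to buffer the partition's elements into a collection first, keeping in mind that this materialises the whole partition in memory:

rdd.foreachPartition { partition =>
  // Buffer the iterator into a List so it can be traversed repeatedly.
  // Note: this holds the entire partition in memory at once.
  val elements = partition.toList
  println("1. size: " + elements.size)
  println("2. size: " + elements.size) // same value; List.size consumes nothing
}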
Upvotes: 9