What I want to do is group multiple elements within each partition and then perform some operations on the grouped elements of that partition. However, converting the partition to a list does not work as expected. See the example below:
import scala.collection.mutable.ArrayBuffer

val rdd = sc.parallelize(Seq("a", "b", "c", "d", "e"), 2)
val mapped = rdd.mapPartitions { partition =>
  val total = partition.size
  var first = partition.toList match {
    case Nil => "EMPTYLIST"
    case _ => partition.toList.head
  }
  var finalResult = ArrayBuffer[String]()
  finalResult += "1:" + first
  finalResult += "2:" + first
  finalResult += "3:" + first
  finalResult.iterator
}
mapped.collect()
Result:
Array[String] = Array(1:EMPTYLIST, 2:EMPTYLIST, 3:EMPTYLIST, 1:EMPTYLIST, 2:EMPTYLIST, 3:EMPTYLIST)
Why is partition.toList always empty?
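The same symptom can be reproduced without Spark. The sketch below is a hypothetical, Spark-free stand-in (the object EmptyAfterSize and the method firstAfterSize are illustrative names, not part of any API) that runs the first two steps of the mapPartitions body above on a plain Iterator[String], assumed here to play the role of one partition.

```scala
object EmptyAfterSize {
  // Hypothetical helper: mirrors the first two steps of the question's
  // mapPartitions body, applied to a plain Iterator instead of a Spark partition.
  def firstAfterSize(partition: Iterator[String]): (Int, String) = {
    val total = partition.size // walks the iterator to its end
    val first = partition.toList match {
      case Nil       => "EMPTYLIST" // always taken: nothing is left to read
      case head :: _ => head
    }
    (total, first)
  }

  def main(args: Array[String]): Unit = {
    // The count is correct, but the "first" element is missing.
    println(firstAfterSize(Iterator("a", "b")))
  }
}
```

Running main prints (2,EMPTYLIST): size sees both elements, while the later toList finds an exhausted iterator.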
partition is an Iterator, and calling size on it consumes it, so by the time you convert it to a List it is already empty. To traverse the partition more than once, convert it to a List at the beginning and then work on that List:
import scala.collection.mutable.ArrayBuffer

val mapped = rdd.mapPartitions { partition =>
  // Materialize the iterator once; the List can be traversed as often as needed.
  val partitionList = partition.toList
  val total = partitionList.size
  val first = partitionList match {
    case Nil       => "EMPTYLIST"
    case head :: _ => head
  }
  val finalResult = ArrayBuffer[String]()
  finalResult += "1:" + first
  finalResult += "2:" + first
  finalResult += "3:" + first
  finalResult.iterator
}
mapped.collect()
// res7: Array[String] = Array(1:a, 2:a, 3:a, 1:c, 2:c, 3:c)
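To check the fixed per-partition logic without a cluster, here is a minimal sketch under stated assumptions: processPartition and simulateMapPartitions are hypothetical names, each inner Seq stands in for one partition, and the results are concatenated in partition order, the way collect() returns them. The split of the five elements into (a, b) and (c, d, e) is inferred from the output above, where the first elements are a and c.

```scala
import scala.collection.mutable.ArrayBuffer

object PartitionSimulation {
  // The answer's per-partition function, pulled out so it can run on a plain Iterator.
  def processPartition(partition: Iterator[String]): Iterator[String] = {
    val partitionList = partition.toList // consume the iterator exactly once
    val first = partitionList match {
      case Nil       => "EMPTYLIST"
      case head :: _ => head
    }
    val finalResult = ArrayBuffer[String]()
    finalResult += "1:" + first
    finalResult += "2:" + first
    finalResult += "3:" + first
    finalResult.iterator
  }

  // Hypothetical stand-in for rdd.mapPartitions(processPartition).collect():
  // each inner Seq plays one partition, and outputs are concatenated in order.
  def simulateMapPartitions(partitions: Seq[Seq[String]]): Seq[String] =
    partitions.flatMap(p => processPartition(p.iterator))

  def main(args: Array[String]): Unit = {
    // Assumed split of Seq("a", "b", "c", "d", "e") into 2 partitions.
    val partitions = Seq(Seq("a", "b"), Seq("c", "d", "e"))
    println(simulateMapPartitions(partitions).mkString(", "))
  }
}
```

Running main prints 1:a, 2:a, 3:a, 1:c, 2:c, 3:c. Note that toList holds the whole partition in memory, which is the trade-off for being able to traverse it more than once.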