derek

Reputation: 10217

spark partition.toList fails

What I want to do is to group multiple elements in a partition and then perform some operations on the grouped elements in each partition. But I found that the conversion from partition to list fails. See the example below:

import scala.collection.mutable.ArrayBuffer

val rdd = sc.parallelize(Seq("a", "b", "c", "d", "e"), 2)

val mapped = rdd.mapPartitions(partition => {
  val total = partition.size
  val first = partition.toList match {
    case Nil => "EMPTYLIST"
    case _   => partition.toList.head
  }

  val finalResult = ArrayBuffer[String]()
  finalResult += "1:" + first
  finalResult += "2:" + first
  finalResult += "3:" + first

  finalResult.iterator
})

mapped.collect()

Result:

Array[String] = Array(1:EMPTYLIST, 2:EMPTYLIST, 3:EMPTYLIST, 1:EMPTYLIST, 2:EMPTYLIST, 3:EMPTYLIST)

Why is partition.toList always empty?

Upvotes: 1

Views: 373

Answers (1)

akuiper

Reputation: 214957

partition is an Iterator, and counting its size consumes it, so by the time you convert it to a List it is already empty. To go through the partition more than once, convert it to a List at the beginning and then do whatever you need on that List:

val mapped = rdd.mapPartitions(partition => {
  // materialize the iterator once, then reuse the List as often as needed
  val partitionList = partition.toList
  val total = partitionList.size
  val first = partitionList match {
    case Nil => "EMPTYLIST"
    case _   => partitionList.head
  }

  val finalResult = ArrayBuffer[String]()
  finalResult += "1:" + first
  finalResult += "2:" + first
  finalResult += "3:" + first

  finalResult.iterator
})

mapped.collect
// res7: Array[String] = Array(1:a, 2:a, 3:a, 1:c, 2:c, 3:c)
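
For reference, a minimal plain-Scala sketch (no Spark involved; the values are made up) showing the same single-pass behaviour of Iterator:

val it = Iterator("a", "b", "c")
val total = it.size      // consumes the iterator (total == 3)
val asList = it.toList   // List() -- the iterator is already exhausted

Keep in mind that toList materializes the whole partition in memory, which is fine for small partitions but worth considering for very large ones.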

Upvotes: 3
