Pop

Reputation: 12401

"Un-flatten" an RDD in spark

I have an RDD[LabeledPoint] of size N.

I would like to transform it into an RDD[Array[LabeledPoint]] such that all Arrays have roughly the same size (except possibly one smaller Array if needed).

I have found here a method (for an RDD[Int]) that iterates over the RDD's partitions:

// batchedDegree is the desired number of elements per Array
val batchedRDD = rdd.mapPartitions { iter: Iterator[Int] =>
  new Iterator[Array[Int]] {
    def hasNext: Boolean = iter.hasNext
    def next(): Array[Int] = {
      // consume up to batchedDegree elements from this partition's iterator
      iter.take(batchedDegree).toArray
    }
  }
}

However, in practice, since this method works partition by partition, it creates many Arrays that are (much) smaller than the desired size.

I have considered using coalesce to reduce the number of partitions, and hence the number of undersized Arrays. But this may slow down the later part of my job.
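For what it's worth, a minimal sketch of that idea, reusing rdd and batchedDegree from the snippet above (the partition count 8 is an arbitrary placeholder, and Scala's standard Iterator.grouped stands in for the hand-written iterator):

// Fewer partitions means fewer "leftover" small Arrays, but less parallelism afterwards.
val coalescedBatches = rdd
  .coalesce(8) // arbitrary example value for a smaller partition count
  .mapPartitions(_.grouped(batchedDegree).map(_.toArray))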

Do you have other ideas to transform the RDD in a better manner?

Upvotes: 1

Views: 542

Answers (1)

Avihoo Mamka

Reputation: 4786

You can use rdd.glom().

From the Spark source (RDD.scala):

/**
 * Return an RDD created by coalescing all elements within each partition into an array.
 */
def glom(): RDD[Array[T]] = withScope {
  new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
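A rough sketch of how this could be applied to the question (untested; points, batchSize and the MLlib LabeledPoint import are assumptions, not from the original post): repartitioning into roughly N / batchSize partitions first means each partition, and hence each Array produced by glom(), ends up with roughly batchSize elements.

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// points: the RDD[LabeledPoint] of size N; batchSize: the desired Array size
def batch(points: RDD[LabeledPoint], batchSize: Int): RDD[Array[LabeledPoint]] = {
  val n = points.count()
  // one partition per batch, so glom() yields Arrays of roughly batchSize elements each
  val numBatches = math.max(1, math.ceil(n.toDouble / batchSize).toInt)
  points.repartition(numBatches).glom()
}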

Upvotes: 2
