Reputation: 12401
I have an RDD[LabeledPoint]
of size N.
I would like to transform it to a RDD[Array[LabeledPoint]]
in a way that all Arrays have roughly the same size (except one smaller if needed).
I have found here a method (for an RDD[Int]
) that iterates over the RDD's partitions:
val batchedRDD = rdd.mapPartitions { iter: Iterator[Int] =>
  new Iterator[Array[Int]] {
    def hasNext: Boolean = iter.hasNext
    def next(): Array[Int] = iter.take(batchedDegree).toArray
  }
}
However, because this method works partition by partition, in practice it creates many Arrays that are (much) smaller than the desired size.
I have considered using coalesce
to reduce the number of partitions, and hence the number of undersized Arrays. But this may slow down the later part of my job.
Do you have other ideas to transform the RDD
in a better manner?
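
For reference, here is a partition-independent sketch of what I am after: key each element by its global index divided by the batch size, then group. `batchSize` is an assumed parameter, and the `groupByKey` incurs a shuffle, which is what I was hoping to avoid:

```scala
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Sketch: fixed-size batches regardless of partitioning.
// All arrays have exactly batchSize elements except possibly the last.
def batch[T: ClassTag](rdd: RDD[T], batchSize: Int): RDD[Array[T]] =
  rdd.zipWithIndex()                            // attach a global index to each element
    .map { case (v, i) => (i / batchSize, v) }  // batch id = index / batchSize
    .groupByKey()                               // collect each batch's elements together
    .map { case (_, vs) => vs.toArray }
```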
Upvotes: 1
Views: 542
Reputation: 4786
You can use rdd.glom().
From the Scala docs:
/**
 * Return an RDD created by coalescing all elements within each partition into an array.
 */
def glom(): RDD[Array[T]] = withScope {
  new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
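
Since glom() produces exactly one array per partition, you can control the array sizes by repartitioning first. A rough sketch, assuming the RDD's total size N is known (or computed via rdd.count()) and batchSize is your desired array size:

```scala
// Repartitioning to about N / batchSize partitions makes each glom'd array
// hold roughly batchSize elements; Spark does not guarantee exact balance,
// but repartition's shuffle distributes elements fairly evenly.
val numPartitions = math.max(1, (N / batchSize).toInt)
val batched: RDD[Array[LabeledPoint]] = rdd.repartition(numPartitions).glom()
```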
Upvotes: 2