Reputation: 2613
I have an RDD with n partitions and I would like to split this RDD into k RDDs in such a way that
rdd = rdd_1.union(rdd_2).union(rdd_3)...union(rdd_k)
So, for example, if n=10 and k=2, I would like to end up with two RDDs, where rdd1 is composed of 5 partitions and rdd2 is composed of the other 5 partitions.
What is the most efficient way to do this in Spark?
Upvotes: 0
Views: 377
Reputation: 330093
You can try something like this:
import org.apache.spark.rdd.RDD

val rdd: RDD[T] = ???
val k: Int = ???

val n = rdd.partitions.length

val rdds = (0 until n)                     // create a Seq of partition indices
  .grouped(n / k)                          // group it into fixed-size buckets
  .map(idxs => (idxs.head, idxs.last))     // take the first and the last index
  .map { case (min, max) =>
    rdd.mapPartitionsWithIndex(
      // if the partition index is in the [min, max] range keep its iterator,
      // otherwise return an empty one
      (i, iter) => if (i >= min && i <= max) iter else Iterator.empty
    )
  }
If the input RDD has complex dependencies, you should cache it before applying this, so that each of the resulting RDDs doesn't recompute the input's full lineage.
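For example, a minimal sketch (the storage level here is an assumption; pick whatever fits your workload):
import org.apache.spark.storage.StorageLevel

// persist once so each of the k filtered RDDs reads cached
// partitions instead of recomputing the input's lineage
val cached = rdd.persist(StorageLevel.MEMORY_AND_DISK)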
Upvotes: 1