fo_x86

Reputation: 2613

Strictly partition an RDD into multiple RDDs in Spark

I have an RDD with n partitions and I would like to split it into k RDDs in such a way that

rdd = rdd_1.union(rdd_2).union(rdd_3)...union(rdd_k)

So, for example, if n = 10 and k = 2 I would like to end up with 2 RDDs, where rdd_1 is composed of 5 partitions and rdd_2 is composed of the other 5 partitions.

What is the most efficient way to do this in Spark?

Upvotes: 0

Views: 377

Answers (1)

zero323

Reputation: 330093

You can try something like this:

import org.apache.spark.rdd.RDD

val rdd: RDD[T] = ???
val k: Int = ???
val n = rdd.partitions.size

val rdds = (0 until n) // Create a Seq of partition numbers
  .grouped(n / k)  // group it into fixed-size buckets
                   // (if k does not divide n, this yields more than k groups)
  .map(idxs => (idxs.head, idxs.last)) // Take the first and the last idx
  .map {
    case (min, max) => rdd.mapPartitionsWithIndex(
      // If the partition index is in the [min, max] range keep its iterator,
      // otherwise return an empty one
      (i, iter) => if (i >= min && i <= max) iter else Iterator.empty
    )
  }
  .toSeq // materialize, since grouped returns a one-shot Iterator
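For reference, here is a minimal usage sketch, assuming a local SparkContext named sc; the splitByPartitionRange helper below is just the snippet above wrapped in a function, not something Spark provides:

import org.apache.spark.rdd.RDD

// Hypothetical helper: split an RDD into groups of consecutive
// partitions and return one RDD per group.
def splitByPartitionRange[T: scala.reflect.ClassTag](rdd: RDD[T], k: Int): Seq[RDD[T]] = {
  val n = rdd.partitions.size
  (0 until n)
    .grouped(n / k)
    .map(idxs => (idxs.head, idxs.last))
    .map { case (min, max) =>
      rdd.mapPartitionsWithIndex(
        (i, iter) => if (i >= min && i <= max) iter else Iterator.empty
      )
    }
    .toSeq
}

val rdd = sc.parallelize(1 to 100, 10) // n = 10 partitions
val Seq(rdd1, rdd2) = splitByPartitionRange(rdd, 2)

rdd1.getNumPartitions    // still 10 -- the out-of-range partitions are just empty
rdd1.count()             // 50
rdd2.count()             // 50
rdd1.union(rdd2).count() // 100 -- same elements as the original

Note that each resulting RDD keeps all n partition slots; the ones outside its [min, max] range are simply empty, which is why the union reconstructs the original data.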

If the input RDD has complex dependencies, you should cache it before applying this.
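For example (assuming the same rdd and the splitByPartitionRange helper from the sketch above):

import org.apache.spark.storage.StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK) // or simply rdd.cache()
val rdds = splitByPartitionRange(rdd, k)  // avoids recomputing the full lineage
                                          // for each of the k resulting RDDs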

Upvotes: 1
