Reputation: 742
I have an RDD and I need to apply a computation on each partition (using .mapPartitions), but only if the current partition has more than X elements.
Example: The number of elements in each partition of the RDD is:
80, 9, 0, 0, 0, 3, 60
I want to process only the partitions with more than 50 elements.
Is this even possible?
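For concreteness, something like this sketch is what I mean (threshold, rdd and doExpensiveWork are placeholders; this version materializes every partition just to count it):
val threshold = 50
val result = rdd.mapPartitions { it =>
  // doExpensiveWork is a placeholder for the real per-partition computation
  val elems = it.toList
  if (elems.size > threshold) doExpensiveWork(elems.iterator)
  else Iterator.empty
}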
Upvotes: 0
Views: 51
Reputation: 4540
This can also be done lazily, without pre-calculating the partition sizes. In this example we filter to partitions with at least two elements:
import org.apache.spark.Partitioner

// A toy partitioner that sends each Int key to the partition with that index
object DemoPartitioner extends Partitioner {
  override def numPartitions: Int = 3
  override def getPartition(key: Any): Int = key match {
    case num: Int => num
  }
}

sc.parallelize(Seq((0, "a"), (0, "a"), (0, "a"), (1, "b"), (2, "c"), (2, "c")))
  .partitionBy(DemoPartitioner) // creates 3 partitions of sizes 3, 1 and 2
  .mapPartitions { it =>
    // Peek at the first two elements; this consumes them from `it`
    val firstElements = it.take(2).toList
    if (firstElements.size < 2) {
      Iterator.empty // fewer than two elements: skip this partition
    } else {
      firstElements.iterator ++ it // stitch the peeked elements back on
    }
  }
  .foreach(println)
Output:
(2,c)
(2,c)
(0,a)
(0,a)
(0,a)
So partition 1, which had just a single element, was skipped.
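The same trick generalizes to the threshold from the question: to process only partitions with more than X elements, peek at X + 1 of them and stitch them back on if the partition qualifies. A minimal sketch (`process` stands in for the real computation):
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Generalized sketch: run `process` only on partitions holding MORE than
// `threshold` elements. Peeking at threshold + 1 elements is enough to
// decide, so small partitions are never fully materialized.
def processLargePartitions[T: ClassTag](rdd: RDD[T], threshold: Int)(
    process: Iterator[T] => Iterator[T]): RDD[T] =
  rdd.mapPartitions { it =>
    val peeked = it.take(threshold + 1).toList   // consumes from `it`
    if (peeked.size <= threshold) Iterator.empty // at most X elements: skip
    else process(peeked.iterator ++ it)          // stitch peeked part back on
  }

// e.g. processLargePartitions(rdd, 50)(identity)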
Upvotes: 1