Reputation: 1290
Is preservesPartitioning in mapPartitions ignored unless we are using a key-value pair RDD? If not, would setting it to true on a non-key-value-pair RDD cause a shuffle?
Upvotes: 0
Views: 2709
Reputation: 1
You can see the role of preservesPartitioning in org.apache.spark.rdd.MapPartitionsRDD, and it is very clear:
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
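To see that line in action, here is a minimal spark-shell sketch (the RDD and variable names below are my own, not from the Spark source):

import org.apache.spark.HashPartitioner

// A pair RDD with an explicit partitioner
val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"))).partitionBy(new HashPartitioner(4))
pairs.partitioner.isDefined // true

// preservesPartitioning = true keeps the parent's partitioner
pairs.mapPartitions(_.map(identity), preservesPartitioning = true).partitioner.isDefined // true

// the default (false) drops it, so a downstream by-key operation would have to shuffle again
pairs.mapPartitions(_.map(identity), preservesPartitioning = false).partitioner.isDefined // false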
Upvotes: 0
Reputation: 18098
For a non-K,V RDD one cannot assign a Partitioner; you simply get the "default" partitioning, i.e. the data stays as-is.
You can call mapPartitions with true or false for preservesPartitioning, but it has no effect.
If you consider default partitioning, then the same partitioning still applies after mapPartitions, as you can observe below, so in that sense partitioning is preserved, just in a different way.
That is to say, shuffling is avoided, or rather is not possible, as there is no key to consider; there can never be a wide transformation as a result.
Parallel processing applies as normal.
import org.apache.spark.HashPartitioner // pointless here: a Partitioner cannot be applied to a non-K,V RDD

def myfunc(iter: Iterator[Int]): Iterator[String] = {
  iter.map(x => "A" + x)
}

// .partitionBy(new HashPartitioner(7000)) is NOT POSSIBLE on this RDD
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10,1,4,1,4,5,6), 3)
val rNP1 = rdd1.partitions.size        // 3
val isD1 = rdd1.partitioner.isDefined  // false: no Partitioner on a non-K,V RDD
//val typ1 = rdd1.partitioner.get      // would throw: no partitioner is defined

val rdd2 = rdd1.mapPartitions(myfunc, true) // or false, makes no difference
val rNP2 = rdd2.partitions.size        // still 3
val isD2 = rdd2.partitioner.isDefined  // still false

// Show data and distribution across partitions
val mapped = rdd2.mapPartitionsWithIndex { (index, iterator) =>
  println("Called in Partition -> " + index)
  iterator.toList.map(x => x + " -> " + index).iterator
}
mapped.collect()
mapped.collect()
// With preservesPartitioning = false: rdd1's data (res11), then rdd2's (res19)
//res11: Array[String] = Array(1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0, 6 -> 1, 7 -> 1, 8 -> 1, 9 -> 1, 10 -> 1, 1 -> 2, 4 -> 2, 1 -> 2, 4 -> 2, 5 -> 2, 6 -> 2)
//res19: Array[String] = Array(A1 -> 0, A2 -> 0, A3 -> 0, A4 -> 0, A5 -> 0, A6 -> 1, A7 -> 1, A8 -> 1, A9 -> 1, A10 -> 1, A1 -> 2, A4 -> 2, A1 -> 2, A4 -> 2, A5 -> 2, A6 -> 2)
// With preservesPartitioning = true: rdd2's data, identical distribution
//res20: Array[String] = Array(A1 -> 0, A2 -> 0, A3 -> 0, A4 -> 0, A5 -> 0, A6 -> 1, A7 -> 1, A8 -> 1, A9 -> 1, A10 -> 1, A1 -> 2, A4 -> 2, A1 -> 2, A4 -> 2, A5 -> 2, A6 -> 2)
Upvotes: 2
Reputation: 1525
As per the Spark documentation, preservesPartitioning in mapPartitions has no effect if you are working on a plain RDD of elements (i.e. a list, not key-value pairs); it only matters for a pair RDD, i.e. a key-value data set. When working on non-key-value data, setting the parameter to true changes nothing: the collection is simply distributed across partitions (for example via parallelize()) and computed in parallel as usual, with no shuffle involved.
preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn't modify the keys.
Please refer to this link.
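To make the documented behaviour concrete, here is a minimal sketch (the names below are my own assumptions): the function transforms only the values, so declaring the partitioner preserved is safe, and a later by-key operation can reuse it without a shuffle.

import org.apache.spark.HashPartitioner

// A pair RDD pre-partitioned by key
val kv = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).partitionBy(new HashPartitioner(4))

// Only the values change, so the keys' placement remains valid
val scaled = kv.mapPartitions(iter => iter.map { case (k, v) => (k, v * 10) }, preservesPartitioning = true)

// scaled still carries the HashPartitioner, so this reduceByKey runs without a shuffle;
// with preservesPartitioning = false it would have to repartition the data again
scaled.reduceByKey(_ + _).collect()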
Upvotes: 1