Funzo

Reputation: 1290

Spark: Is preservesPartitioning in mapPartitions ignored unless we are using a key-value pair RDD?

Is preservesPartitioning in mapPartitions ignored unless we are using a key-value pair RDD? If not, would setting it to true on a non key-value pair RDD cause a shuffle?

Upvotes: 0

Views: 2709

Answers (3)

yann

Reputation: 1

You can see the role of preservesPartitioning in org.apache.spark.rdd.MapPartitionsRDD, where its effect is very clear:

override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
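
To see that line in action, here is a minimal spark-shell sketch (the variable names and partition counts are illustrative, not from the source): the parent's partitioner survives mapPartitions only when the flag is true.

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
              .partitionBy(new HashPartitioner(4))

val kept    = pairs.mapPartitions(_.map(identity), preservesPartitioning = true)
val dropped = pairs.mapPartitions(_.map(identity), preservesPartitioning = false)

kept.partitioner    // Some(HashPartitioner) -- carried over from the parent
dropped.partitioner // None -- exactly as the source line above says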

Upvotes: 0

Ged

Reputation: 18098

For a non-K,V (non-pair) RDD you cannot assign a Partitioner; you simply get the "default" partitioning, as is.

You can call mapPartitions with true or false for preservesPartitioning, but it does not have any effect.

If you consider default partitioning, then the same partitioning still applies after mapPartitions, as you can observe below; so in that sense the partitioning is preserved, but in a different way.

That is to say, shuffling is avoided, or rather is not possible, as there is no key to consider; there can never be a wide transformation as a result.

Parallel processing applies, as is the norm.

import org.apache.spark.HashPartitioner // pointless here: no Partitioner can apply

def myfunc(iter: Iterator[Int]): Iterator[String] = {
  iter.map(x => "A" + x)
}

// .partitionBy(new HashPartitioner(7000)) is NOT POSSIBLE on a non-pair RDD
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10,1,4,1,4,5,6), 3)
val rNP1 = rdd1.partitions.size
val isD1 = rdd1.partitioner.isDefined
//val typ1 = rdd1.partitioner.get // would throw: the partitioner is None

val rdd2 = rdd1.mapPartitions(myfunc, true) // or false
val rNP2 = rdd2.partitions.size
val isD2 = rdd2.partitioner.isDefined

// Show the data and its distribution across partitions
val mapped = rdd2.mapPartitionsWithIndex { (index, iterator) =>
  println("Called in Partition -> " + index)
  iterator.toList.map(x => x + " -> " + index).iterator
}
mapped.collect()

//false rdd1 & rdd2
//res11: Array[String] = Array(1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0, 6 -> 1, 7 -> 1, 8 -> 1, 9 -> 1, 10 -> 1, 1 -> 2, 4 -> 2, 1 -> 2, 4 -> 2, 5 -> 2, 6 -> 2)
//res19: Array[String] = Array(A1 -> 0, A2 -> 0, A3 -> 0, A4 -> 0, A5 -> 0, A6 -> 1, A7 -> 1, A8 -> 1, A9 -> 1, A10 -> 1, A1 -> 2, A4 -> 2, A1 -> 2, A4 -> 2, A5 -> 2, A6 -> 2)
//true, rdd2
//res20: Array[String] = Array(A1 -> 0, A2 -> 0, A3 -> 0, A4 -> 0, A5 -> 0, A6 -> 1, A7 -> 1, A8 -> 1, A9 -> 1, A10 -> 1, A1 -> 2, A4 -> 2, A1 -> 2, A4 -> 2, A5 -> 2, A6 -> 2)
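
For contrast, here is a sketch of the pair-RDD case (my own variable names, not from the question), where the flag does matter: if the keys are untouched and preservesPartitioning is true, a downstream reduceByKey can reuse the parent's partitioner and skip the shuffle.

import org.apache.spark.HashPartitioner

val kv = sc.parallelize(List((1, 10), (2, 20), (1, 30)))
           .partitionBy(new HashPartitioner(3))

// Keys are untouched, so it is safe to claim the partitioning is preserved
val kept = kv.mapPartitions(_.map { case (k, v) => (k, v + 1) },
                            preservesPartitioning = true)

// Same partitioner as the parent => no shuffle stage in the lineage
println(kept.reduceByKey(_ + _).toDebugString)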

Upvotes: 2

Ajay Kharade

Reputation: 1525

As per the Spark documentation, preservesPartitioning in mapPartitions does nothing if you are working on a plain Seq (i.e. list elements, not key-value pairs); it only applies to a key-value pair RDD. And for a non key-value pair RDD, setting the parameter to true does not cause a shuffle either: the collection is simply distributed across partitions by parallelize() and computed in parallel as usual.

preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn't modify the keys.
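
As an illustration of that warning, consider this sketch (hypothetical names, assumed 2 partitions): if the function modifies the keys while the flag is still true, Spark keeps a partitioner that the data no longer matches.

import org.apache.spark.HashPartitioner

val kv = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
           .partitionBy(new HashPartitioner(2))

// Keys change (k + 1), yet we still claim the partitioning is preserved
val broken = kv.mapPartitions(_.map { case (k, v) => (k + 1, v) },
                              preservesPartitioning = true)

broken.partitioner // Some(HashPartitioner), but records no longer sit where it says
// Key-based operations (lookup, join) on `broken` can silently misbehave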

Please refer to this link.

Upvotes: 1
