Reputation: 30815
Let us say we have this.
val sx = sc.parallelize(Array((0, 39), (4, 47), (3, 51), (1, 98), (2, 61)))
And we later call this.
val sy = sx.sortByKey(true)
Which would make
sy = RDD[(0, 39), (1, 98), (2, 61), (3, 51), (4, 47)]
And then we do
collected = sy.map(x => (x._2 / 10, x._2)).collect
Would we always get the following? I mean, would the original element order be preserved, despite changing the key values?
collected = [(3, 39), (9, 98), (6, 61), (5, 51), (4, 47)]
Upvotes: 1
Views: 2454
Reputation: 2442
Applying the map() transformation and calling collect() does not change the ordering of the array elements returned by collect(). To prove this, we simply have to show that:

1. map() does not change the ordering of elements within a partition, and
2. collect() processes the partitions, and the elements within them, in a fixed order.
The first point is pretty easy to prove. Under the hood, a call to map() just produces a MapPartitionsRDD by iterating through every partition and calling the function argument passed to map() on every element within a partition. Therefore, no ordering is modified here, as the element ordering within each partition stays the same.
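To illustrate the first point, here is a plain-Scala sketch (no Spark, and the two-partition layout is a hypothetical split of the sorted RDD): applying a function to each partition's iterator element by element cannot reorder anything.

```scala
// Hypothetical partition layout for the RDD after sortByKey:
// two partitions, elements already in key order.
val partitions: Seq[Seq[(Int, Int)]] =
  Seq(Seq((0, 39), (1, 98)), Seq((2, 61), (3, 51), (4, 47)))

// What MapPartitionsRDD does conceptually: apply the function
// lazily to each partition's iterator, one element at a time.
val mapped: Seq[Seq[(Int, Int)]] =
  partitions.map(_.iterator.map { case (_, v) => (v / 10, v) }.toList)

// Element order within each partition is unchanged:
// List(List((3,39), (9,98)), List((6,61), (5,51), (4,47)))
println(mapped)
```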
The second point can be proven with a closer look at collect(). The following code is the implementation of collect() as well as the function that collect() calls.

From RDD.scala:
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
From SparkContext.scala:
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
The runJob() function being called (which is an overloaded method) passes a Seq[Int] containing the order in which partitions will be processed to another runJob() method. This order is eventually bubbled up to the scheduler, which will determine how the action will process partitions. So in the case of collect(), we will always process partitions in sequential order, starting with the first one.
Therefore, since neither map() nor collect() modifies the partition order or the ordering of elements within a partition, you will see the same ordering for the result of your collect every time. However, if you apply a transformation that requires a shuffle before your collect, all bets are off, as the data will be repartitioned.
Upvotes: 8