MetallicPriest


Would Spark preserve key order with this sortByKey/map/collect sequence?

Let us say we have this:

val sx = sc.parallelize(Array((0, 39), (4, 47), (3, 51), (1, 98), (2, 61)))

And we later call this.

val sy = sx.sortByKey(true)

Which would make

sy = RDD[(0, 39), (1, 98), (2, 61), (3, 51), (4, 47)] 

And then we do

val collected = sy.map(x => (x._2 / 10, x._2)).collect()

Would we always get the following? In other words, would the original key order be preserved, even though the key values change?

collected = [(3, 39), (9, 98), (6, 61), (5, 51), (4, 47)]
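As a sanity check on the expected values, the same sort-then-map pipeline on a plain local Scala array (no Spark involved; `LocalCheck` is just an illustrative name) produces exactly that array, because Int division truncates 39 / 10 to 3, 98 / 10 to 9, and so on. The open question is only whether the distributed RDD version behaves the same way.

```scala
// Plain Scala collections only; this is a local analogue, not Spark.
object LocalCheck {
  val sx: Array[(Int, Int)] = Array((0, 39), (4, 47), (3, 51), (1, 98), (2, 61))

  // Local stand-in for sortByKey(true): sort ascending by the key.
  val sy: Array[(Int, Int)] = sx.sortBy(_._1)

  // Same function as in the question; Int division truncates.
  val collected: Array[(Int, Int)] = sy.map(x => (x._2 / 10, x._2))

  def main(args: Array[String]): Unit =
    println(collected.mkString("[", ", ", "]"))
}
```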


Answers (1)

Rohan Aletty


Applying the map() transformation and calling collect() does not change the ordering of the array elements returned by collect(). To prove this, we simply have to show that:

  • map will not modify the ordering of elements in an RDD
  • collect will always return elements of an RDD in the same array ordering on every call

The first point is pretty easy to prove. Under the hood, a call to map() just produces a MapPartitionsRDD; when that RDD is computed, it iterates through each partition and calls the function passed to map() on every element of that partition, in order. No element is moved to another partition or to another position, so the ordering of elements within each partition (and the order of the partitions themselves) stays the same.
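As a rough illustration of that argument, here is a toy model in plain Scala (no Spark dependency) in which an "RDD" is just a Vector of partitions. `MapModel`, its `mapPartitions`/`map` helpers, and the three-partition layout chosen for `sy` are all hypothetical; they only mirror the idea that map() applies the function to each partition's iterator element by element.

```scala
// Toy model, not Spark's API: a dataset is a sequence of partitions,
// and each partition is an ordered sequence of elements.
object MapModel {
  type Partitions[T] = Vector[Vector[T]]

  // Apply a function to each partition's iterator; partitions keep their
  // positions, and Iterator.map keeps element positions within a partition.
  def mapPartitions[T, U](parts: Partitions[T])(f: Iterator[T] => Iterator[U]): Partitions[U] =
    parts.map(p => f(p.iterator).toVector)

  // map() expressed in terms of mapPartitions, element by element.
  def map[T, U](parts: Partitions[T])(f: T => U): Partitions[U] =
    mapPartitions[T, U](parts)(iter => iter.map(f))

  // Hypothetical layout of the sorted data `sy` across three partitions.
  val sy: Partitions[(Int, Int)] =
    Vector(Vector((0, 39), (1, 98)), Vector((2, 61), (3, 51)), Vector((4, 47)))

  val mapped: Partitions[(Int, Int)] = map(sy)(x => (x._2 / 10, x._2))

  def main(args: Array[String]): Unit =
    mapped.foreach(p => println(p.mkString(" ")))
}
```

Each mapped partition has the same size and the same element order as its source partition; only the values change.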

The second point can be proven with a closer look at collect(). The following code is the implementation for collect() as well as the function that collect calls.

From RDD.scala:

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

From SparkContext.scala:

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}

The runJob() overload being called passes a Seq[Int] of partition indices (here 0 until rdd.partitions.length) to another runJob() overload, which allocates a result array of that size and stores each partition's result at that partition's position in the sequence. The indices are eventually passed down to the scheduler, which launches one task per partition. Those tasks may run in parallel and finish in any order, but because every result goes into its own slot, the array handed back to collect() is always in partition order, starting with the first partition, and Array.concat keeps that order.
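The claim that collect() returns partitions in order, even though the cluster decides when each task runs, can be sketched with another plain-Scala toy model: each partition's result is written to the slot for its index, regardless of when its task finishes. `CollectModel`, its `runJob`/`collect` helpers, and the random completion order are assumptions for illustration only; the real scheduler is far more involved.

```scala
import scala.reflect.ClassTag
import scala.util.Random

// Toy model of collect()/runJob() result handling, not Spark's actual code.
object CollectModel {
  def runJob[T, U: ClassTag](
      parts: Vector[Vector[T]],
      func: Iterator[T] => U,
      partitions: Seq[Int],
      rng: Random): Array[U] = {
    val results = new Array[U](partitions.size)
    // Simulate tasks finishing in an arbitrary order...
    for (index <- rng.shuffle(partitions.indices.toList)) {
      // ...while each result is stored in the slot for its own partition.
      results(index) = func(parts(partitions(index)).iterator)
    }
    results
  }

  // Same shape as RDD.collect(): one array per partition, then concatenate.
  def collect[T: ClassTag](parts: Vector[Vector[T]], rng: Random): Array[T] = {
    val results = runJob(parts, (iter: Iterator[T]) => iter.toArray, 0 until parts.length, rng)
    Array.concat(results.toIndexedSeq: _*)
  }

  val mapped: Vector[Vector[(Int, Int)]] =
    Vector(Vector((3, 39), (9, 98)), Vector((6, 61), (5, 51)), Vector((4, 47)))

  def main(args: Array[String]): Unit =
    for (seed <- 1L to 3L) println(collect(mapped, new Random(seed)).mkString(" "))
}
```

Whatever completion order the seed produces, the concatenated array comes out in partition order.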

Therefore, since neither map() nor collect() modifies the partition order or the ordering of elements within a partition, you will see the same ordering for the result of your collect every time. However, if you apply a transformation that requires a shuffle before your collect, all bets are off as the data will be repartitioned.
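To see why a shuffle breaks this, here is a toy model of hash repartitioning in plain Scala. It assumes a HashPartitioner-style rule (nonNegativeMod(key.hashCode, numPartitions)) with two partitions; `ShuffleModel` and `hashShuffle` are hypothetical names, and unlike real Spark, this model keeps records in arrival order inside each new partition (Spark makes no such promise).

```scala
// Toy model of a hash shuffle, not Spark's code: each record moves to the
// partition picked from its key's hash, so records are regrouped by key.
object ShuffleModel {
  // Modulo that never returns a negative partition index.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  def hashShuffle[K, V](parts: Vector[Vector[(K, V)]], numPartitions: Int): Vector[Vector[(K, V)]] = {
    val all = parts.flatten
    Vector.tabulate(numPartitions) { p =>
      all.filter { case (k, _) => nonNegativeMod(k.hashCode, numPartitions) == p }
    }
  }

  val mapped: Vector[Vector[(Int, Int)]] =
    Vector(Vector((3, 39), (9, 98)), Vector((6, 61), (5, 51)), Vector((4, 47)))

  val shuffled: Vector[Vector[(Int, Int)]] = hashShuffle(mapped, 2)

  def main(args: Array[String]): Unit = println(shuffled.flatten.mkString(" "))
}
```

With two partitions the even keys land in partition 0, so the flattened order becomes (6,61) (4,47) (3,39) (9,98) (5,51) instead of the order inherited from sy.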

