Ben

Reputation: 966

Spark sort by key and then group by to get ordered iterable?

I have a Pair RDD (K, V) with the key containing a time and an ID. I would like to get a Pair RDD of the form (K, Iterable<V>) where the keys are grouped by ID and the iterable is ordered by time.

I'm currently using sortByKey().groupByKey(), and my tests seem to show that it works; however, I'm reading that the order may not always be preserved, as discussed in this question with diverging answers (Does groupByKey in Spark preserve the original order?).

Is it correct or not?

Thanks!

Upvotes: 5

Views: 14479

Answers (2)

Federico

Reputation: 2133

The Spark Programming Guide offers three alternatives for when predictably ordered data is desired after a shuffle:

  • mapPartitions to sort each partition using, for example, .sorted
  • repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
  • sortBy to make a globally ordered RDD
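The difference between sorting within partitions and sorting globally can be illustrated with plain Scala collections (a sketch only: partitions are modeled as a Seq of Seqs, with no Spark involved):

```scala
// Partitions modeled as a Seq of Seqs (hypothetical stand-in, no Spark needed).
val partitions: Seq[Seq[(Int, String)]] =
  Seq(Seq((3, "c"), (1, "a")), Seq((4, "d"), (2, "b")))

// mapPartitions-style: sort each partition independently (order is local only).
val sortedWithin = partitions.map(_.sortBy(_._1))

// sortBy-style: a single globally ordered dataset.
val globallySorted = partitions.flatten.sortBy(_._1)
```

In Spark, repartitionAndSortWithinPartitions gives you the first kind of ordering while also controlling which partition each key lands in.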

As written in the Spark API, repartitionAndSortWithinPartitions is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

The sorting, however, is computed by looking only at the keys K of tuples (K, V). The trick is to put all the relevant information in the first element of the tuple, e.g. ((K, V), null), and to define a custom partitioner and a custom ordering. This article describes the technique quite well.
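The composite-key idea can be sketched locally with plain Scala collections (the Event fields here are illustrative, and this ignores the custom partitioner that the real Spark technique additionally needs so that all records for an ID land in one partition):

```scala
// Secondary-sort sketch: move the value's time into the key, then sort by the
// composite (id, time) key. The ordering looks only at the key, as in Spark.
case class Event(id: String, time: Long, payload: String)

val events = Seq(
  Event("a", 2L, "a2"), Event("b", 1L, "b1"), Event("a", 1L, "a1"))

val keyed = events.map(e => ((e.id, e.time), e.payload))
val grouped = keyed
  .sortBy(_._1)      // sorts by (id, time) lexicographically
  .groupBy(_._1._1)  // group by id; groupBy preserves encounter order per group
  .map { case (id, kvs) => (id, kvs.map(_._2)) }
```

Each group's payloads then come out ordered by time within their ID.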

Upvotes: 0

maasg

Reputation: 37435

The answer from Matei, who I consider authoritative on this topic, is quite clear:

The order is not guaranteed actually, only which keys end up in each partition. Reducers may fetch data from map tasks in an arbitrary order, depending on which ones are available first. If you’d like a specific order, you should sort each partition. Here you might be getting it because each partition only ends up having one element, and collect() does return the partitions in order.

In that context, a better option would be to apply the sorting to the resulting collections per key:

rdd.groupByKey().mapValues(_.toList.sorted)
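The same per-key sort can be checked locally with plain Scala collections standing in for the RDD (a sketch; the keys and Long time values are illustrative):

```scala
// Local stand-in for rdd.groupByKey().mapValues(_.toList.sorted):
// group pairs by key, then sort each group's values.
val pairs = Seq(("id1", 3L), ("id2", 1L), ("id1", 1L), ("id1", 2L))

val result: Map[String, List[Long]] =
  pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).toList.sorted) }
```

Note that sorting requires an implicit Ordering on the value type; for a richer value type you would use sortBy on its time field instead.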

Upvotes: 9

Related Questions