pythonic

Reputation: 21675

Secondary sorting by using join in Spark?

In Spark, I want to sort an RDD by two different fields. For the example data below, I want to sort the elements by fieldA first and, within equal values of fieldA, by fieldB (secondary sorting). Is the method shown below good enough? I have tested my code and it works, but is this a reliable way of doing it?

// x is of type (key, fieldA) and y is of type (key, fieldB)
val a = x.sortBy(_._2)
// b will be of type (key, (fieldB, fieldA))
val b = y.join(a).sortBy(_._2._1)

I want output that looks like the following:

fieldA, fieldB
2, 10 
2, 11
2, 13
7, 5
7, 7
7, 8
9, 3
9, 10
9, 10

Upvotes: 1

Views: 434

Answers (2)

zero323

Reputation: 330453

But is this a reliable way of doing it?

It is not reliable. It depends on the assumption that, during the shuffle, data is processed in the order defined by the order of the partitions. That may happen, but there is no guarantee that it will.

In other words, shuffle-based sorting is not stable. In general, there are methods that can achieve the desired result without performing a full shuffle twice, but they are quite low-level and, for optimal performance, require a custom Partitioner.
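
For reference, a minimal sketch of that low-level approach, assuming integer fields; the FieldAPartitioner class and the secondarySort helper are illustrative names, not part of the answer. It keys each record by the composite (fieldA, fieldB) and uses repartitionAndSortWithinPartitions, so both fields are sorted during a single shuffle:

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Illustrative partitioner: route records by fieldA only, so all records
// with the same fieldA land in the same partition.
class FieldAPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case (fieldA: Int, _) =>
      // non-negative modulo of fieldA's hash
      ((fieldA.hashCode % numPartitions) + numPartitions) % numPartitions
  }
}

// Key by the composite (fieldA, fieldB); the implicit tuple Ordering then
// sorts by fieldA first and fieldB second within each partition.
def secondarySort(data: RDD[(Int, Int)], partitions: Int): RDD[(Int, Int)] =
  data.map { case (fieldA, fieldB) => ((fieldA, fieldB), ()) }
    .repartitionAndSortWithinPartitions(new FieldAPartitioner(partitions))
    .keys

Note that this sorts within each partition; for a total order across all partitions you would partition by ranges of fieldA rather than by hash.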

Upvotes: 2

Nagarjuna Pamu

Reputation: 14825

You can use sortBy with a composite key, in the following way:

y.join(x).sortBy(r => (r._2._2, r._2._1))

Both sorts happen in one go: the tuple key is compared by fieldA first, then by fieldB.
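
As a quick illustration with hypothetical sample data in the question's (key, (fieldB, fieldA)) shape, the tuple key is compared lexicographically, which reproduces the ordering the question asks for:

val sample = sc.parallelize(Seq(
  ("k1", (5, 7)), ("k2", (10, 2)), ("k3", (11, 2))
))
// Tuple keys compare lexicographically: fieldA first, then fieldB.
sample.sortBy(r => (r._2._2, r._2._1)).collect()
// => Array((k2,(10,2)), (k3,(11,2)), (k1,(5,7)))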

Upvotes: 2
