Rhyzx

Reputation: 45

Sort JavaRDD Tuple by multiple values in Apache Spark (Java)

I'm quite new to Spark and need to solve this in Java, using only the RDD API.

I have a JavaRDD of Tuple4 elements:

    JavaRDD<Tuple4 <Integer, Double, Long, Integer>> revenue = ...;

I want to sort the tuples by the Double value in descending order.

If two tuples have the same Double value, I want to sort them by the Long value in ascending order.

So for example:

    (7, 4.3, 5, 9)
    (1, 5.1, 7, 10)
    (8, 1.2, 4, 7)
    (1, 4.3, 4, 2)
    (3, 4.3, 8, 5)

is sorted to:

    (1, 5.1, 7, 10)
    (1, 4.3, 4, 2)
    (7, 4.3, 5, 9)
    (3, 4.3, 8, 5)
    (8, 1.2, 4, 7)

So far I've tried to use a custom Comparator like this:

    class TupleComparator implements Comparator<Tuple4<Integer, Double, Long, Integer>>, Serializable {

        private static final long serialVersionUID = 1L;

        @Override
        public int compare(Tuple4<Integer, Double, Long, Integer> v1,
                           Tuple4<Integer, Double, Long, Integer> v2) {

            if (v1._2().compareTo(v2._2()) == 0) {
                return v1._3().compareTo(v2._3());
            }
            return -v1._2().compareTo(v2._2());
        }
    }

But to use a custom comparator I need the sortByKey function, since the normal sortBy function does not take a comparator. So I'd need to make a key.

If I then try to apply my comparator like this:

    revenue.keyBy(x -> x._2()).groupByKey().sortByKey(new TupleComparator(), false, 1);

I get: "The method sortByKey ... is not applicable for the arguments (TupleComparator, boolean, int)"

This is where I'm stuck. I'm not sure if what I'm doing is right or how to make my comparator work. (I'm not too familiar with custom Comparators).

Maybe there's a better way to achieve this? I know it's simpler to do in Scala.

However, I need to do it in Java, using only the RDD API.

Upvotes: 2

Views: 2397

Answers (1)

abaghel

Reputation: 15317

You need to do it in two steps. First, sort the RDD in descending order of the Double values.

    JavaRDD<Tuple4<Integer, Double, Long, Integer>> firstSortRDD = revenue.sortBy(
        new Function<Tuple4<Integer, Double, Long, Integer>, Double>() {
            @Override
            public Double call(Tuple4<Integer, Double, Long, Integer> value) throws Exception {
                // Sort on the Double itself; intValue() would truncate 4.3 and 4.9 to the same key.
                return value._2();
            }
        }, false, 1);

The second sort you need is ascending by the Long value, and it only applies among tuples whose Double values are equal. To express that, key each element by a Tuple2<Double, Long>. You can then call sortByKey and pass a Comparator that holds the custom comparison logic.

    JavaRDD<Tuple4<Integer, Double, Long, Integer>> secondSortRDD = firstSortRDD.keyBy(
        new Function<Tuple4<Integer, Double, Long, Integer>, Tuple2<Double, Long>>() {
            @Override
            public Tuple2<Double, Long> call(Tuple4<Integer, Double, Long, Integer> value) throws Exception {
                return new Tuple2<>(value._2(), value._3());
            }
        }).sortByKey(new TupleComparator()).values();

And here is the Comparator class for Tuple2<Double, Long>:

    class TupleComparator implements Comparator<Tuple2<Double, Long>>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public int compare(Tuple2<Double, Long> v1, Tuple2<Double, Long> v2) {
            if (v1._1().compareTo(v2._1()) == 0) {
                // Equal Double values: break the tie by Long, ascending.
                return v1._2().compareTo(v2._2());
            }
            // Otherwise order by Double, descending (arguments swapped).
            return v2._1().compareTo(v1._1());
        }
    }
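
As a quick sanity check of the intended ordering (Double descending, then Long ascending on ties), here is a minimal sketch in plain Java that sorts the question's sample data without Spark. The `Row` and `SortOrderCheck` classes are hypothetical stand-ins for Tuple4 and are not part of the Spark API; only the comparison logic is meant to carry over.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortOrderCheck {

    // Hypothetical stand-in for Tuple4<Integer, Double, Long, Integer>,
    // so the sketch runs without Spark or Scala on the classpath.
    static final class Row {
        final int a;
        final double b;
        final long c;
        final int d;

        Row(int a, double b, long c, int d) {
            this.a = a;
            this.b = b;
            this.c = c;
            this.d = d;
        }

        @Override
        public String toString() {
            return "(" + a + ", " + b + ", " + c + ", " + d + ")";
        }
    }

    // Second field descending, then third field ascending on ties.
    static final Comparator<Row> ORDER = new Comparator<Row>() {
        @Override
        public int compare(Row v1, Row v2) {
            int byDouble = Double.compare(v2.b, v1.b); // arguments swapped: descending
            if (byDouble != 0) {
                return byDouble;
            }
            return Long.compare(v1.c, v2.c);           // ascending tie-break
        }
    };

    // Sorts the sample rows from the question and returns them as strings.
    static List<String> sortedExample() {
        List<Row> rows = new ArrayList<>(Arrays.asList(
            new Row(7, 4.3, 5, 9),
            new Row(1, 5.1, 7, 10),
            new Row(8, 1.2, 4, 7),
            new Row(1, 4.3, 4, 2),
            new Row(3, 4.3, 8, 5)));
        rows.sort(ORDER);
        List<String> out = new ArrayList<>();
        for (Row r : rows) {
            out.add(r.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        for (String line : sortedExample()) {
            System.out.println(line);
        }
    }
}
```

Running `main` should print the rows in the order the question expects, starting with (1, 5.1, 7, 10) and ending with (8, 1.2, 4, 7).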

Upvotes: 2
