ashwinbhy

Reputation: 600

Sorting by key in Apache Spark JavaPairRDD

I have a JavaPairRDD whose key is of type Tuple2<Integer, Integer>.

I wanted to sort the JavaPairRDD by my key so I wrote a Comparator like this:

JavaPairRDD<Tuple2<Integer, Integer>, Integer> Rresult = result.sortByKey(new Comparator<Tuple2<Integer, Integer>>() {
    @Override
    public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
        if (o1._1() == o2._1())
            return o1._2() - o2._2();
        return o1._1() - o2._1();
    }
}, true);

This sorts by the first entry of the tuple; if the first entries are equal, it sorts by the second entry.

But I am getting the following stack trace:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

.. scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1083)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
    at java.io.ObjectStrea

Upvotes: 2

Views: 2354

Answers (1)

abaghel

Reputation: 15297

How are you creating the JavaPairRDD? Please check that before applying the sort. You will also get a "Task not serializable" exception if you pass an anonymous new Comparator directly to sortByKey, because java.util.Comparator does not extend Serializable and so the anonymous class is not serializable. Instead, implement both Comparator and Serializable in a separate class and pass an instance of it to sortByKey. Here is a sample for your reference.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class SparkSortSample {
    public static void main(String[] args) {
        // SparkSession running locally with a single thread
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkSortSample")
                .master("local[1]")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        // Sample data: ((first, second), value)
        List<Tuple2<Tuple2<Integer, Integer>, Integer>> inputList = new ArrayList<>();
        inputList.add(new Tuple2<>(new Tuple2<>(2, 444), 4444));
        inputList.add(new Tuple2<>(new Tuple2<>(3, 333), 3333));
        inputList.add(new Tuple2<>(new Tuple2<>(1, 111), 1111));
        inputList.add(new Tuple2<>(new Tuple2<>(2, 222), 2222));
        // JavaPairRDD keyed by the inner tuple
        JavaPairRDD<Tuple2<Integer, Integer>, Integer> javaPairRdd = jsc.parallelizePairs(inputList);
        // Sorted RDD, ascending, using the serializable comparator
        JavaPairRDD<Tuple2<Integer, Integer>, Integer> sortedPairRDD = javaPairRdd.sortByKey(new TupleComparator(), true);
        sortedPairRDD.foreach(pair -> {
            System.out.println("sort = " + pair);
        });
        // Stop the context (close() would only call stop() again)
        jsc.stop();
    }
}

And here is the TupleComparator class, which implements both the Comparator and Serializable interfaces.

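import java.io.Serializable;
import java.util.Comparator;

import scala.Tuple2;

class TupleComparator implements Comparator<Tuple2<Integer, Integer>>, Serializable {
    @Override
    public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
        // Integer.compare avoids comparing boxed Integers by reference (==)
        // and avoids overflow from subtracting large values.
        int byFirst = Integer.compare(o1._1(), o2._1());
        return byFirst != 0 ? byFirst : Integer.compare(o1._2(), o2._2());
    }
}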
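
If you want to check a comparator outside Spark, a rough sketch like the one below may help. It assumes the comparator has to survive plain Java serialization, which is what Spark uses by default when it ships task code to executors. The names SerializableComparatorCheck, PairComparator and roundTrip are made up for this illustration, and the keys are int[]{first, second} rather than scala.Tuple2 so that it runs without any Spark or Scala jars.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SerializableComparatorCheck {

    // Hypothetical stand-in for TupleComparator: keys are int[]{first, second}
    // instead of scala.Tuple2, so no Spark or Scala dependency is needed.
    static class PairComparator implements Comparator<int[]>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public int compare(int[] a, int[] b) {
            // Order by the first entry, then by the second entry on ties.
            int byFirst = Integer.compare(a[0], b[0]);
            return byFirst != 0 ? byFirst : Integer.compare(a[1], b[1]);
        }
    }

    // Writes the object with Java serialization and reads it back.
    // Fails with IllegalStateException if the object is not serializable.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("object is not serializable", e);
        }
    }

    public static void main(String[] args) {
        // The comparator still works after a serialization round trip.
        Comparator<int[]> restored = roundTrip(new PairComparator());
        List<int[]> keys = new ArrayList<>(Arrays.asList(
                new int[]{2, 444}, new int[]{3, 333}, new int[]{1, 111}, new int[]{2, 222}));
        keys.sort(restored);
        for (int[] key : keys) {
            System.out.println(Arrays.toString(key));
        }
    }
}
```

Passing an anonymous Comparator that does not implement Serializable to roundTrip throws instead of returning, which roughly mirrors the failure Spark reports. This is only a local sanity check, not a replacement for running the job.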

Upvotes: 2
