Reputation: 600
I have a JavaPairRDD whose key is of type Tuple2<Integer, Integer>.
I want to sort the JavaPairRDD by key, so I wrote a Comparator like this:
JavaPairRDD<Tuple2<Integer, Integer>, Integer> Rresult = result.sortByKey(new Comparator<Tuple2<Integer, Integer>>() {
    @Override
    public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
        if (o1._1() == o2._1())
            return o1._2() - o2._2();
        return o1._1() - o2._1();
    }
}, true);
This should sort by the first entry of the tuple and, when the first entries are equal, by the second entry.
But I am getting the following stack trace:
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
.. scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1083)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
at java.io.ObjectStrea
Upvotes: 2
Views: 2354
Reputation: 15297
How are you creating the JavaPairRDD? Please check that before applying the sort. You will also get a Task not serializable exception if you pass an anonymous new Comparator directly to the sortByKey method, because Spark has to serialize the comparator and the anonymous class does not implement Serializable. Instead, implement Comparator and Serializable in a separate class and pass an instance of it to sortByKey. Here is a sample for your reference.
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class SparkSortSample {
    public static void main(String[] args) {
        // SparkSession
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkSortSample")
                .master("local[1]")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        // Sample data: ((key1, key2), value)
        List<Tuple2<Tuple2<Integer, Integer>, Integer>> inputList = new ArrayList<Tuple2<Tuple2<Integer, Integer>, Integer>>();
        inputList.add(new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(2, 444), 4444));
        inputList.add(new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(3, 333), 3333));
        inputList.add(new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(1, 111), 1111));
        inputList.add(new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(2, 222), 2222));
        // JavaPairRDD
        JavaPairRDD<Tuple2<Integer, Integer>, Integer> javaPairRdd = jsc.parallelizePairs(inputList);
        // Sorted RDD, ascending, using the serializable comparator class
        JavaPairRDD<Tuple2<Integer, Integer>, Integer> sortedPairRDD = javaPairRdd.sortByKey(new TupleComparator(), true);
        sortedPairRDD.foreach(pair -> {
            System.out.println("sort = " + pair);
        });
        // stop
        jsc.stop();
        jsc.close();
    }
}
And here is the TupleComparator class, which implements the Comparator and Serializable interfaces.
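import java.io.Serializable;
import java.util.Comparator;

import scala.Tuple2;

class TupleComparator implements Comparator<Tuple2<Integer, Integer>>, Serializable {
    @Override
    public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
        // Compare by value: == on boxed Integers compares references,
        // and subtraction can overflow for values of large magnitude.
        int byFirst = Integer.compare(o1._1(), o2._1());
        return byFirst != 0 ? byFirst : Integer.compare(o1._2(), o2._2());
    }
}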
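If you want to check the comparator logic and its serializability without starting Spark, here is a minimal sketch in plain Java. IntPair is a hypothetical stand-in for Tuple2<Integer, Integer>, and IntPairComparator and ComparatorRoundTrip are illustrative names that are not part of Spark. It round-trips the comparator through Java serialization, which is roughly what has to succeed before the comparator can be shipped to executors. It also compares with Integer.compare instead of == and subtraction, since == on boxed Integer values compares references and subtraction can overflow.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for scala.Tuple2<Integer, Integer> so the sketch runs without Spark.
final class IntPair implements Serializable {
    final Integer first;
    final Integer second;

    IntPair(Integer first, Integer second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public String toString() {
        return "(" + first + "," + second + ")";
    }
}

// Named, serializable comparator: order by first entry, then by second entry.
final class IntPairComparator implements Comparator<IntPair>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(IntPair a, IntPair b) {
        int byFirst = Integer.compare(a.first, b.first);
        return byFirst != 0 ? byFirst : Integer.compare(a.second, b.second);
    }
}

final class ComparatorRoundTrip {
    // Writes the object with Java serialization and reads it back.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Object is not serializable", e);
        }
    }

    // Sorts the same sample keys as the answer, using a deserialized copy of the comparator.
    static List<IntPair> sortedSample() {
        List<IntPair> pairs = new ArrayList<>(Arrays.asList(
            new IntPair(2, 444), new IntPair(3, 333), new IntPair(1, 111), new IntPair(2, 222)));
        Comparator<IntPair> restored = roundTrip(new IntPairComparator());
        pairs.sort(restored);
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(sortedSample());
    }
}
```

If roundTrip throws for your own comparator class, that comparator (or something it references) is not serializable, and passing it to sortByKey is likely to fail for the same reason.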
Upvotes: 2