Reputation: 1175
I'm using Spark with Java and I want to sort my map. I have a JavaPairRDD like this:
JavaPairRDD<String, Integer> rebondCountURL = session_rebond_2.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() {
    @Override
    public Tuple2<String, String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
        // Swap key and value so the second field becomes the grouping key.
        return new Tuple2<String, String>(stringStringTuple2._2, stringStringTuple2._1);
    }
}).groupByKey().mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(Tuple2<String, Iterable<String>> stringIterableTuple2) throws Exception {
        // Count the values grouped under each key.
        Iterable<String> strings = stringIterableTuple2._2;
        List<String> b = new ArrayList<String>();
        for (String s : strings) {
            b.add(s);
        }
        return new Tuple2<String, Integer>(stringIterableTuple2._1, b.size());
    }
});
I want to sort this RDD with sortBy, ordering by the Integer value.
Can you please help me do this?
Thank you in advance.
Upvotes: 4
Views: 8836
Reputation: 2113
This code is based on @Vignesh's suggestion. You can sort by any custom implementation of Comparator. It is cleaner to write the comparator separately and reference it in the Spark code:
rdd -> {
    JavaRDD<MaxProfitDto> result =
        rdd.keyBy(Recommendations.profitAsKey)
           .sortByKey(new CryptoVolumeComparator())
           .values()
So, the comparator looks like below:
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Comparator;
import models.CryptoDto;
import scala.Tuple2;
public class CryptoVolumeComparator implements Comparator<Tuple2<BigDecimal, CryptoDto>>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(Tuple2<BigDecimal, CryptoDto> v1, Tuple2<BigDecimal, CryptoDto> v2) {
        // v2 is compared against v1, so larger values come first (descending order).
        return v2._1().compareTo(v1._1());
    }
}
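One thing to check when adapting this: `JavaPairRDD.sortByKey(Comparator<K>)` compares keys only, so if `keyBy` produces a plain `BigDecimal` key, the comparator would normally be typed on `BigDecimal` rather than on a tuple. The following is a minimal sketch under that assumption; the class name `DescendingProfitComparator` and the helper `sortedDescending` are hypothetical and only there to try the ordering locally without Spark.

```java
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical comparator: orders BigDecimal keys from largest to smallest.
// It is Serializable so that Spark could ship it to executors.
class DescendingProfitComparator implements Comparator<BigDecimal>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(BigDecimal k1, BigDecimal k2) {
        // Reversed arguments give descending order.
        return k2.compareTo(k1);
    }

    // Local check on a plain list; assumption: not part of any Spark API.
    static List<BigDecimal> sortedDescending(List<BigDecimal> keys) {
        List<BigDecimal> copy = new ArrayList<>(keys);
        copy.sort(new DescendingProfitComparator());
        return copy;
    }
}
```

In Spark code, an instance of such a class would be passed as `sortByKey(new DescendingProfitComparator())`, assuming the pair RDD's key type really is `BigDecimal`.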
Upvotes: 0
Reputation: 16650
You need to create a function that extracts the sorting key from each element. Here is an example from our code:
final JavaRDD<Something> stage2 = stage1.sortBy(new Function<Something, Long>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Long call(Something value) throws Exception {
        return value.getTime();
    }
}, true, 1);
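To try the key-extractor idea on the question's (URL, count) pairs without a cluster, here is a plain-Java analogue. It is only a sketch: it uses `java.util` collections instead of an RDD, and `SortByCountSketch`, `COUNT_OF` and `sortByCount` are hypothetical names. The extractor plays the role of the `Function` passed to `sortBy`, and the boolean mirrors its `ascending` argument.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class SortByCountSketch {
    // Key extractor: returns the count, i.e. the value each pair is sorted by.
    static final Function<Map.Entry<String, Integer>, Integer> COUNT_OF = Map.Entry::getValue;

    // Returns a sorted copy of the pairs, ordered by count.
    static List<Map.Entry<String, Integer>> sortByCount(
            List<Map.Entry<String, Integer>> pairs, boolean ascending) {
        Comparator<Map.Entry<String, Integer>> byCount = Comparator.comparing(COUNT_OF);
        List<Map.Entry<String, Integer>> copy = new ArrayList<>(pairs);
        copy.sort(ascending ? byCount : byCount.reversed());
        return copy;
    }
}
```

In Spark, the same extractor logic would go into the `call` method of the `Function` given to `sortBy` on a `JavaRDD` of tuples, returning the tuple's `_2()` as the sort key.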
Upvotes: 14
Reputation: 300
Just a tip related to sortBy(): if you want to sort a set of user-defined objects, say Point, implement the Comparable<Point> interface in the Point class and override the compareTo() method with your own ordering logic. The sortBy function will then use that logic when sorting.
Note: your Point class must also implement the java.io.Serializable interface, or else you will encounter a java.io.NotSerializableException.
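A minimal sketch of such a class, assuming a Point with two int fields ordered by x and then by y (the fields, the ordering and the `sorted` helper are illustrative assumptions, not taken from any real code base):

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical Point: Comparable for ordering, Serializable so Spark can ship it.
class Point implements Comparable<Point>, Serializable {
    private static final long serialVersionUID = 1L;

    final int x;
    final int y;

    Point(int x, int y) {
        this.x = x;
        this.y = y;
    }

    @Override
    public int compareTo(Point other) {
        // Order by x first; break ties on y.
        int byX = Integer.compare(x, other.x);
        return byX != 0 ? byX : Integer.compare(y, other.y);
    }

    // Local check of the natural ordering on a plain list, no Spark needed.
    static List<Point> sorted(List<Point> points) {
        List<Point> copy = new ArrayList<>(points);
        Collections.sort(copy);
        return copy;
    }
}
```

With a class like this, a call such as `points.sortBy(p -> p, true, numPartitions)` on a `JavaRDD<Point>` should order the elements through `compareTo`; here `points` and `numPartitions` are placeholder names.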
Upvotes: 0