Subramanyam S
Subramanyam S

Reputation: 59

How to achieve sort by value in spark java

JavaPairRDD<String, Float> counts = ones
            .reduceByKey(new Function2<Float, Float, Float>() {
                @Override
                public Float call(Float i1, Float i2) {
                    return i1 + i2;
                }
            });

My output looks like this:

id,value
100002,23.47
100003,42.78
200003,50.45
190001,30.23

I would like the output to be sorted by value like:

200003,50.45
100003,42.78
190001,30.23
100002,23.47

How do I achieve this?

Upvotes: 5

Views: 12518

Answers (2)

Daniel Langdon
Daniel Langdon

Reputation: 5999

Scala has a nice sortBy method. Could not find the Java equivalent, but this is the scala implementation:

  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.size)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
    this.keyBy[K](f)
        .sortByKey(ascending, numPartitions)
        .values

So, basically similar to the above, but it add a key instead of swapping forward and backwards. I use it like this: .sortBy(_._2) (sort by picking the second element of the tuple).

Upvotes: 4

user1261215
user1261215

Reputation:

I think there is no specific API to sort the data on value.

May be you need to do below steps:

1) Swap key and value
2) Use sortByKey API
3) Swap key and value

Look at the more details about sortByKey in beloe reference:
https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/api/java/JavaPairRDD.html#sortByKey%28boolean%29

for swap, we can use Scala Tuple API:

http://www.scala-lang.org/api/current/index.html#scala.Tuple2

For example, I have Java Pair RDD from the below function.

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
  });

Now, To swap key and value, you can use below code:

JavaPairRDD<Integer, String> swappedPair = counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
           @Override
           public Tuple2<Integer, String> call(Tuple2<String, Integer> item) throws Exception {
               return item.swap();
           }

        });

Hope this helps. You need to take care of the data types.

Upvotes: 3

Related Questions