Userrrrrrrr
Userrrrrrrr

Reputation: 399

Order by value in spark pair RDD after join

I have 2 paired RDDs that I joined them together using the same key and I now I want to sort the result using one of the values. The new joined RDD type is : RDD[((String, Int), Iterable[((String, DateTime, Int,Int), (String, DateTime, String, String))])]

where the first section is the paired RDD key and the iterable part is the values from the two RDD I joined. I want now to order them by the Time field of the second RDD. I tried to use sortBy function but I got errors.

Any ideas?

Thanks

Upvotes: 1

Views: 1733

Answers (4)

Vardhaman Jain
Vardhaman Jain

Reputation: 17

Using python:

sortedRDD = unsortedRDD.sortBy(lambda x:x[1][1], False)

This will sort by descending order

Upvotes: 0

Shyamendra Solanki
Shyamendra Solanki

Reputation: 8851

If the RDD's Iterable needs to be sorted:

val rdd: RDD[((String, Int), 
             Iterable[((String, DateTime, Int,Int), 
                       (String, DateTime, String, String))])] = ???

val dateOrdering = new Ordering[org.joda.time.DateTime]{ 
    override def compare(a: org.joda.time.DateTime,
                         b: org.joda.time.DateTime) = 
        if (a.isBefore(b)) -1 else 1
}

rdd.mapValues(v => v.toArray
                    .sortBy(x => x._2._2)(dateOrdering))

Upvotes: 0

Nikita
Nikita

Reputation: 4515

You're right that you can use sortBy function:

val yourRdd: RDD[((String, Int), Iterable[((String, DateTime, Int,Int), (String, DateTime, String, String))])] = ...(your cogroup operation here)

val result = yourRdd.sortBy({
  case ((str, i), iter) if iter.nonEmpty => iter.head._2._
  }, true)

iter.head has type of ((String, DateTime, Int,Int), (String, DateTime, String, String));

iter.head._2 has type of (String, DateTime, String, String) and

iter.head._2._2 is indeed has type of DateTime.

And maybe you should provide implicit ordering object for Datetime like this. By the way, may the iterator be emtpy? Then you should add this case to sortBy function. And if there are many items in this iterator which one to choose for sorting?

Upvotes: 0

Karthik
Karthik

Reputation: 1811

Spark pair RDDs have a mapValues method. I think it will help you.

    def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]
    Pass each value in the key-value pair RDD through a map function 
without changing the keys; this also retains the original RDD's partitioning.

Spark Documentation has more details.

Upvotes: 0

Related Questions