Reputation: 399
I have two paired RDDs that I joined together using the same key, and I now want to sort the result by one of the values. The type of the new joined RDD is: RDD[((String, Int), Iterable[((String, DateTime, Int, Int), (String, DateTime, String, String))])]
where the first section is the paired RDD key and the iterable part holds the values from the two RDDs I joined. I now want to order them by the DateTime field of the second RDD. I tried to use the sortBy function but I got errors.
Any ideas?
Thanks
Upvotes: 1
Views: 1733
Reputation: 17
Using Python:
sortedRDD = unsortedRDD.sortBy(lambda x: x[1][1], False)
This will sort in descending order.
Upvotes: 0
Reputation: 8851
If the RDD's Iterable needs to be sorted:
val rdd: RDD[((String, Int),
              Iterable[((String, DateTime, Int, Int),
                        (String, DateTime, String, String))])] = ???

// Ordering that compares Joda-Time DateTimes chronologically
val dateOrdering = new Ordering[org.joda.time.DateTime] {
  override def compare(a: org.joda.time.DateTime,
                       b: org.joda.time.DateTime) =
    if (a.isBefore(b)) -1 else if (a.isAfter(b)) 1 else 0
}

// Sort each key's values by the DateTime field of the second tuple
rdd.mapValues(v => v.toArray.sortBy(x => x._2._2)(dateOrdering))
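A more concise equivalent, assuming Joda-Time's getMillis (which returns the instant as epoch milliseconds), is to delegate to the built-in Ordering[Long]:
val dateOrdering: Ordering[org.joda.time.DateTime] =
  Ordering.by(_.getMillis) // chronological order via epoch milliseconds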
Upvotes: 0
Reputation: 4515
You're right that you can use sortBy
function:
val yourRdd: RDD[((String, Int), Iterable[((String, DateTime, Int, Int), (String, DateTime, String, String))])] = ... (your cogroup operation here)
val result = yourRdd.sortBy({
  case ((str, i), iter) if iter.nonEmpty => iter.head._2._2
}, true)
iter.head has type ((String, DateTime, Int, Int), (String, DateTime, String, String)); iter.head._2 has type (String, DateTime, String, String); and iter.head._2._2 does indeed have type DateTime.
And you may need to provide an implicit Ordering object for DateTime, like the sketch below.
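A minimal sketch of such an implicit ordering, assuming Joda-Time's DateTime and its getMillis accessor:
implicit val dateTimeOrdering: Ordering[DateTime] =
  Ordering.by(_.getMillis) // delegates to the built-in Ordering[Long]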
By the way, can the iterable be empty? Then you should add that case to the sortBy
function. And if there are many items in the iterable, which one should be chosen for sorting?
Upvotes: 0
Reputation: 1811
Spark pair RDDs have a mapValues method. I think it will help you.
def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]
Pass each value in the key-value pair RDD through a map function
without changing the keys; this also retains the original RDD's partitioning.
The Spark documentation has more details.
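As a sketch of how mapValues could apply to the question's RDD, sorting each group's values by the DateTime field of the second tuple (the implicit ordering here is an assumption, mirroring the other answers):
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime

// Assumed ordering: compare Joda-Time DateTimes by epoch milliseconds
implicit val dateTimeOrdering: Ordering[DateTime] = Ordering.by(_.getMillis)

val joined: RDD[((String, Int),
                 Iterable[((String, DateTime, Int, Int),
                           (String, DateTime, String, String))])] = ??? // your join result

// Sort each key's grouped values; keys and partitioning are untouched
val sorted = joined.mapValues(_.toSeq.sortBy(_._2._2))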
Upvotes: 0