Reputation: 53
I have an RDD in which the key is an id and the value is a list of ids. I want to sort each list of values in ascending order. For example:
1, list(12,3,8,10)
2, list(42,3,65,33)
3, list(6,2,4,1)
Expected output:
1, list(3,8,10,12)
2, list(3,33,42,65)
3, list(1,2,4,6)
RDD Creation
I created the RDD by joining two different RDDs and then using productIterator to create the list of values, which gives me an RDD of type RDD[(Int, List[Any])].
I tried
rdd.mapValues(x=> _.2.sorted)
and various other sort methods, but no luck.
Upvotes: 0
Views: 1369
Reputation: 385
You're nearly there.
mapValues, as its name suggests, applies your mapping function only to the values. Your code looks like it's trying to extract the second element from a key/value tuple, which I imagine is what's throwing errors.
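To make this concrete without a cluster, here is a minimal plain-Scala sketch that mimics mapValues on a local Seq of key/value pairs. mapValuesLocal is a hypothetical helper written for illustration, not a Spark API; it only shows that the function you pass receives the value (here a List[Int]), never the tuple, so you call .sorted on x directly.

```scala
object MapValuesSketch {
  // Hypothetical local stand-in for Spark's mapValues:
  // f only ever sees the value, and the key is carried over unchanged.
  def mapValuesLocal[K, V, W](pairs: Seq[(K, V)])(f: V => W): Seq[(K, W)] =
    pairs.map { case (k, v) => (k, f(v)) }

  val pairs: Seq[(Int, List[Int])] =
    Seq(1 -> List(12, 3, 8, 10), 2 -> List(42, 3, 65, 33), 3 -> List(6, 2, 4, 1))

  // x is already the List[Int], so there is no tuple element to extract.
  val sortedPairs: Seq[(Int, List[Int])] = mapValuesLocal(pairs)(x => x.sorted)

  def main(args: Array[String]): Unit =
    sortedPairs.foreach(println)
}
```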
You can use either map or mapValues. map needs to return a tuple if you wish to retain your key, so mapValues is simpler, but I'll show you both ways. We're starting with an RDD[(Int, List[Int])], which I've already constructed, and using collect() to view it.
scala> start
res17: org.apache.spark.rdd.RDD[(Int, List[Int])] = MapPartitionsRDD[6] at map at <console>:37
scala> start.collect()
res18: Array[(Int, List[Int])] = Array((1,List(12, 3, 8, 10)), (2,List(42, 3, 65, 33)))
Firstly, let's do the simplest thing:
scala> start.mapValues(x => x.sorted).collect()
res19: Array[(Int, List[Int])] = Array((1,List(3, 8, 10, 12)), (2,List(3, 33, 42, 65)))
As you can see, it returns the ordering you expect.
Using map to modify the key/value tuple is pretty simple, so long as you preserve your key. I recommend using Scala's pattern-matching anonymous function syntax ({ case ... => ... }) to destructure the tuple into named arguments, instead of having to refer to tuple._1/tuple._2:
scala> start.map({ case (k, v) => (k, v.sorted) }).collect()
res21: Array[(Int, List[Int])] = Array((1,List(3, 8, 10, 12)), (2,List(3, 33, 42, 65)))
Or, using the tuple accessor syntax you're already familiar with:
scala> start.map(x => (x._1, x._2.sorted)).collect()
res22: Array[(Int, List[Int])] = Array((1,List(3, 8, 10, 12)), (2,List(3, 33, 42, 65)))
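The point about map needing to return a tuple is easy to trip over. This sketch uses a plain Scala Seq rather than an RDD (an assumption made so it runs without Spark), but the shape of the functions is the same: returning only v.sorted from map still compiles, yet silently drops the keys.

```scala
object MapKeepsKeySketch {
  val pairs: Seq[(Int, List[Int])] =
    Seq(1 -> List(12, 3, 8, 10), 2 -> List(42, 3, 65, 33))

  // Returning a tuple keeps the key alongside the sorted list.
  val withKeys: Seq[(Int, List[Int])] = pairs.map { case (k, v) => (k, v.sorted) }

  // Returning only the list also compiles, but the keys are gone.
  val withoutKeys: Seq[List[Int]] = pairs.map { case (_, v) => v.sorted }

  def main(args: Array[String]): Unit = {
    println(withKeys)
    println(withoutKeys)
  }
}
```

On a real pair RDD, mapValues has one further advantage: it preserves the parent RDD's partitioner, because Spark knows the keys cannot have changed, whereas map does not.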
I hope this helps. Edit: since it looks like your issue is due to a lack of type information, I've added how I created the RDD I used to run through these scenarios:
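import org.apache.spark.rdd.RDD

val input: Array[Array[Int]] = Array(Array(1, 12, 3, 8, 10), Array(2, 42, 3, 65, 33))
// The first element of each array becomes the key; the rest become the List[Int] value
val start: RDD[(Int, List[Int])] = sc.parallelize(input).map({
  case Array(key, value @ _*) => (key, value.toList)
})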
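If you want to check the case Array(key, value @ _*) pattern before running it on a cluster, the same pattern-matching function works on a local collection. This sketch is written in Scala 2 syntax and, as an assumption for illustration, replaces sc.parallelize with a plain List, so no SparkContext is needed.

```scala
object BuildPairsSketch {
  val input: Array[Array[Int]] = Array(Array(1, 12, 3, 8, 10), Array(2, 42, 3, 65, 33))

  // First element becomes the key; the remaining elements become the List[Int] value.
  // Note: an empty inner array would not match and would throw a MatchError.
  val start: List[(Int, List[Int])] = input.toList.map {
    case Array(key, value @ _*) => (key, value.toList)
  }

  def main(args: Array[String]): Unit =
    start.foreach(println)
}
```

Because the value type is List[Int] here, .sorted finds the default Ordering[Int] with no extra work.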
If you look at the method signature for List.sorted, you'll see it takes an implicit Ordering parameter, which tells Scala how to sort the list.
Scala provides default Orderings for types like numbers and strings, and it locates the implicit based on the element type of your List. There is no default for a list of Any, which is the equivalent of a list of Object in Java. So if you can amend your question to include more code, it will help identify where you're losing that type information.
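To illustrate where the type information can go missing, here is a plain-Scala sketch. The tuple below is a hypothetical stand-in for a joined record, since the actual join code isn't shown. productIterator returns an Iterator[Any], so .toList gives a List[Any], and calling .sorted on that does not compile because there is no implicit Ordering[Any]. Recovering a List[Int] (here with collect, which silently drops any element that isn't an Int) brings the default Ordering[Int] back into play.

```scala
object ProductIteratorSketch {
  // Hypothetical record, standing in for whatever the join produced.
  val record: (Int, Int, Int, Int) = (12, 3, 8, 10)

  // productIterator is an Iterator[Any], so the element type is lost here.
  val asAny: List[Any] = record.productIterator.toList

  // asAny.sorted  // does not compile: no implicit Ordering[Any]

  // Keep only the Int elements, which restores List[Int] and hence Ordering[Int].
  val asInts: List[Int] = asAny.collect { case i: Int => i }

  val sortedInts: List[Int] = asInts.sorted

  def main(args: Array[String]): Unit =
    println(sortedInts)
}
```

On an RDD[(Int, List[Any])] this would translate to something like rdd.mapValues(_.collect { case i: Int => i }.sorted), though keeping the proper types through the join itself avoids the need for this recovery step altogether.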
Upvotes: 2