datasure

Reputation: 53

Sorting List of values in a RDD in Scala

I have an RDD in which the key is an id and the value is a list of ids. I want to sort each list of values in ascending order. For example:

1, list(12,3,8,10)
2, list(42,3,65,33)
3, list(6,2,4,1)

Output

1, list(3,8,10,12)
2, list(3,33,42,65)
3, list(1,2,4,6)

RDD creation: I created the RDD by joining two different RDDs and then used productIterator to build the list of values, which gives me an RDD of type RDD[(Int, List[Any])].

I tried rdd.mapValues(x=> _.2.sorted) and several other sort methods, but no luck.

Upvotes: 0

Views: 1369

Answers (1)

Liam Clarke

Reputation: 385

You're nearly there.

mapValues, as its name suggests, applies your mapping function only to the values. Your code looks like you're trying to extract the second element from a key/value tuple, which I imagine is throwing errors.

You can either use map or mapValues. map needs to return a tuple if you wish to retain your key, so mapValues is simpler, but I'll show you both ways. So we're starting with an RDD[(Int, List[Int])] which I've already constructed, and using collect() to view it.

scala> start
res17: org.apache.spark.rdd.RDD[(Int, List[Int])] = MapPartitionsRDD[6] at map at <console>:37

scala> start.collect()
res18: Array[(Int, List[Int])] = Array((1,List(12, 3, 8, 10)), (2,List(42, 3, 65, 33)))

Firstly, let's do the simplest thing:

scala> start.mapValues(x => x.sorted).collect()
res19: Array[(Int, List[Int])] = Array((1,List(3, 8, 10, 12)), (2,List(3, 33, 42, 65)))

As you can see, it returns the ordering you expect.

Using map to modify the key/value tuple is pretty simple, so long as you preserve your key. I recommend using Scala's case function syntax to destructure the tuple into named arguments, instead of having to refer to tuple._1/tuple._2:

scala> start.map({ case (k, v) => (k, v.sorted) }).collect()
res21: Array[(Int, List[Int])] = Array((1,List(3, 8, 10, 12)), (2,List(3, 33, 42, 65)))

But using the tuple syntax you're familiar with:

scala> start.map(x => (x._1, x._2.sorted)).collect()
res22: Array[(Int, List[Int])] = Array((1,List(3, 8, 10, 12)), (2,List(3, 33, 42, 65)))

I hope this helps. Edit: since it looks like your issue is due to a lack of type information, I've added how I created the RDD I used to run through these scenarios.

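import org.apache.spark.rdd.RDD

val input: Array[Array[Int]] = Array(Array(1, 12, 3, 8, 10), Array(2, 42, 3, 65, 33))

// The first element of each inner array is the key; the rest become the list of values.
val start: RDD[(Int, List[Int])] = sc.parallelize(input).map({
  case Array(key, value @ _*) => (key, value.toList)
})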

If you look at the method signature for List.sorted you'll see it has an implicit parameter which tells Scala how to sort the list.
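
To make that implicit parameter visible, here is a small sketch in plain Scala (no Spark needed). The object name SortedOrderingDemo and the sample list are only for illustration. It shows the same call with the ordering filled in by the compiler, passed explicitly, and swapped for a reversed ordering.

```scala
object SortedOrderingDemo {
  val ids: List[Int] = List(12, 3, 8, 10)

  // The compiler finds the implicit Ordering[Int] and passes it for us.
  val ascending: List[Int] = ids.sorted

  // Passing the same ordering by hand is equivalent.
  val ascendingExplicit: List[Int] = ids.sorted(Ordering[Int])

  // Any other Ordering[Int] can be supplied, e.g. the reversed one.
  val descending: List[Int] = ids.sorted(Ordering[Int].reverse)
}
```

The point is that sorted only compiles when an Ordering exists for the element type of the list.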

Scala provides default implementations for things like numbers and strings, but it locates the implicit implementation based on the type of your List. It has no default for a list of Any, which is the equivalent of a list of Object in Java. So if you can amend your question to include more code, it will help identify where you're losing that type information.
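
In the meantime, one possible workaround is to recover the element type before sorting. This is only a sketch, and it assumes every element in your lists really is an Int at runtime; SortAnyValues and sortAsInts are hypothetical names of mine, not part of Spark or the Scala library.

```scala
object SortAnyValues {
  // Keeps only the elements that are Ints at runtime, then sorts them.
  // Assumption: the lists hold nothing but Ints; anything else is silently dropped.
  def sortAsInts(values: List[Any]): List[Int] =
    values.collect { case i: Int => i }.sorted
}
```

On an RDD[(Int, List[Any])] this could probably be applied as rdd.mapValues(SortAnyValues.sortAsInts), but fixing the step where the values become Any is the cleaner solution.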

Upvotes: 2
