Omegaspard

Reputation: 1960

How to reverse the result of reduceByKey using RDD API?

I have an RDD of (key, value) pairs that I transformed into an RDD of (key, List(value1, value2, value3)) as follows.

val rddInit = sc.parallelize(List((1, 2), (1, 3), (2, 5), (2, 7), (3, 10)))
val rddReduced = rddInit.groupByKey.mapValues(_.toList)
rddReduced.take(3).foreach(println)

This code gives me the following RDD: (1,List(2, 3)) (2,List(5, 7)) (3,List(10))

But now I would like to get back to rddInit from the RDD I just computed (rddReduced).

My first guess is to build a kind of cross product between the key and each element of the List, like this:

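import scala.collection.mutable.ListBuffer

rddReduced.map {
  case (x, y) =>
    // accumulate one (key, value) pair per element of the list
    val myList: ListBuffer[(Int, Int)] = ListBuffer()
    for (element <- y) {
      myList += ((x, element))
    }
    myList.toList
}.flatMap(x => x).take(5).foreach(println)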

With this code, I get the initial RDD back. But I don't think using a ListBuffer inside a Spark job is good practice. Is there another way to solve this problem?

Upvotes: 2

Views: 1346

Answers (4)

Leo C

Reputation: 22439

Here's one way to restore the grouped RDD back to the original:

val rddRestored = rddReduced.flatMap {
  case (k, v) => v.map((k, _))
}

rddRestored.collect.foreach(println)
(1,2)
(1,3)
(2,5)
(2,7)
(3,10)
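
To check the round trip without a cluster, here is a minimal sketch that uses plain Scala collections as a stand-in for the RDDs (that substitution is my assumption, and the names init, grouped, restored and roundTripOk are made up for illustration). The flatMap has the same shape as the one above and can be pasted into the Scala REPL:

```scala
// Plain Scala collections stand in for the RDDs here (an assumption for illustration).
val init: List[(Int, Int)] = List((1, 2), (1, 3), (2, 5), (2, 7), (3, 10))

// Local analogue of groupByKey.mapValues(_.toList)
val grouped: Map[Int, List[Int]] =
  init.groupBy(_._1).map { case (k, pairs) => (k, pairs.map(_._2)) }

// Same shape as the RDD flatMap: pair the key with every value in its list
val restored: List[(Int, Int)] =
  grouped.toList.flatMap { case (k, vs) => vs.map((k, _)) }

// Map iteration order is unspecified, so compare after sorting
val roundTripOk: Boolean = restored.sorted == init.sorted
```

The comparison is done on sorted lists because grouping does not promise to keep the original order of keys; the same caveat applies to the RDD version.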

Upvotes: 1

Jacek Laskowski

Reputation: 74619

I'm surprised no one has offered a solution with Scala's for-comprehension (that gets "desugared" to flatMap and map at compile time).

I don't use this syntax very often, but when I do...I find it quite entertaining. Some people prefer for-comprehension over a series of flatMap and map, esp. for more complex transformations.

// that's what you ended up with after `groupByKey.mapValues`
val rddReduced: RDD[(Int, List[Int])] = ...
val r = for {
  (k, values) <- rddReduced
  v <- values
} yield (k, v)

scala> :type r
org.apache.spark.rdd.RDD[(Int, Int)]

scala> r.foreach(println)
(3,10)
(2,5)
(2,7)
(1,2)
(1,3)

// even nicer to our eyes
scala> r.toDF("key", "value").show
+---+-----+
|key|value|
+---+-----+
|  1|    2|
|  1|    3|
|  2|    5|
|  2|    7|
|  3|   10|
+---+-----+

After all, that's why we enjoy the flexibility of Scala, isn't it?
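
For anyone curious about the desugaring, a small sketch on a local List (not an RDD, which is my simplification; grouped, viaFor, viaFlatMap and sameResult are illustrative names) shows the for-comprehension and the hand-written flatMap/map producing the same pairs:

```scala
// A local List stands in for rddReduced (an assumption for illustration).
val grouped: List[(Int, List[Int])] =
  List((1, List(2, 3)), (2, List(5, 7)), (3, List(10)))

// for-comprehension form
val viaFor: List[(Int, Int)] = for {
  (k, values) <- grouped
  v <- values
} yield (k, v)

// roughly what it is rewritten to: an outer flatMap and an inner map
val viaFlatMap: List[(Int, Int)] =
  grouped.flatMap { case (k, values) => values.map(v => (k, v)) }

// both forms yield the same list of pairs
val sameResult: Boolean = viaFor == viaFlatMap
```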

Upvotes: 3

Haroun Mohammedi

Reputation: 2424

Building the pairs through a mutable ListBuffer like that is not good practice.

From what I learned in a Spark Summit course, you should use DataFrames and Datasets as much as possible; with them you benefit from many optimizations in the Spark engine.

What you want to do is called explode, and it is performed by applying the explode function from the sql.functions package.

The solution would be something like this:

 import spark.implicits._
 import org.apache.spark.sql.functions.explode
 import org.apache.spark.sql.functions.collect_list

 val dfInit = sc.parallelize(List((1, 2), (1, 3), (2, 5), (2, 7), (3, 10))).toDF("x", "y")
 val dfReduced = dfInit.groupBy("x").agg(collect_list("y") as "y")
 val dfResult = dfReduced.withColumn("y", explode($"y"))

dfResult will contain the same data as dfInit.

Upvotes: 2

koiralo

Reputation: 23099

According to your question, I think this is what you want to do:

rddReduced.map { case (x, y) => y.map((x, _)) }.flatMap(x => x).take(5).foreach(println)

After the group by, each value is a list, so you can map over it again to pair every element with its key.

Upvotes: 0
