Kunal Trivedi

Reputation: 129

Scala Transformation and action

I have an RDD of (String, List[Int]) pairs, like List(("A",List(1,2,3,4)),("B",List(5,6,7))).

How do I transform it to List(("A",1),("A",2),("A",3),("A",4),("B",5),("B",6),("B",7))?

Then the action would be reducing by key and generating a result like List(("A",2.5),("B",6)).

I have tried using map(e => List(e._1, e._2)), but it's not giving the desired result.

Here 2.5 is the average for "A" and 6 is the average for "B".

Please help me with this set of transformations and actions. Thanks in advance.

Upvotes: 0

Views: 585

Answers (3)

stack0114106

Reputation: 8711

You can try explode():

scala> val df = List(("A",List(1,2,3,4)),("B",List(5,6,7))).toDF("x","y")
df: org.apache.spark.sql.DataFrame = [x: string, y: array<int>]

scala> df.withColumn("z",explode('y)).show(false)
+---+------------+---+
|x  |y           |z  |
+---+------------+---+
|A  |[1, 2, 3, 4]|1  |
|A  |[1, 2, 3, 4]|2  |
|A  |[1, 2, 3, 4]|3  |
|A  |[1, 2, 3, 4]|4  |
|B  |[5, 6, 7]   |5  |
|B  |[5, 6, 7]   |6  |
|B  |[5, 6, 7]   |7  |
+---+------------+---+


scala> val df2 = df.withColumn("z",explode('y))
df2: org.apache.spark.sql.DataFrame = [x: string, y: array<int> ... 1 more field]

scala> df2.groupBy("x").agg(sum('z)/count('z) ).show(false)
+---+-------------------+
|x  |(sum(z) / count(z))|
+---+-------------------+
|B  |6.0                |
|A  |2.5                |
+---+-------------------+


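As a side note (a sketch not taken from this answer, and untested here), Spark's built-in avg aggregate should give the same result more directly:

scala> import org.apache.spark.sql.functions.avg
scala> df2.groupBy("x").agg(avg('z)).show(false)

This avoids spelling out sum/count by hand; the resulting column is named avg(z).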

Upvotes: 1

Travis Hegner

Reputation: 2495

If what you want in the end is the average of each list, then it's not necessary to break the lists up into individual elements with a flatMap. On a large data set, doing so would trigger an unnecessary shuffle when the exploded elements are reduced back together by key.

Since they are already aggregated by key, just transform them with something like this:

val l = spark.sparkContext.parallelize(Seq(
  ("A", List(1, 2, 3, 4)),
  ("B", List(5, 6, 7))
))

val avg = l.map(r => {
    (r._1, (r._2.sum.toDouble / r._2.length.toDouble))
})

avg.collect.foreach(println)

Bear in mind that this won't give a meaningful result if any of your lists are empty: sum and length are both 0, so the division yields NaN. If you have zero-length lists, you'll have to put a check condition in the map.

The above code gives you:

(A,2.5)
(B,6.0)
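If empty lists are possible in your data, a guarded variant (a sketch, assuming it's acceptable to simply drop keys with empty lists) could look like:

val avgSafe = l
  .filter { case (_, xs) => xs.nonEmpty }  // avoid 0.0 / 0.0 = NaN for empty lists
  .map { case (k, xs) => (k, xs.sum.toDouble / xs.length) }

avgSafe.collect.foreach(println)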

Upvotes: 1

Tizianoreica

Reputation: 2236

There are several ways to get what you want. You could use a for comprehension as well, but the first one that came to my mind is this implementation:

val l = List(("A", List(1, 2, 3)), ("B", List(1, 2, 3)))

val flattenList = l.flatMap {
  case (elem, _elemList) =>
    _elemList.map((elem, _))
}

Output:

List((A,1), (A,2), (A,3), (B,1), (B,2), (B,3))
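The for comprehension mentioned above desugars to the same flatMap/map and produces the same flattened list; from there, the per-key averages the question asks for can be computed with a groupBy. A sketch on the plain Scala list used in this answer:

val flattened = for {
  (elem, elemList) <- l
  value <- elemList
} yield (elem, value)

val averages = flattened.groupBy(_._1).map {
  case (key, pairs) => (key, pairs.map(_._2).sum.toDouble / pairs.size)
}
// e.g. Map(A -> 2.0, B -> 2.0) for the sample list above

On an actual RDD you would typically reach for reduceByKey or aggregateByKey rather than a groupBy to compute the averages.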

Upvotes: 1
