I have an RDD[(String, List[Int])] with data like List(("A",List(1,2,3,4)),("B",List(5,6,7))).
How can I transform it into List(("A",1),("A",2),("A",3),("A",4),("B",5),("B",6),("B",7))?
Then the action would reduce by key and produce a result like List(("A",2.5),("B",6)), where 2.5 is the average for "A" and 6 is the average for "B".
I have tried map(e=>List(e._1,e._2)), but it does not give the desired result.
Please help me with this set of transformations and actions. Thanks in advance.
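A minimal sketch of the two steps described in the question, written against plain Scala collections rather than an RDD so that it runs without Spark: flatMap produces one (key, value) pair per list element, and the pairs are then folded into a (sum, count) pair per key before dividing. The names AverageByKey, flatten and averageByKey are hypothetical. On an RDD, the folding step would correspond to a reduceByKey over (sum, count) pairs followed by a map that divides.

```scala
object AverageByKey {
  // Hypothetical helper, step 1: emit one (key, value) pair per list element.
  def flatten(data: List[(String, List[Int])]): List[(String, Int)] =
    data.flatMap { case (k, xs) => xs.map(x => (k, x)) }

  // Hypothetical helper, step 2: accumulate (sum, count) per key, the same
  // shape of reduction reduceByKey would perform on an RDD, then divide.
  def averageByKey(pairs: List[(String, Int)]): Map[String, Double] = {
    val sumCounts = pairs.foldLeft(Map.empty[String, (Long, Long)]) { case (acc, (k, v)) =>
      val (s, c) = acc.getOrElse(k, (0L, 0L))
      acc.updated(k, (s + v, c + 1))
    }
    // Mean per key = sum / count.
    sumCounts.map { case (k, (s, c)) => (k, s.toDouble / c) }
  }

  def main(args: Array[String]): Unit = {
    val data = List(("A", List(1, 2, 3, 4)), ("B", List(5, 6, 7)))
    val pairs = flatten(data)
    println(pairs)
    println(averageByKey(pairs))
  }
}
```

Carrying (sum, count) instead of a running average keeps the reduction associative, which is what lets it be split across partitions when done with reduceByKey.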
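You can try explode() on a DataFrame. It produces one row per array element, after which you can group by the key and average: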
scala> val df = List(("A",List(1,2,3,4)),("B",List(5,6,7))).toDF("x","y")
df: org.apache.spark.sql.DataFrame = [x: string, y: array<int>]
scala> df.withColumn("z",explode('y)).show(false)
+---+------------+---+
|x  |y           |z  |
+---+------------+---+
|A  |[1, 2, 3, 4]|1  |
|A  |[1, 2, 3, 4]|2  |
|A  |[1, 2, 3, 4]|3  |
|A  |[1, 2, 3, 4]|4  |
|B  |[5, 6, 7]   |5  |
|B  |[5, 6, 7]   |6  |
|B  |[5, 6, 7]   |7  |
+---+------------+---+
scala> val df2 = df.withColumn("z",explode('y))
df2: org.apache.spark.sql.DataFrame = [x: string, y: array<int> ... 1 more field]
scala> df2.groupBy("x").agg(sum('z)/count('z) ).show(false)
+---+-------------------+
|x  |(sum(z) / count(z))|
+---+-------------------+
|B  |6.0                |
|A  |2.5                |
+---+-------------------+
If all you want in the end is the average of each list, there is no need to break the lists up into individual elements with a flatMap. On a large data set, flattening and then reducing by key would shuffle a lot of data unnecessarily.
Since the values are already grouped by key, you can simply map over each list with something like this:
val l = spark.sparkContext.parallelize(Seq(
("A", List(1, 2, 3, 4)),
("B", List(5, 6, 7))
))
val avg = l.map(r => {
(r._1, (r._2.sum.toDouble / r._2.length.toDouble))
})
avg.collect.foreach(println)
Bear in mind that an empty list makes this compute 0.0 / 0.0, so its average comes out as NaN rather than a meaningful value. If some of your lists may be empty, you'll have to add a check in the map.
The above code gives you:
(A,2.5)
(B,6.0)
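One way to add such a check, sketched on plain Scala collections and assuming that keys with an empty list should simply be dropped from the result: return an Option from the averaging step and flatMap over it. The names SafeAverage and safeAverage, and the extra "C" entry with an empty list, are hypothetical additions for illustration. The same function should also work inside flatMap on the RDD from the code above, though that variant is not tested here.

```scala
object SafeAverage {
  // Hypothetical helper: None for an empty list instead of the NaN that 0.0 / 0.0 gives.
  def safeAverage(xs: List[Int]): Option[Double] =
    if (xs.isEmpty) None else Some(xs.sum.toDouble / xs.length)

  // Example data; the empty "C" entry is made up to exercise the check.
  val data: List[(String, List[Int])] =
    List(("A", List(1, 2, 3, 4)), ("B", List(5, 6, 7)), ("C", List.empty[Int]))

  // flatMap over the Option drops keys whose list is empty.
  val averages: List[(String, Double)] =
    data.flatMap { case (k, xs) => safeAverage(xs).map(avg => (k, avg)) }

  def main(args: Array[String]): Unit =
    averages.foreach(println) // prints (A,2.5) then (B,6.0); "C" is dropped
}
```

If you would rather keep empty keys with a default value, safeAverage(xs).getOrElse(0.0) inside a plain map is the alternative.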
There are several ways to get what you want. You could also use a for comprehension, but the first implementation that came to my mind is this one:
val l = List(("A", List(1, 2, 3)), ("B", List(1, 2, 3)))
val flattenList = l.flatMap {
  case (key, values) =>
    values.map((key, _))
}
Output:
List((A,1), (A,2), (A,3), (B,1), (B,2), (B,3))
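The for comprehension mentioned above would look roughly like this. It is a sketch that reuses the same example list; FlattenWithFor is a hypothetical wrapper object so the snippet compiles on its own.

```scala
object FlattenWithFor {
  val l: List[(String, List[Int])] = List(("A", List(1, 2, 3)), ("B", List(1, 2, 3)))

  // Equivalent to l.flatMap { case (key, values) => values.map(v => (key, v)) }
  val flattenList: List[(String, Int)] = for {
    (key, values) <- l // take each (key, list) pair
    v <- values        // then each element of that list
  } yield (key, v)

  def main(args: Array[String]): Unit =
    println(flattenList) // prints List((A,1), (A,2), (A,3), (B,1), (B,2), (B,3))
}
```

Both forms compile down to flatMap and map calls (plus a withFilter for the tuple pattern in Scala 2), so the choice is mostly a matter of readability.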