Reputation: 4075
I have some data with 4 columns (c1, c2, c3, and c4) and have loaded it into an RDD via some Scala code.
I want to group/bin by c1 and find the mean of c2, c3, and c4 within each c1 group.
I'm looking at RDD.reduceByKey, but I haven't managed to understand exactly how it should be used. Is there a better way to do this? How can I do it from the Scala API?
Upvotes: 2
Views: 8671
Reputation: 37852
You say you have a DataFrame, so you probably shouldn't use the RDD API (which is often less efficient, and in this case probably less intuitive as well). Here's a solution using the DataFrame API:
import org.apache.spark.sql.functions._
val result = df.groupBy("c1").agg(mean("c2"), mean("c3"), mean("c4"))
result would be a DataFrame with the following schema (assuming c1 is a String to begin with):
root
|-- c1: string (nullable = true)
|-- avg(c2): double (nullable = true)
|-- avg(c3): double (nullable = true)
|-- avg(c4): double (nullable = true)
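For a quick sanity check, here's what this looks like on a small hand-built DataFrame (a sketch assuming a SparkSession named spark is in scope; the sample values are made up):

import spark.implicits._

// hypothetical sample data: (c1, c2, c3, c4)
val df = Seq(
  ("a", 1, 10, 100),
  ("a", 3, 30, 300),
  ("b", 2, 20, 200)
).toDF("c1", "c2", "c3", "c4")

df.groupBy("c1").agg(mean("c2"), mean("c3"), mean("c4")).show()
// +---+-------+-------+-------+
// | c1|avg(c2)|avg(c3)|avg(c4)|
// +---+-------+-------+-------+
// |  a|    2.0|   20.0|  200.0|
// |  b|    2.0|   20.0|  200.0|
// +---+-------+-------+-------+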
EDIT: in case the list of columns is dynamic, you can easily map such a list into a list of corresponding mean expressions and aggregate the DataFrame using that list:
import org.apache.spark.sql.Column

val colsToCompute = List("c2", "c3", "c4") // can be loaded dynamically
val means: Seq[Column] = colsToCompute.map(mean(_))
val result = df.groupBy("c1").agg(means.head, means.tail: _*)
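If you also want friendlier column names than the default avg(...), you can alias each aggregation while mapping (a small variation on the above; the mean_ prefix is just an illustrative choice):

val means: Seq[Column] = colsToCompute.map(c => mean(c).as(s"mean_$c"))
val result = df.groupBy("c1").agg(means.head, means.tail: _*)
// the schema now has columns: c1, mean_c2, mean_c3, mean_c4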
For completeness, here's a solution using the RDD API. There might be slightly shorter implementations, but none much simpler:
import org.apache.spark.rdd.RDD

val rdd: RDD[(String, Int, Int, Int)] = ...
val result: RDD[(String, (Double, Double, Double))] = rdd
  .keyBy(_._1) // key each row by c1
  .mapValues { case (_, v1, v2, v3) => (1, v1, v2, v3) } // add a base of 1 for the counter
  .reduceByKey { case ((a1, a2, a3, a4), (b1, b2, b3, b4)) => (a1 + b1, a2 + b2, a3 + b3, a4 + b4) } // sum counter and values
  .mapValues { case (count, v1, v2, v3) => (v1.toDouble / count, v2.toDouble / count, v3.toDouble / count) } // calculate means
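To see it in action on the same toy data (a sketch assuming a SparkContext named sc is available; the rows are made up):

val rdd = sc.parallelize(Seq(
  ("a", 1, 10, 100),
  ("a", 3, 30, 300),
  ("b", 2, 20, 200)
))
// after running the pipeline above:
// result.collect() => Array((a,(2.0,20.0,200.0)), (b,(2.0,20.0,200.0)))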
Upvotes: 5