Devdatta Tengshe

Reputation: 4075

Spark - Group on one Column and find Mean of other columns

I have some data with 4 columns (c1, c2, c3 and c4), which I have loaded into an RDD via some Scala code.

I want to group/bin by c1 and find the mean of c2, c3 and c4 within each of the c1 groups.

I'm looking at RDD.reduceByKey, but I haven't managed to understand exactly how it is meant to be used. Is there a better way to do this? How can I do this from the Scala API?

Upvotes: 2

Views: 8671

Answers (1)

Tzach Zohar

Reputation: 37852

You say you have a DataFrame, so you probably shouldn't use the RDD API (which is often less efficient and, in this case, probably less intuitive as well). Here's a solution using the DataFrame API:

import org.apache.spark.sql.functions._

val result = df.groupBy("c1").agg(mean("c2"), mean("c3"), mean("c4"))

result would be a DataFrame with the following schema (assuming c1 is a String to begin with):

root
 |-- c1: string (nullable = true)
 |-- avg(c2): double (nullable = true)
 |-- avg(c3): double (nullable = true)
 |-- avg(c4): double (nullable = true)
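
The generated names such as avg(c2) can be awkward to reference later. A minimal sketch of giving the aggregates explicit names with Column.alias follows; the object name NamedMeans, the method name namedMeans, and the output names mean_c2, mean_c3 and mean_c4 are illustrative assumptions, not part of the original answer:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.mean

object NamedMeans {
  // Groups by c1 and names each mean column explicitly
  // (the mean_* names are hypothetical; pick whatever suits your schema).
  def namedMeans(df: DataFrame): DataFrame =
    df.groupBy("c1").agg(
      mean("c2").alias("mean_c2"),
      mean("c3").alias("mean_c3"),
      mean("c4").alias("mean_c4")
    )
}
```

The aggregated columns can then be selected by those names, for example NamedMeans.namedMeans(df).select("c1", "mean_c2").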

EDIT:

In case the list of columns is dynamic, you can map that list into a list of corresponding means and aggregate the DataFrame using it:

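import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.mean

val colsToCompute = List("c2", "c3", "c4") // can be loaded dynamically
val means: Seq[Column] = colsToCompute.map(mean) // one mean(...) expression per column name
val result = df.groupBy("c1").agg(means.head, means.tail: _*) // agg takes a first Column plus varargs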

For completeness - here's a solution using the RDD API, but:

  • It's much less concise
  • It's much harder to "generify" for a dynamic number of columns
  • It might perform worse

There might be slightly shorter implementations, but not much simpler:

import org.apache.spark.rdd.RDD

val rdd: RDD[(String, Int, Int, Int)] = ...

val result: RDD[(String, (Double, Double, Double))] = rdd
  .keyBy(_._1)
  .mapValues { case (k, v1, v2, v3) => (1, v1, v2, v3) } // add base for counter
  .reduceByKey { case ((a1, a2, a3, a4), (b1, b2, b3, b4)) => (a1+b1, a2+b2, a3+b3, a4+b4) } // sum counter and values
  .mapValues { case (count, v1, v2, v3) => (v1.toDouble/count, v2.toDouble/count, v3.toDouble/count) } // calculate means
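
To see what the count-and-sum steps above compute, the same pattern can be sketched on a plain Scala collection, without Spark. This is only an illustration under assumptions: the object name MeanByKeySketch, the method meansByKey and the sample rows are made up, and a local groupBy stands in for keyBy plus the shuffle that reduceByKey performs on a cluster:

```scala
// Local-collection sketch of the count-and-sum pattern; no Spark required.
object MeanByKeySketch {
  // Hypothetical sample rows in the shape (c1, c2, c3, c4).
  val rows: Seq[(String, Int, Int, Int)] = Seq(
    ("a", 1, 10, 100),
    ("a", 3, 20, 300),
    ("b", 5, 50, 500)
  )

  def meansByKey(data: Seq[(String, Int, Int, Int)]): Map[String, (Double, Double, Double)] =
    data
      .groupBy(_._1) // stands in for keyBy and the grouping done by reduceByKey
      .map { case (key, group) =>
        // Pair each row with a counter of 1, then sum counters and values.
        val (count, s2, s3, s4) = group
          .map { case (_, v2, v3, v4) => (1, v2, v3, v4) }
          .reduce { (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3, a._4 + b._4) }
        // Divide each sum by the count to get the mean.
        key -> ((s2.toDouble / count, s3.toDouble / count, s4.toDouble / count))
      }

  val result: Map[String, (Double, Double, Double)] = meansByKey(rows)
}
```

With the sample rows, key "a" has two rows, so its sums (4, 30, 400) are divided by 2, giving (2.0, 15.0, 200.0); key "b" has a single row and keeps its values as doubles.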

Upvotes: 5
