user5228393

Reputation:

Pair RDD Transformations

If I have a data set similar to this:

val list = List((1,1), (1,2), (1,3), (2,2), (2,1), (3,1), (3,3))

And I want to find the per-key average, so the output would be:

(1, 2), (2, 3/2), (3, 2)

Can I do this using groupByKey, countByKey, and reduceByKey somehow, or do I have to use the combineByKey method as in the example shown below? I tried using groupByKey, countByKey, and reduceByKey, but that combination did not work out; does anyone know a way to do it with those three methods?

val result = input.combineByKey(
  (v) => (v, 1),                                    // createCombiner: start a (sum, count) pair
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), // mergeValue: fold a value into the pair
  (acc1: (Int, Int), acc2: (Int, Int)) =>           // mergeCombiners: merge partial pairs
    (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map { case (key, value) => (key, value._1 / value._2.toFloat) }

result.collectAsMap().map(println(_))
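
The closest I have come with those three methods is computing the per-key sums and counts separately and dividing on the driver; a sketch, assuming input = sc.parallelize(list):

// Per-key sums via reduceByKey; countByKey already returns a local Map,
// so the final division happens on the driver.
val sums = input.reduceByKey(_ + _).collectAsMap() // Map(1 -> 6, 2 -> 3, 3 -> 4)
val counts = input.countByKey()                    // Map(1 -> 3, 2 -> 2, 3 -> 2)
val avgs = sums.map { case (k, s) => (k, s.toDouble / counts(k)) }
// avgs: Map(1 -> 2.0, 2 -> 1.5, 3 -> 2.0)

This works on the small example but does the division locally, so a single distributed transformation would be preferable.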

Upvotes: 0

Views: 347

Answers (3)

elm

Reputation: 20415

Using reduceByKey, after first mapping each (k, v) pair into (k, (v, 1)):

rdd.map { case (k, v) => (k, (v, 1)) }
   .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
   .map { case (k, v) => (k, v._1.toDouble / v._2) } // toDouble avoids integer division
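
Checked against the question's sample data (a sketch, assuming an active SparkContext sc):

val rdd = sc.parallelize(List((1,1), (1,2), (1,3), (2,2), (2,1), (3,1), (3,3)))
val avgs = rdd.map { case (k, v) => (k, (v, 1)) }
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .map { case (k, v) => (k, v._1.toDouble / v._2) }
avgs.collect() // Array((1,2.0), (2,1.5), (3,2.0)); ordering may vary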

Upvotes: 0

Alberto Bonsanto

Reputation: 18042

Well, you can simply use PairRDDFunctions.groupByKey and compute what you want from the grouped values.

val avgKey = input.groupByKey.map {
  case (k, v) => (k, v.sum.toDouble / v.size)
}
avgKey.collect
//res2: Array[(Int, Double)] = Array((3,2.0), (1,2.0), (2,1.5))

Upvotes: 0

Paweł Jurczenko

Reputation: 4471

You should try the following:

val sc: SparkContext = ...
val input = sc.parallelize(List((1,1), (1,2), (1,3), (2,2), (2,1), (3,1), (3,3)))
val averages = input.groupByKey.map { case (key, values) =>
  (key, values.sum / values.size.toDouble)
}

println(averages.collect().toList) // List((1,2.0), (2,1.5), (3,2.0))
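
As a side note, groupByKey shuffles every value for a key; when per-key data is large, aggregateByKey can keep a running (sum, count) pair instead. A minimal sketch against the same input:

// (sum, count) accumulators are combined map-side before the shuffle
val averages2 = input.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1), // fold one value into (sum, count)
  (a, b) => (a._1 + b._1, a._2 + b._2)  // merge partial (sum, count) pairs
).map { case (key, (sum, count)) => (key, sum.toDouble / count) }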

Upvotes: 3
