Fobi
Fobi

Reputation: 445

Sum values of each unique key in Apache Spark RDD

I have an RDD[(String, (Long, Long))] where each element is not unique:

(com.instagram.android,(2,0))
(com.android.contacts,(6,1))
(com.android.contacts,(3,4))
(com.instagram.android,(8,3))
...

So I want to obtain an RDD where each element is the sum of the two values for every unique key:

(com.instagram.android,(10,3))
(com.android.contacts,(9,5))
...

Here is my code:

val appNamesAndPropertiesRdd = appNodesRdd.map({
  case Row(_, appName, totalUsageTime, usageFrequency, _, _, _, _) => 
    (appName, (totalUsageTime, usageFrequency))
})

Upvotes: 2

Views: 3672

Answers (4)

T. Gawęda
T. Gawęda

Reputation: 16096

Use reduceByKey:

val rdd = appNamesAndPropertiesRdd.reduceByKey(
  (acc, elem) => (acc._1 + elem._1, acc._2 + elem._2)
)

reduceByKey uses aggregateByKey described by SCouto, but has more readable usage. For your case, more advanced features of aggregateByKey - hidden by simpler API of reduceBykey - are not necessary

Upvotes: 3

sarveshseri
sarveshseri

Reputation: 13985

First of all, I don't think that usageFrequency should be simply added up.

Now, lets come to what you want to do, You want to add things by key, you can do it

1.Using groupByKey and then reducing the groups to sum things up,

val requiredRdd = appNamesAndPropertiesRdd
  .groupBy({ case (an, (tut, uf)) => an })
  .map({
    case (an, iter) => (
      an,
      iter
        .map({ case (an, tut, uf) => (tut, tf) })
        .reduce({ case ((tut1, tf1), (tut2, tf2)) => (tut1 + tut2, tf1 + tf2) })
    )
  })

Or by using reduceByKey

val requiredRdd = appNamesAndPropertiesRdd
  .reduceByKey({
    case ((tut1, uf1), (tut2, uf2)) => (tut1 + tut2, tf1 + tf2)
  })

And reduceByKey is a better choice for two reasons,

  1. It saves a not so required group operation.
  2. The groupBy approach can lead to a reshuffle which will be expensive.

Upvotes: 2

S.Karthik
S.Karthik

Reputation: 1389

Try this logic,

rdd.groupBy(_._1).map(x=> (x._1, (x._2.map(_._2).foldLeft((0,0)) {case ((acc1, acc2),(a, b))=> (acc1+a, acc2+b)} )))

Upvotes: 0

SCouto
SCouto

Reputation: 7926

The function aggregateByKey is the best one for this purpose

appNamesAndPropertiesRdd.aggregateByKey((0, 0))((acc, elem) => (acc._1 + elem._1, acc._2 +elem._2 ),(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

Explained here:
aggregateByKey((0, 0)) => This is the zerovalue. The value that will be the initial one. In your case, since you want the addition, 0,0 will be the initial value (0.0, 0.0) if you want double instead of int

((acc, elem) => (acc._1 + elem._1, acc._2 +elem._2 ) => The first function. To accumulate the elements in the same partition. The accumulator will hold the partial value. Since elem is a tuple, you need to add each part of it to the correpondent part of the accumulator

(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) => The second function. To accumulate the accumulator from each partition.

Upvotes: 1

Related Questions