I have an RDD[(String, (Long, Long))]
where the keys are not unique:
(com.instagram.android,(2,0))
(com.android.contacts,(6,1))
(com.android.contacts,(3,4))
(com.instagram.android,(8,3))
...
I want to obtain an RDD
with one element per unique key, where each of the two values is summed over all elements with that key:
(com.instagram.android,(10,3))
(com.android.contacts,(9,5))
...
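To pin down the expected result, here is a minimal local sketch that uses plain Scala collections instead of Spark. It assumes Scala 2.13+ for `groupMapReduce`, uses the four sample records shown above, and the names `records` and `sumByKey` are hypothetical (not from the question).

```scala
// Sample records from the question: (appName, (totalUsageTime, usageFrequency))
val records: Seq[(String, (Long, Long))] = Seq(
  ("com.instagram.android", (2L, 0L)),
  ("com.android.contacts", (6L, 1L)),
  ("com.android.contacts", (3L, 4L)),
  ("com.instagram.android", (8L, 3L))
)

// Hypothetical helper: one entry per key, with both tuple fields summed element-wise
def sumByKey(xs: Seq[(String, (Long, Long))]): Map[String, (Long, Long)] =
  xs.groupMapReduce(_._1)(_._2) { case ((t1, u1), (t2, u2)) => (t1 + t2, u1 + u2) }

println(sumByKey(records))
```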
Here is my code:
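import org.apache.spark.sql.Row

// assuming appName is stored as a String and both metrics as Long; the type patterns
// give an RDD[(String, (Long, Long))] instead of an RDD[(Any, (Any, Any))]
val appNamesAndPropertiesRdd = appNodesRdd.map({
  case Row(_, appName: String, totalUsageTime: Long, usageFrequency: Long, _, _, _, _) =>
    (appName, (totalUsageTime, usageFrequency))
})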
Use reduceByKey:
val rdd = appNamesAndPropertiesRdd.reduceByKey(
(acc, elem) => (acc._1 + elem._1, acc._2 + elem._2)
)
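reduceByKey and the aggregateByKey described by SCouto are both built on Spark's combineByKey machinery, but reduceByKey is more readable to use. For your case, the more advanced features of aggregateByKey (a separate zero value and an accumulator type that may differ from the value type), which the simpler API of reduceByKey hides, are not necessary.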
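reduceByKey needs only one function because Spark uses it both to combine values within each partition and to merge the partial results of different partitions. The sketch below models that two-phase behaviour with plain Scala collections, treating each inner `Seq` as one partition. It illustrates the semantics under that assumption and is not Spark's implementation. `addTo`, `combineWithin` and `reduceByKeyLocal` are hypothetical names.

```scala
// Add value v for key k to acc, merging with f if k is already present
def addTo[K, V](acc: Map[K, V], k: K, v: V, f: (V, V) => V): Map[K, V] =
  acc.updated(k, acc.get(k).map(f(_, v)).getOrElse(v))

// Phase 1: combine values per key inside a single partition (the map-side combine)
def combineWithin[K, V](partition: Seq[(K, V)], f: (V, V) => V): Map[K, V] =
  partition.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) => addTo(acc, k, v, f) }

// Phase 2: merge the per-partition results with the same function f
def reduceByKeyLocal[K, V](partitions: Seq[Seq[(K, V)]], f: (V, V) => V): Map[K, V] =
  partitions.map(combineWithin(_, f)).foldLeft(Map.empty[K, V]) { (acc, part) =>
    part.foldLeft(acc) { case (a, (k, v)) => addTo(a, k, v, f) }
  }

// The question's records split into two made-up partitions
val partitions: Seq[Seq[(String, (Long, Long))]] = Seq(
  Seq(("com.instagram.android", (2L, 0L)), ("com.android.contacts", (6L, 1L))),
  Seq(("com.android.contacts", (3L, 4L)), ("com.instagram.android", (8L, 3L)))
)

// Same function as passed to reduceByKey in the answer
val sumPair: ((Long, Long), (Long, Long)) => (Long, Long) =
  (acc, elem) => (acc._1 + elem._1, acc._2 + elem._2)

println(reduceByKeyLocal(partitions, sumPair))
```

Because the function is applied in both phases, it must be associative and commutative. Element-wise addition satisfies both.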
First of all, I don't think that usageFrequency should be simply added up.
Now, coming to what you want to do: you want to sum things by key. You can do it in two ways.
1. Using groupBy and then reducing each group to sum things up:
val requiredRdd = appNamesAndPropertiesRdd
  .groupBy({ case (an, (tut, uf)) => an })
  .map({
    case (an, iter) => (
      an,
      iter
        // each element of the group is still (an, (tut, uf)); keep only the value pair
        .map({ case (_, (tut, uf)) => (tut, uf) })
        .reduce({ case ((tut1, uf1), (tut2, uf2)) => (tut1 + tut2, uf1 + uf2) })
    )
  })
2. Or by using reduceByKey:
val requiredRdd = appNamesAndPropertiesRdd
  .reduceByKey({
    case ((tut1, uf1), (tut2, uf2)) => (tut1 + tut2, uf1 + uf2)
  })
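And reduceByKey is the better choice for two reasons. First, it avoids the group operation, so all values for a key never have to be held together in memory as one Iterable. Second, the groupBy approach shuffles every record across the network, which is expensive, whereas reduceByKey first combines values within each partition and shuffles only the partial sums.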
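To make the shuffle argument concrete, the sketch below counts how many records would leave each partition under the two strategies. It models partitions as plain Scala `Seq`s with made-up keys (`appA`, `appB`). groupBy sends every record, while a map-side combine (what reduceByKey does) sends at most one record per distinct key per partition. Real shuffle cost also depends on record size and serialization, which this count ignores.

```scala
// Each inner Seq stands in for one partition of (appName, (totalUsageTime, usageFrequency))
val partitions: Seq[Seq[(String, (Long, Long))]] = Seq(
  Seq(("appA", (1L, 1L)), ("appA", (2L, 1L)), ("appB", (5L, 1L)), ("appA", (3L, 1L))),
  Seq(("appB", (4L, 2L)), ("appB", (1L, 1L)), ("appA", (7L, 3L)))
)

// groupBy: every record is shuffled as-is
val shuffledByGroupBy: Int = partitions.map(_.size).sum

// reduceByKey: each partition is first reduced to one record per distinct key
val shuffledByReduceByKey: Int = partitions.map(_.map(_._1).distinct.size).sum

println(s"groupBy shuffles $shuffledByGroupBy records, reduceByKey shuffles $shuffledByReduceByKey")
```

The gap grows with the number of records per key in each partition, which is why the difference matters most for frequently repeated keys.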
Try this logic:
// start from (0L, 0L): the values are Long, so an Int zero (0, 0) would not type-check
rdd.groupBy(_._1).map(x =>
  (x._1, x._2.map(_._2).foldLeft((0L, 0L)) { case ((acc1, acc2), (a, b)) => (acc1 + a, acc2 + b) })
)
The function aggregateByKey is the best one for this purpose:
appNamesAndPropertiesRdd.aggregateByKey((0L, 0L))(
  (acc, elem) => (acc._1 + elem._1, acc._2 + elem._2),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
Explained here:
aggregateByKey((0L, 0L))
=> This is the zero value, the initial value of each accumulator. In your case, since you want addition, (0L, 0L) is the initial value; it must be Long to match your (Long, Long) values. Use (0.0, 0.0) if you want Double instead.
(acc, elem) => (acc._1 + elem._1, acc._2 + elem._2)
=> The first function (seqOp) accumulates the elements within the same partition. The accumulator holds the partial value. Since elem is a tuple, you need to add each part of it to the corresponding part of the accumulator.
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
=> The second function (combOp) merges the accumulators produced by different partitions.
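The roles of the zero value, seqOp and combOp can be seen in a small local model built from plain Scala collections. Each inner `Seq` stands in for one partition. `aggregateByKeyLocal` is a hypothetical name, and the model is a sketch of the semantics described above, not Spark's code.

```scala
// Local model of aggregateByKey: within each partition, every key's accumulator starts
// from zeroValue and absorbs values with seqOp; combOp then merges accumulators across partitions.
def aggregateByKeyLocal[K, V, U](partitions: Seq[Seq[(K, V)]], zeroValue: U)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U): Map[K, U] = {
  val perPartition: Seq[Map[K, U]] = partitions.map { part =>
    part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
      acc.updated(k, seqOp(acc.getOrElse(k, zeroValue), v))
    }
  }
  perPartition.foldLeft(Map.empty[K, U]) { (acc, part) =>
    part.foldLeft(acc) { case (a, (k, u)) =>
      a.updated(k, a.get(k).map(combOp(_, u)).getOrElse(u))
    }
  }
}

// The question's records split into two made-up partitions
val partitions: Seq[Seq[(String, (Long, Long))]] = Seq(
  Seq(("com.instagram.android", (2L, 0L)), ("com.android.contacts", (6L, 1L))),
  Seq(("com.android.contacts", (3L, 4L)), ("com.instagram.android", (8L, 3L)))
)

val result: Map[String, (Long, Long)] = aggregateByKeyLocal(partitions, (0L, 0L))(
  (acc, elem) => (acc._1 + elem._1, acc._2 + elem._2),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

println(result)
```

Because the zero value seeds the accumulator once per key in every partition, it should be neutral for the operation (here (0L, 0L) for addition). Writing `(0, 0)` instead would make the accumulator type (Int, Int), and the seqOp's Long additions would no longer type-check.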