Reputation: 1066
How do I use reduceByKey instead of groupBy for data stored as an RDD?
The purpose is to group by key and then sum the values.
I have a working Scala process to find the Odds Ratio.
Problem:
The data we are ingesting into the script has grown drastically, and the job started failing because of memory/disk issues. The main problem is the heavy shuffling caused by the groupBy.
Sample Data:
(543040000711860,543040000839322,0,0,0,0)
(543040000711860,543040000938728,0,0,1,1)
(543040000711860,543040000984046,0,0,1,1)
(543040000711860,543040001071137,0,0,1,1)
(543040000711860,543040001121115,0,0,1,1)
(543040000711860,543040001281239,0,0,0,0)
(543040000711860,543040001332995,0,0,1,1)
(543040000711860,543040001333073,0,0,1,1)
(543040000839322,543040000938728,0,1,0,0)
(543040000839322,543040000984046,0,1,0,0)
(543040000839322,543040001071137,0,1,0,0)
(543040000839322,543040001121115,0,1,0,0)
(543040000839322,543040001281239,1,0,0,0)
(543040000839322,543040001332995,0,1,0,0)
(543040000839322,543040001333073,0,1,0,0)
(543040000938728,543040000984046,0,0,1,1)
(543040000938728,543040001071137,0,0,1,1)
(543040000938728,543040001121115,0,0,1,1)
(543040000938728,543040001281239,0,0,0,0)
(543040000938728,543040001332995,0,0,1,1)
Here is the code to transform my data:
// Group all rows that share the same (id1, id2) key
val groupby = flags.groupBy(item => (item._1, item._2))
// For each group, sum the four counter columns
val counted_group = groupby.map(item => (item._1, item._2.map(_._3).sum, item._2.map(_._4).sum, item._2.map(_._5).sum, item._2.map(_._6).sum))
Result:
((3900001339662,3900002247644),6,12,38,38)
((543040001332995,543040001352893),112,29,57,57)
((3900001572602,543040001071137),1,0,1,1)
((3900001640810,543040001281239),2,1,0,0)
((3900001295323,3900002247644),8,21,8,8)
I need to convert this to reduceByKey so that the data is reduced within each partition before it is shuffled. I am working with a plain RDD of tuples, so there is no reduceByKey method available on it directly.
Upvotes: 0
Views: 91
Reputation: 41957
reduceByKey requires an RDD[(K, V)], i.e. key-value pairs, so you should create a pair RDD first:
// Key by (id1, id2); keep the four counters as the value
val rddPair = flags.map(item => ((item._1, item._2), (item._3, item._4, item._5, item._6)))
Then you can use reduceByKey on the rddPair above:
// Values with the same key are combined within each partition before the shuffle
rddPair.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, x._3 + y._3, x._4 + y._4))
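For completeness, here is a minimal, self-contained sketch of the whole pipeline. The object name OddsRatioReduce, the local[*] master, and the three sample rows are made up for illustration, and it assumes Spark 2.x+ for SparkSession; the tuple layout matches the question's data.

import org.apache.spark.sql.SparkSession

object OddsRatioReduce {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("reduceByKey-demo")
      .getOrCreate()
    val sc = spark.sparkContext

    // Rows in the same shape as the question's data: (id1, id2, f1, f2, f3, f4)
    val flags = sc.parallelize(Seq(
      (543040000711860L, 543040000839322L, 0, 0, 0, 0),
      (543040000711860L, 543040000839322L, 0, 0, 1, 1),
      (543040000839322L, 543040000938728L, 0, 1, 0, 0)
    ))

    // Key by (id1, id2); the four flag columns become the value
    val rddPair = flags.map(item => ((item._1, item._2), (item._3, item._4, item._5, item._6)))

    // reduceByKey combines values map-side within each partition before the
    // shuffle, which is what avoids the memory pressure of groupBy
    val summed = rddPair.reduceByKey((x, y) =>
      (x._1 + y._1, x._2 + y._2, x._3 + y._3, x._4 + y._4))

    summed.collect().foreach(println)
    // e.g. ((543040000711860,543040000839322),(0,0,1,1))

    spark.stop()
  }
}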
I hope the answer is helpful.
Upvotes: 0
Reputation: 1066
I think I solved the problem by using aggregateByKey.
I remapped the RDD to generate key-value pairs:
val rddPair = flags.map(item => ((item._1, item._2), (item._3, item._4, item._5, item._6)))
Then I applied aggregateByKey on the result. Now each partition returns the aggregated result rather than the grouped rows.
rddPair.aggregateByKey((0, 0, 0, 0))(
  // seqOp: merge a row's counters into the running per-partition total
  (iTotal, oisubtotal) => (iTotal._1 + oisubtotal._1, iTotal._2 + oisubtotal._2, iTotal._3 + oisubtotal._3, iTotal._4 + oisubtotal._4),
  // combOp: merge the partial totals from different partitions
  (fTotal, iTotal) => (fTotal._1 + iTotal._1, fTotal._2 + iTotal._2, fTotal._3 + iTotal._3, fTotal._4 + iTotal._4)
)
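Note that with a neutral zero value of (0, 0, 0, 0) and identical seqOp and combOp, this aggregateByKey is equivalent to the reduceByKey suggested above: both combine values map-side within each partition, which is what avoids the shuffle pressure of groupBy.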
Upvotes: 1