Vijay Mohan

Reputation: 1066

Using reduceByKey instead of groupBy

How do I use reduceByKey instead of groupBy for data stored as an RDD?

The purpose is to group by key and then sum the values.

I have a working Scala process to find the Odds Ratio.

Problem:

The data we are ingesting into the script has grown drastically, and the job has started failing with memory/disk issues. The main problem is the large amount of shuffling caused by the groupBy.

Sample Data:

(543040000711860,543040000839322,0,0,0,0)
(543040000711860,543040000938728,0,0,1,1)
(543040000711860,543040000984046,0,0,1,1)
(543040000711860,543040001071137,0,0,1,1)
(543040000711860,543040001121115,0,0,1,1)
(543040000711860,543040001281239,0,0,0,0)
(543040000711860,543040001332995,0,0,1,1)
(543040000711860,543040001333073,0,0,1,1)
(543040000839322,543040000938728,0,1,0,0)
(543040000839322,543040000984046,0,1,0,0)
(543040000839322,543040001071137,0,1,0,0)
(543040000839322,543040001121115,0,1,0,0)
(543040000839322,543040001281239,1,0,0,0)
(543040000839322,543040001332995,0,1,0,0)
(543040000839322,543040001333073,0,1,0,0)
(543040000938728,543040000984046,0,0,1,1)
(543040000938728,543040001071137,0,0,1,1)
(543040000938728,543040001121115,0,0,1,1)
(543040000938728,543040001281239,0,0,0,0)
(543040000938728,543040001332995,0,0,1,1)

Here is the code to transform my data:

val groupby = flags.groupBy(item => (item._1, item._2))
val counted_group = groupby.map(item =>
  (item._1, item._2.map(_._3).sum, item._2.map(_._4).sum, item._2.map(_._5).sum, item._2.map(_._6).sum))

Result:

((3900001339662,3900002247644),6,12,38,38)
((543040001332995,543040001352893),112,29,57,57)
((3900001572602,543040001071137),1,0,1,1)
((3900001640810,543040001281239),2,1,0,0)
((3900001295323,3900002247644),8,21,8,8)

I need to convert this to reduceByKey so that the data is combined within each partition before being sent across the network. I am using an RDD, so there is no direct SQL-style GROUP BY/aggregate method available.

Upvotes: 0

Views: 91

Answers (2)

Ramesh Maharjan

Reputation: 41957

reduceByKey requires an RDD[(K, V)], i.e. key-value pairs, so you should create a pair RDD first:

val rddPair = flags.map(item => ((item._1, item._2), (item._3, item._4, item._5, item._6)))

Then you can use reduceByKey on the rddPair above:

rddPair.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, x._3 + y._3, x._4 + y._4))
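The merge function passed to reduceByKey must be associative and commutative, since Spark applies it first within each partition and then across partitions. A minimal local sketch of that semantics on plain Scala collections (no Spark; the sample rows here are hypothetical duplicates of the kind the real data contains):

```scala
// Local simulation of reduceByKey: key the rows, group locally, and
// element-wise-sum the value tuples with the same merge function.
object ReduceByKeyDemo extends App {
  val flags = Seq(
    (543040000711860L, 543040000938728L, 0, 0, 1, 1),
    (543040000711860L, 543040000938728L, 0, 0, 1, 1),
    (543040000839322L, 543040001281239L, 1, 0, 0, 0)
  )

  val reduced = flags
    .map(item => ((item._1, item._2), (item._3, item._4, item._5, item._6)))
    .groupBy(_._1)
    .map { case (k, vs) =>
      k -> vs.map(_._2).reduce((x, y) =>
        (x._1 + y._1, x._2 + y._2, x._3 + y._3, x._4 + y._4))
    }

  println(reduced((543040000711860L, 543040000938728L))) // (0,0,2,2)
}
```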

I hope the answer is helpful.

Upvotes: 0

Vijay Mohan

Reputation: 1066

I think I solved the problem by using aggregateByKey.

I remapped the RDD to generate key-value pairs:

val rddPair = flags.map(item => ((item._1, item._2), (item._3, item._4, item._5, item._6)))

Then I applied aggregateByKey on the result. Now each partition returns an aggregated result rather than the full grouped data:

rddPair.aggregateByKey((0, 0, 0, 0))(
  (iTotal, subtotal) => (iTotal._1 + subtotal._1, iTotal._2 + subtotal._2, iTotal._3 + subtotal._3, iTotal._4 + subtotal._4),
  (fTotal, iTotal) => (fTotal._1 + iTotal._1, fTotal._2 + iTotal._2, fTotal._3 + iTotal._3, fTotal._4 + iTotal._4)
)
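Because the zero value (0, 0, 0, 0) is an identity for element-wise addition and the seqOp and combOp are the same sum, this aggregateByKey behaves the same as the reduceByKey in the other answer. A local sketch of the two-level fold it performs (hypothetical per-partition values, plain Scala, no Spark):

```scala
// Fold each "partition" with seqOp from the zero value, then merge the
// per-partition subtotals with combOp -- mirroring aggregateByKey.
object AggregateByKeyDemo extends App {
  type T4 = (Int, Int, Int, Int)
  val zero: T4 = (0, 0, 0, 0)
  val seqOp = (a: T4, b: T4) => (a._1 + b._1, a._2 + b._2, a._3 + b._3, a._4 + b._4)
  val combOp = seqOp // element-wise sum serves as both ops here

  val partition1 = Seq((0, 0, 1, 1), (0, 0, 1, 1))
  val partition2 = Seq((1, 0, 0, 0))
  val total = combOp(partition1.foldLeft(zero)(seqOp), partition2.foldLeft(zero)(seqOp))
  println(total) // (1,0,2,2)
}
```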

Upvotes: 1
