Hassan Jalil

Reputation: 1194

Spark combineByKey on values that contain tuples

I have a Spark RDD whose values are of the following type:

RDD[(Key:Int, (val1:Double, val2:Double))]

For Example

(1,(1,1))
(1,(1,2))
(1,(1,3))
(1,(1,4))
(1,(1,5))
(2,(1,1))
(2,(1,2))
(2,(1,3))
(2,(1,4))
(2,(1,5))

where the Int is the key and the tuple contains two Double values.

I want to apply a combineByKey operation that computes the following for each key:

val2/val1

I basically want to compute this ratio for each occurrence of the key and then average the ratios for that key. So createCombiner would divide the values and start a counter, and mergeValue would divide the values for the given key, add the result to the running sum, and increment the counter.

Finally, mergeCombiners would add up the partial sums and counts from the different combiners, and the total sum would then be divided by the total count (maybe that division can be done in a separate map?).
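To make the three roles concrete without a cluster, here is a minimal Spark-free sketch in plain Scala. combineByKeyLocal is a hypothetical helper that only mimics the combineByKey contract (fold each partition with createCombiner/mergeValue, then merge the per-partition combiners with mergeCombiners); it is not part of Spark. The split of the example data into two "partitions" is arbitrary, and the final division is done as a separate step.

```scala
object RatioAverage {
  // Hypothetical helper (not a Spark API): models combineByKey on plain collections.
  def combineByKeyLocal[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Within each partition: the first value seen for a key creates the combiner,
    // and every later value for that key is merged into it.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))
          case None    => acc.updated(k, createCombiner(v))
        }
      }
    }
    // Across partitions: combiners for the same key are merged pairwise.
    perPartition.foldLeft(Map.empty[K, C]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (k, c)) =>
        a.updated(k, a.get(k).map(prev => mergeCombiners(prev, c)).getOrElse(c))
      }
    }
  }

  // The question's example data, split into two arbitrary "partitions" so that
  // key 1 appears in both and mergeCombiners is actually exercised.
  val data: Seq[(Int, (Int, Int))] = Seq(
    (1, (1, 1)), (1, (1, 2)), (1, (1, 3)), (1, (1, 4)), (1, (1, 5)),
    (2, (1, 1)), (2, (1, 2)), (2, (1, 3)), (2, (1, 4)), (2, (1, 5)))
  val parts: Seq[Seq[(Int, (Int, Int))]] = Seq(data.take(4), data.drop(4))

  // Combiner C = (running sum of val2 / val1, number of values seen).
  val sumsAndCounts: Map[Int, (Double, Int)] =
    combineByKeyLocal[Int, (Int, Int), (Double, Int)](
      parts,
      v => (v._2.toDouble / v._1, 1),                          // createCombiner
      (acc, q) => (acc._1 + q._2.toDouble / q._1, acc._2 + 1), // mergeValue
      (a, b) => (a._1 + b._1, a._2 + b._2))                    // mergeCombiners

  // The final division happens in a separate step, after all combiners are merged.
  val averages: Map[Int, Double] =
    sumsAndCounts.map { case (k, (sum, n)) => k -> sum / n }

  def main(args: Array[String]): Unit = println(averages)
}
```

On a real RDD the same three lambdas would be passed to combineByKey, and the separate averaging step could be something like .mapValues { case (sum, n) => sum / n } on the result.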

My problem is that I can't find any example that runs combineByKey on a pair RDD whose values are tuples instead of single Int values.

I tried the following code:

arr2.combineByKey((v) => (v._2/v._1, 1), // createCombiner
(acc: (Double, Int), q:(Double,Double)) => ((q._2/q._1)+acc._1,acc._2+1), // mergeValue
(acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) // mergeCombiners

But it gives me the following error for the mergeValue part:

type mismatch;
 found   : ((Double, Int), (Double, Double)) => (Double, Int)
 required: (?, (Int, Int)) => ?

Can someone help me understand what I am doing wrong? How do I access both values of the tuple, divide them, and then add the result to the previous values?

Any help would be highly appreciated

Upvotes: 0

Views: 643

Answers (1)

Atreys

Reputation: 3761

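The type declared on q in the mergeValue function is the culprit: the values in your RDD are (Int, Int), not (Double, Double), as the "required: (?, (Int, Int)) => ?" part of the error shows. You also need to convert to Double before dividing; otherwise Int division truncates and you get wrong values.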

arr2.combineByKey((v) => (v._2.toDouble / v._1, 1),                                  // createCombiner
(acc: (Double, Int), q: (Int, Int)) => ((q._2.toDouble / q._1) + acc._1, acc._2 + 1), // mergeValue: convert here too
(acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))  // mergeCombiners
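Both points in this answer can be checked in plain Scala without Spark. This is a minimal illustration under the assumption that the question's data was built from integer literals like the example rows; the object and value names are made up:

```scala
object DivisionDemo {
  // Literals such as (1, (2, 3)) are inferred as (Int, (Int, Int)), not Doubles,
  // which is why the compiler expected (Int, Int) for q.
  val sample = (1, (2, 3))
  val (val1, val2) = sample._2

  // Int / Int is integer division and truncates: 3 / 2 is 1.
  val intRatio: Int = val2 / val1
  // Converting one operand to Double first keeps the fraction: 3.0 / 2 is 1.5.
  val doubleRatio: Double = val2.toDouble / val1

  def main(args: Array[String]): Unit = println((intRatio, doubleRatio))
}
```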

Upvotes: 1
