RushHour

Reputation: 613

aggregateByKey method not working on a Spark RDD

Below is my sample data:

1,Siddhesh,43,32000
1,Siddhesh,12,4300
2,Devil,10,1000
2,Devil,10,3000
2,Devil,11,2000

I created a pair RDD to perform combineByKey and aggregateByKey operations. Below is my code:

val rd=sc.textFile("file:///home/cloudera/Desktop/details.txt").map(line=>line.split(",")).map(p=>((p(0).toString,p(1).toString),(p(3).toLong,p(2).toString.toInt)))  

Above, I paired the first two columns as the key and the remaining columns as the value. Now I want only the distinct values of the 3rd column of the dataset (the second element of the value tuple), which I was able to get with combineByKey. Below is my code:

val reduced = rd.combineByKey(
      (x:(Long,Int))=>{(x._1,Set(x._2))},
      (x:(Long,Set[Int]),y:(Long,Int))=>(x._1+y._1,x._2+y._2),
      (x:(Long,Set[Int]),y:(Long,Set[Int]))=>{(x._1+y._1,x._2++y._2)}
      )  
scala> reduced.foreach(println)
((1,Siddhesh),(36300,Set(43, 12)))
((2,Devil),(6000,Set(10, 11)))

Now I map it to get, for each key, the sum of the values and the count of distinct 3rd-column values.

scala> val newRdd=reduced.map(p=>(p._1._1,p._1._2,p._2._1,p._2._2.size))

scala> newRdd.foreach(println)
(1,Siddhesh,36300,2)
(2,Devil,6000,2)

Here the last value for 'Devil' is 2 because 10 appears twice among the 'Devil' records in the dataset and the Set eliminates the duplicate. Next I tried the same thing with aggregateByKey. Below is my code, with the error:

val rd=sc.textFile("file:///home/cloudera/Desktop/details.txt").map(line=>line.split(",")).map(p=>((p(0).toString,p(1).toString),(p(3).toString.toInt,p(2).toString.toInt)))    

I converted the value column from Long to Int because the initial value '0' was throwing an error.

scala> val reducedByAggKey = rd.aggregateByKey((0,0))(
     |        (x:(Int,Set[Int]),y:(Int,Int))=>(x._1+y._1,x._2+y._2),
     |       (x:(Int,Set[Int]),y:(Int,Set[Int]))=>{(x._1+y._1,x._2++y._2)}
     | )
<console>:36: error: type mismatch;
 found   : scala.collection.immutable.Set[Int]
 required: Int
              (x:(Int,Set[Int]),y:(Int,Int))=>(x._1+y._1,x._2+y._2),
                                                             ^
<console>:37: error: type mismatch;
 found   : scala.collection.immutable.Set[Int]
 required: Int
             (x:(Int,Set[Int]),y:(Int,Set[Int]))=>{(x._1+y._1,x._2++y._2)}
                                                                  ^  

Following Leo's suggestion, I tried the code below, which gives this error:

scala> val reduced = rdd.aggregateByKey((0, Set.empty[Int]))(
     |   (x: (Int, Set[Int]), y: (Int, Int)) => (x._1 + y._1, y._2+x._2),
     |   (x: (Int, Set[Int]), y: (Int, Set[Int])) => (x._1 + y._1, y._2++ x._2)
     | )
<console>:36: error: overloaded method value + with alternatives:
  (x: Double)Double <and>
  (x: Float)Float <and>
  (x: Long)Long <and>
  (x: Int)Int <and>
  (x: Char)Int <and>
  (x: Short)Int <and>
  (x: Byte)Int <and>
  (x: String)String
 cannot be applied to (Set[Int])
         (x: (Int, Set[Int]), y: (Int, Int)) => (x._1 + y._1, y._2+x._2),
                                                                  ^

Where am I going wrong here? Please correct me.

Upvotes: 0

Views: 293

Answers (1)

Leo C

Reputation: 22439

If I understand your requirement correctly, to get the full count rather than the distinct count, use List instead of Set for the aggregations. As for the problem with your aggregateByKey, it is caused by the type of the zeroValue: (0, 0) is an (Int, Int), while your functions accumulate into an (Int, Set[Int]). The zeroValue should be (0, List.empty[Int]) (or (0, Set.empty[Int]) if you were to stick with Set):

val reduced = rdd.aggregateByKey((0, List.empty[Int]))(
  (x: (Int, List[Int]), y: (Int, Int)) => (x._1 + y._1, y._2 :: x._2),
  (x: (Int, List[Int]), y: (Int, List[Int])) => (x._1 + y._1, y._2 ::: x._2)
)

reduced.collect
// res1: Array[((String, String), (Int, List[Int]))] =
//   Array(((2,Devil),(6000,List(11, 10, 10))), ((1,Siddhesh),(36300,List(12, 43))))

val newRdd = reduced.map(p => (p._1._1, p._1._2, p._2._1, p._2._2.size))

newRdd.collect
// res2: Array[(String, String, Int, Int)] =
//   Array((2,Devil,6000,3), (1,Siddhesh,36300,2))

Note that the Set to List change would apply to your combineByKey code as well if you want the full count instead of distinct count.
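To see why the zeroValue type matters, here is a minimal sketch in plain Scala (no Spark needed). aggregateLocal is a hypothetical helper, not a Spark API; it only mimics the type shape of aggregateByKey, (zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U). The split of the sample rows into two partitions is an assumption made for illustration:

```scala
object AggregateTypesSketch {
  // Hypothetical local stand-in for aggregateByKey on plain Seqs.
  // U is inferred from zeroValue, and both functions must then agree with it.
  def aggregateLocal[K, V, U](partitions: Seq[Seq[(K, V)]], zeroValue: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] = {
    // Within each "partition", fold every key's values starting from zeroValue.
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) =>
        k -> kvs.map(_._2).foldLeft(zeroValue)(seqOp)
      }
    }
    // Across partitions, merge the partial results per key with combOp.
    perPartition.flatten.groupBy(_._1).map { case (k, kus) =>
      k -> kus.map(_._2).reduce(combOp)
    }
  }

  // Sample rows from the question, keyed by (id, name), value = (amount, 3rd column).
  // The two-way split is arbitrary.
  val partitions: Seq[Seq[((String, String), (Int, Int))]] = Seq(
    Seq((("1", "Siddhesh"), (32000, 43)), (("2", "Devil"), (1000, 10))),
    Seq((("1", "Siddhesh"), (4300, 12)), (("2", "Devil"), (3000, 10)),
        (("2", "Devil"), (2000, 11)))
  )

  // zeroValue is (Int, List[Int]), so U = (Int, List[Int]) and both functions type-check.
  // Passing (0, 0) instead would infer U = (Int, Int), so a seqOp declared on
  // (Int, List[Int]) would no longer match (U, V) => U.
  val fullCount: Map[(String, String), (Int, List[Int])] =
    aggregateLocal(partitions, (0, List.empty[Int]))(
      (x: (Int, List[Int]), y: (Int, Int)) => (x._1 + y._1, y._2 :: x._2),
      (x: (Int, List[Int]), y: (Int, List[Int])) => (x._1 + y._1, y._2 ::: x._2)
    )
}
```

With this data, AggregateTypesSketch.fullCount(("2", "Devil")) holds a sum of 6000 and a list of three values, matching the full count above.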

[UPDATE]

For distinct count per your comment, simply stay with Set with zeroValue set to (0, Set.empty[Int]):

val reduced = rdd.aggregateByKey((0, Set.empty[Int]))(
  (x: (Int, Set[Int]), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2),
  (x: (Int, Set[Int]), y: (Int, Set[Int])) => (x._1 + y._1, x._2 ++ y._2)
)

reduced.collect
// res3: Array[((String, String), (Int, scala.collection.immutable.Set[Int]))] =
//   Array(((2,Devil),(6000,Set(10, 11))), ((1,Siddhesh),(36300,Set(43, 12))))
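Two details from the question are worth a small plain-Scala sketch (no Spark involved; the object and value names below are made up for illustration). First, the "overloaded method value +" error comes from operand order: the Set has to be on the left of +. Second, if you would rather keep the amount column as Long, as in your combineByKey version, a Long zero such as 0L should presumably avoid the error you hit on '0':

```scala
object SetOperandOrderSketch {
  val seen: Set[Int] = Set(10, 11)
  val next: Int = 10

  // Set[Int] + Int uses Set's `+`, which returns a new Set with the element added.
  val ok: Set[Int] = seen + next

  // The reverse order, `next + seen`, does not compile: the compiler looks for a `+`
  // on Int that accepts a Set[Int] and reports the overloaded-method error.

  // Amounts kept as Long: the zero value is (0L, Set.empty[Int]),
  // so the accumulator type is (Long, Set[Int]).
  val devilRows: Seq[(Long, Int)] = Seq((1000L, 10), (3000L, 10), (2000L, 11))
  val zero: (Long, Set[Int]) = (0L, Set.empty[Int])

  // Same shape as the seqOp passed to aggregateByKey, applied with a local fold.
  val folded: (Long, Set[Int]) =
    devilRows.foldLeft(zero)((x, y) => (x._1 + y._1, x._2 + y._2))
}
```

The same seqOp, together with a matching combOp on (Long, Set[Int]), could then be passed to aggregateByKey with (0L, Set.empty[Int]) as the zeroValue.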

Upvotes: 1
