Below is my sample data:
1,Siddhesh,43,32000
1,Siddhesh,12,4300
2,Devil,10,1000
2,Devil,10,3000
2,Devil,11,2000
I created a pair RDD to perform combineByKey and aggregateByKey operations. Below is my code:
val rd=sc.textFile("file:///home/cloudera/Desktop/details.txt").map(line=>line.split(",")).map(p=>((p(0).toString,p(1).toString),(p(3).toLong,p(2).toString.toInt)))
Above, I used the first two columns as the key and the remaining two columns as the value. Now I want only the distinct values of the 3rd column (the second element of the value tuple), which I was able to get with combineByKey. Below is my code:
val reduced = rd.combineByKey(
(x:(Long,Int))=>{(x._1,Set(x._2))},
(x:(Long,Set[Int]),y:(Long,Int))=>(x._1+y._1,x._2+y._2),
(x:(Long,Set[Int]),y:(Long,Set[Int]))=>{(x._1+y._1,x._2++y._2)}
)
scala> reduced.foreach(println)
((1,Siddhesh),(36300,Set(43, 12)))
((2,Devil),(6000,Set(10, 11)))
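To see what the three functions passed to combineByKey do, here is a sketch that mimics its semantics on plain Scala collections rather than Spark. `lines` stands in for the file contents, each inner Seq plays the role of one partition, and `localCombineByKey` is a hypothetical helper, not a Spark API. It can be pasted into the Scala REPL.

```scala
// Sample rows from the question, standing in for sc.textFile(...).
val lines = Seq(
  "1,Siddhesh,43,32000",
  "1,Siddhesh,12,4300",
  "2,Devil,10,1000",
  "2,Devil,10,3000",
  "2,Devil,11,2000"
)

// Same shape as the pair RDD: key = (id, name), value = (amount, 3rd column).
val pairs: Seq[((String, String), (Long, Int))] =
  lines.map(_.split(",")).map(p => ((p(0), p(1)), (p(3).toLong, p(2).toInt)))

// Two hypothetical partitions; the Devil rows end up in both.
val partitions = Seq(pairs.take(3), pairs.drop(3))

// Hypothetical helper mimicking combineByKey on local collections.
def localCombineByKey[K, V, C](parts: Seq[Seq[(K, V)]])(
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): Map[K, C] = {
  // Within a partition: the first value of a key creates a combiner,
  // later values of that key are merged into it.
  val perPartition: Seq[Map[K, C]] = parts.map { part =>
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case Some(c) => acc.updated(k, mergeValue(c, v))
        case None    => acc.updated(k, createCombiner(v))
      }
    }
  }
  // Across partitions: combiners for the same key are merged.
  perPartition.flatten.groupBy(_._1).map { case (k, kcs) =>
    k -> kcs.map(_._2).reduce(mergeCombiners)
  }
}

val reduced = localCombineByKey(partitions)(
  (x: (Long, Int)) => (x._1, Set(x._2)),
  (x: (Long, Set[Int]), y: (Long, Int)) => (x._1 + y._1, x._2 + y._2),
  (x: (Long, Set[Int]), y: (Long, Set[Int])) => (x._1 + y._1, x._2 ++ y._2)
)
reduced.foreach(println)
```

Splitting the Devil rows across both partitions is what exercises the third function: mergeCombiners only runs when the same key has combiners in more than one partition.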
Now I map it so that, for each key, I get the sum of the amounts and the number of distinct 3rd-column values:
scala> val newRdd=reduced.map(p=>(p._1._1,p._1._2,p._2._1,p._2._2.size))
scala> newRdd.foreach(println)
(1,Siddhesh,36300,2)
(2,Devil,6000,2)
Here, for Devil, the last value is 2: the Devil records contain 10 twice in the 3rd column, and since I used a Set, the duplicate is eliminated. So now I tried the same with aggregateByKey. Below is my code with the error:
val rd=sc.textFile("file:///home/cloudera/Desktop/details.txt").map(line=>line.split(",")).map(p=>((p(0).toString,p(1).toString),(p(3).toString.toInt,p(2).toString.toInt)))
I converted the value column from Long to Int because it was throwing an error on the '0' when initializing.
scala> val reducedByAggKey = rd.aggregateByKey((0,0))(
| (x:(Int,Set[Int]),y:(Int,Int))=>(x._1+y._1,x._2+y._2),
| (x:(Int,Set[Int]),y:(Int,Set[Int]))=>{(x._1+y._1,x._2++y._2)}
| )
<console>:36: error: type mismatch;
found : scala.collection.immutable.Set[Int]
required: Int
(x:(Int,Set[Int]),y:(Int,Int))=>(x._1+y._1,x._2+y._2),
^
<console>:37: error: type mismatch;
found : scala.collection.immutable.Set[Int]
required: Int
(x:(Int,Set[Int]),y:(Int,Set[Int]))=>{(x._1+y._1,x._2++y._2)}
^
And here is my code as suggested by Leo, again with an error:
scala> val reduced = rdd.aggregateByKey((0, Set.empty[Int]))(
| (x: (Int, Set[Int]), y: (Int, Int)) => (x._1 + y._1, y._2+x._2),
| (x: (Int, Set[Int]), y: (Int, Set[Int])) => (x._1 + y._1, y._2++ x._2)
| )
<console>:36: error: overloaded method value + with alternatives:
(x: Double)Double <and>
(x: Float)Float <and>
(x: Long)Long <and>
(x: Int)Int <and>
(x: Char)Int <and>
(x: Short)Int <and>
(x: Byte)Int <and>
(x: String)String
cannot be applied to (Set[Int])
(x: (Int, Set[Int]), y: (Int, Int)) => (x._1 + y._1, y._2+x._2),
^
So where am I going wrong here? Please correct me.
If I understand your requirement correctly, to get the full count rather than the distinct count, use List instead of Set for the aggregations. As for the problem with your aggregateByKey, it's due to the incorrect type of the zeroValue, which should be (0, List.empty[Int]) (it would have been (0, Set.empty[Int]) if you were to stick to using Set):
val reduced = rdd.aggregateByKey((0, List.empty[Int]))(
(x: (Int, List[Int]), y: (Int, Int)) => (x._1 + y._1, y._2 :: x._2),
(x: (Int, List[Int]), y: (Int, List[Int])) => (x._1 + y._1, y._2 ::: x._2)
)
reduced.collect
// res1: Array[((String, String), (Int, List[Int]))] =
// Array(((2,Devil),(6000,List(11, 10, 10))), ((1,Siddhesh),(36300,List(12, 43))))
val newRdd = reduced.map(p => (p._1._1, p._1._2, p._2._1, p._2._2.size))
newRdd.collect
// res2: Array[(String, String, Int, Int)] =
// Array((2,Devil,6000,3), (1,Siddhesh,36300,2))
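The shape of this fix can be checked without a cluster. The sketch below mimics aggregateByKey's semantics on plain Scala collections: each inner Seq stands in for a partition, and `localAggregateByKey` is a hypothetical helper, not Spark's API. Every key starts from zeroValue in each partition, seqOp folds its values in, and combOp merges the per-partition results.

```scala
// Hypothetical helper mimicking aggregateByKey on local collections.
// As in Spark, the accumulator type U is fixed by zeroValue, which is why
// (0, 0) clashes with operators written for (Int, List[Int]) or (Int, Set[Int]).
def localAggregateByKey[K, V, U](parts: Seq[Seq[(K, V)]])(zeroValue: U)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U): Map[K, U] = {
  // Within a partition: each key starts from zeroValue and folds in its values.
  val perPartition: Seq[Map[K, U]] = parts.map { part =>
    part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
      acc.updated(k, seqOp(acc.getOrElse(k, zeroValue), v))
    }
  }
  // Across partitions: partial results for the same key are merged with combOp.
  perPartition.flatten.groupBy(_._1).map { case (k, kus) =>
    k -> kus.map(_._2).reduce(combOp)
  }
}

// The sample rows as (key, (amount, 3rd column)), split into two
// hypothetical partitions.
val partitions: Seq[Seq[((String, String), (Int, Int))]] = Seq(
  Seq((("1", "Siddhesh"), (32000, 43)), (("1", "Siddhesh"), (4300, 12)), (("2", "Devil"), (1000, 10))),
  Seq((("2", "Devil"), (3000, 10)), (("2", "Devil"), (2000, 11)))
)

val reduced = localAggregateByKey(partitions)((0, List.empty[Int]))(
  (x: (Int, List[Int]), y: (Int, Int)) => (x._1 + y._1, y._2 :: x._2),
  (x: (Int, List[Int]), y: (Int, List[Int])) => (x._1 + y._1, y._2 ::: x._2)
)

// (id, name, total amount, full count of 3rd-column values)
val counts = reduced.toSeq.map { case ((id, name), (total, values)) => (id, name, total, values.size) }
counts.foreach(println)
```

The element order inside each List depends on how rows are spread over partitions, so only the sums and sizes should be relied on.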
Note that the Set to List change would apply to your combineByKey code as well if you want the full count instead of distinct count.
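For the combineByKey version, here is a sketch of the three functions with List in place of Set, applied by hand to the Devil values the way Spark would if those rows landed in two partitions. The partition split is an assumption made for illustration.

```scala
// combineByKey functions with List instead of Set, so duplicates are kept.
val createCombiner = (x: (Long, Int)) => (x._1, List(x._2))
val mergeValue = (x: (Long, List[Int]), y: (Long, Int)) => (x._1 + y._1, y._2 :: x._2)
val mergeCombiners = (x: (Long, List[Int]), y: (Long, List[Int])) => (x._1 + y._1, y._2 ::: x._2)

// Devil's rows, assumed to be split across two partitions.
val partition1 = mergeValue(createCombiner((1000L, 10)), (3000L, 10)) // (4000, List(10, 10))
val partition2 = createCombiner((2000L, 11))                           // (2000, List(11))
val devil = mergeCombiners(partition1, partition2)                     // (6000, List(11, 10, 10))
println((devil._1, devil._2.size))
```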
[UPDATE]
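For the distinct count per your comment, simply stay with Set, with zeroValue set to (0, Set.empty[Int]). Also add each element to the set as x._2 + y._2: the reversed y._2 + x._2 calls + on an Int, which has no overload accepting a Set[Int], and that is the error in your second attempt: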
val reduced = rdd.aggregateByKey((0, Set.empty[Int]))(
(x: (Int, Set[Int]), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2),
(x: (Int, Set[Int]), y: (Int, Set[Int])) => (x._1 + y._1, x._2 ++ y._2)
)
reduced.collect
// res3: Array[((String, String), (Int, scala.collection.immutable.Set[Int]))] =
// Array(((2,Devil),(6000,Set(10, 11))), ((1,Siddhesh),(36300,Set(43, 12))))
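A quick way to check the Set version without a cluster: the sketch below applies the same zero value and operators by hand to the Devil rows, assuming (for illustration only) that they are spread over two partitions. foldLeft takes its accumulator type from the zero value just as aggregateByKey does, so it mirrors the per-partition step.

```scala
// The zero value and operators from the Set-based aggregateByKey above.
val zero = (0, Set.empty[Int])
val seqOp = (x: (Int, Set[Int]), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)        // Set + element
val combOp = (x: (Int, Set[Int]), y: (Int, Set[Int])) => (x._1 + y._1, x._2 ++ y._2) // Set ++ Set

// Devil's (amount, 3rd column) values, assumed to be split over two partitions.
val partition1 = Seq((1000, 10), (3000, 10))
val partition2 = Seq((2000, 11))

// Per partition, fold from the zero value with seqOp; then merge with combOp.
val devil = combOp(partition1.foldLeft(zero)(seqOp), partition2.foldLeft(zero)(seqOp))
val distinctCount = devil._2.size // 10 appears twice but is counted once
println((devil, distinctCount))
```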