Reputation: 922
I am trying to count the appearances of specific items on a specific date.
My input's structure is Date\tItem1:AppearancesOfItem1,...,ItemN:AppearancesOfItemN
Example
20/10/2000\tItem1:1,Item2:5
20/10/2000\tItem1:5
21/10/2000\tItem1:5
In order to do that, I create the following PairRdd structure:
[(20/10/2000, (Item1, 1))
(20/10/2000, (Item2, 5))
(20/10/2000, (Item1, 5))
(21/10/2000, (Item1, 5))]
and then groupByKey on the date, which leads to:
[(20/10/2000, Iterable[(Item1, 1), (Item2, 5), (Item1, 5)])
(21/10/2000, Iterable[(Item1, 5)])]
What I want to do after this step is reduce the values of these pairs, summing the appearances of the items that share the same key, so that the outcome becomes:
[(20/10/2000, Iterable[(Item1, 6), (Item2, 5)])
(21/10/2000, Iterable[(Item1, 5)])]
However, I haven't figured out a way to reduce over the values of these PairRDDs. Is my approach wrong in the first place?
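For reference, a minimal sketch of parsing input lines in that format into the PairRDD structure above (shown here on a plain List for brevity; with Spark, the same flatMap works on an RDD[String]):

```scala
// Sample lines in the Date\tItem:Count,... format from the question
val lines = List(
  "20/10/2000\tItem1:1,Item2:5",
  "20/10/2000\tItem1:5",
  "21/10/2000\tItem1:5"
)

// Split each line on the tab, then split the item list on commas and colons,
// emitting one (date, (item, count)) pair per item
val pairs = lines.flatMap { line =>
  val parts = line.split("\t")
  val date = parts(0)
  parts(1).split(",").map { entry =>
    val kv = entry.split(":")
    (date, (kv(0), kv(1).toInt))
  }
}
// pairs: List((20/10/2000,(Item1,1)), (20/10/2000,(Item2,5)),
//             (20/10/2000,(Item1,5)), (21/10/2000,(Item1,5)))
```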
Upvotes: 2
Views: 1310
Reputation: 7207
This can be achieved in two steps:
reduceByKey on the composite (date, item) key to sum the counts, then groupByKey on the date alone
val data = List(
("20/10/2000", "Item1", 1),
("20/10/2000", "Item2", 5),
("20/10/2000", "Item1", 5),
("21/10/2000", "Item1", 5)
)
val originalRDD = sparkContext.parallelize(data)
// Key by (date, item) and sum the counts for each such pair
val sumRDD = originalRDD.map(v => ((v._1, v._2), v._3)).reduceByKey(_ + _)
// Re-key by date alone and collect the (item, total) pairs per date
sumRDD.map(v => (v._1._1, (v._1._2, v._2))).groupByKey().foreach(println)
Output:
(21/10/2000,CompactBuffer((Item1,5)))
(20/10/2000,CompactBuffer((Item1,6), (Item2,5)))
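As a possible refinement (a sketch, not part of the answer above): aggregateByKey could build a per-date Map directly and skip the second shuffle's groupByKey. The merge logic is just two functions, shown here on plain Scala Maps so it can be checked without a Spark cluster:

```scala
// Fold one (item, count) record into the running per-date Map
def mergeValue(acc: Map[String, Int], rec: (String, Int)): Map[String, Int] =
  acc + (rec._1 -> (acc.getOrElse(rec._1, 0) + rec._2))

// Merge two partial per-date Maps from different partitions
def mergeCombiners(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] =
  m2.foldLeft(m1) { case (acc, (item, n)) =>
    acc + (item -> (acc.getOrElse(item, 0) + n))
  }

// With Spark, these would plug in as (hypothetical, mirroring originalRDD above):
// originalRDD.map(v => (v._1, (v._2, v._3)))
//   .aggregateByKey(Map.empty[String, Int])(mergeValue, mergeCombiners)
```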
Upvotes: 2
Reputation: 7928
Hope this helps; it may not be the most elegant way, but it seems to match your requirements:
rdd.groupByKey.mapValues(x => x.groupBy(_._1).mapValues(x => x.map(_._2).sum))
First group the values by item, then map over each grouped collection, keeping only the second element of each tuple (the count) so it can be summed directly.
Output:
scala> rdd.groupByKey.mapValues(x => x.groupBy(_._1).mapValues(x => x.map(_._2).sum)).foreach(println)
(21/10/2000,Map(Item1 -> 5))
(20/10/2000,Map(Item2 -> 5, Item1 -> 6))
Edit
The above creates a Map inside your RDD; if you want a List instead, just append toList:
rdd.groupByKey.mapValues(x => x.groupBy(_._1).mapValues(x => x.map(_._2).sum).toList)
Upvotes: 1