pirox22

Reputation: 922

ReduceByKey on Iterable value of tuples

I am trying to count the appearances of specific items on specific dates.

My input's structure is Date\tItem1:AppearancesOfItem1,...,ItemN:AppearancesOfItemN

Example

20/10/2000\tItem1:1,Item2:5
20/10/2000\tItem1:5
21/10/2000\tItem1:5

In order to do that, I create the following PairRDD structure:

[(20/10/2000, (Item1, 1))
(20/10/2000, (Item2, 5))
(20/10/2000, (Item1, 5))
(21/10/2000, (Item1, 5))]
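For reference, the parsing step that produces those pairs can be sketched in plain Scala (a minimal sketch; parseLine is a hypothetical helper name, not part of the question):

```scala
// Hypothetical helper: parse one line of the form
// "Date\tItem1:count1,Item2:count2" into (date, (item, count)) pairs.
def parseLine(line: String): List[(String, (String, Int))] = {
  val Array(date, items) = line.split("\t", 2)
  items.split(",").toList.map { entry =>
    val Array(item, count) = entry.split(":", 2)
    (date, (item, count.toInt))
  }
}

val lines = List(
  "20/10/2000\tItem1:1,Item2:5",
  "20/10/2000\tItem1:5",
  "21/10/2000\tItem1:5"
)

// flatMap flattens the per-line lists into one list of pairs;
// on a real RDD of lines the same flatMap(parseLine) call applies.
val pairs = lines.flatMap(parseLine)
```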

and then groupByKey on the date which leads to:

[(20/10/2000, Iterable[(Item1, 1), (Item2, 5), (Item1, 5)]),
 (21/10/2000, Iterable[(Item1, 5)])]

What I want to do after this step is reduce the value of these pairs and sum the appearances of the items that share the same key so that the outcome will become like:

[(20/10/2000, Iterable[(Item1, 6), (Item2, 5)]),
 (21/10/2000, Iterable[(Item1, 5)])]

However, I haven't figured out a way to reduce over the values of these PairRDDs. Is my approach wrong in the first place?

Upvotes: 2

Views: 1310

Answers (2)

pasha701

Reputation: 7207

This can be achieved in two steps:

  1. Sum by the first two columns (date, item)
  2. Group by the first column (date) — groupByKey here, or reduceByKey for better performance

    val data = List(
      ("20/10/2000", "Item1", 1),
      ("20/10/2000", "Item2", 5),
      ("20/10/2000", "Item1", 5),
      ("21/10/2000", "Item1", 5)
    )
    val originalRDD = sparkContext.parallelize(data)

    // Step 1: key by (date, item) and sum the counts
    val sumRDD = originalRDD.map(v => ((v._1, v._2), v._3)).reduceByKey(_ + _)
    // Step 2: re-key by date and group the (item, total) pairs
    sumRDD.map(v => (v._1._1, (v._1._2, v._2))).groupByKey().foreach(println)
    

Output:

(21/10/2000,CompactBuffer((Item1,5)))
(20/10/2000,CompactBuffer((Item1,6), (Item2,5)))
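The same two-step aggregation can be illustrated on plain Scala collections, without a Spark cluster, just to show what the reduceByKey and groupByKey stages compute here (a sketch; Spark's partitioned execution is assumed away):

```scala
val data = List(
  ("20/10/2000", "Item1", 1),
  ("20/10/2000", "Item2", 5),
  ("20/10/2000", "Item1", 5),
  ("21/10/2000", "Item1", 5)
)

// Step 1: sum by (date, item), mirroring reduceByKey(_ + _)
val summed = data
  .groupBy(v => (v._1, v._2))
  .map { case ((date, item), rows) => (date, (item, rows.map(_._3).sum)) }

// Step 2: group by date, mirroring groupByKey
val result = summed.toList
  .groupBy(_._1)
  .map { case (date, rows) => (date, rows.map(_._2)) }
```

Unlike Spark's groupByKey, the ordering of the grouped (item, total) pairs is not guaranteed here either, so compare them as sets.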

Upvotes: 2

SCouto

Reputation: 7928

Hope this helps; it may not be the most elegant way, but it seems to match your requirements:

rdd.groupByKey.mapValues(x => x.groupBy(_._1).mapValues(x => x.map(_._2).sum))

First map the values to group them by item, then, over that grouped collection, map the values again to keep only the second element (the count) so you can sum it up directly.

Output:

scala> rdd.groupByKey.mapValues(x => x.groupBy(_._1).mapValues(x => x.map(_._2).sum)).foreach(println)
(21/10/2000,Map(Item1 -> 5))
(20/10/2000,Map(Item2 -> 5, Item1 -> 6))

Edit: this creates a Map inside your RDD; if you want a List (or something else) instead, just append a toList:

rdd.groupByKey.mapValues(x => x.groupBy(_._1).mapValues(x => x.map(_._2).sum).toList)

Upvotes: 1
