Reputation: 89
I'm using PySpark and have pairs like this:
(GroupKey, [(userKey, count), ..., (userKey, count)])
where the value is a list of tuples, as in the following example:
(Group1, [ (userA, 1), (userA, 1), (userB, 1), (userA, 1) ] )
(Group1, [ (userC, 1), (userA, 1), (userC, 1), (userA, 1) ] )
...
(Group2, [ (userB, 1), (userA, 1) ])
(Group2, [ (userA, 1), (userC, 1), (userC, 1), (userC, 1) ] )
I have to use RDDs. I need to group the pairs by key (GroupX) and also reduce the list values by key (userY), adding their counts, so that I end up with this:
Group1: (userA, 5), (userB, 1), (userC, 2)
Group2: (userA, 2), (userB, 1), (userC, 3)
I tried groupByKey followed by reduceByKey, and also aggregateByKey, but couldn't figure out the proper way.
How could I achieve this?
Upvotes: 2
Views: 653
Reputation: 215057
Create a helper function sumByUser as follows, and then aggregate by Group:
rdd = sc.parallelize(
    [("Group1", [("userA", 1), ("userA", 1), ("userB", 1), ("userA", 1)]),
     ("Group1", [("userC", 1), ("userA", 1), ("userC", 1), ("userA", 1)]),
     ("Group2", [("userB", 1), ("userA", 1)]),
     ("Group2", [("userA", 1), ("userC", 1), ("userC", 1), ("userC", 1)])]
)

from collections import Counter

def sumByUser(it):
    # Merge all (user, count) lists collected for one group into a single Counter
    count = Counter()
    for lst in it:
        for user, cnt in lst:
            count[user] += cnt
    return list(count.items())

rdd.groupByKey().mapValues(sumByUser).collect()
# [('Group1', [('userA', 5), ('userB', 1), ('userC', 2)]), ('Group2', [('userB', 1), ('userA', 2), ('userC', 3)])]
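If you want to avoid shuffling the whole lists with groupByKey, a reduceByKey variant should also work (a sketch, assuming the same rdd as above, with toCounter as a hypothetical helper name): turn each list into a Counter first, then let reduceByKey merge them, since Counters add element-wise.
from collections import Counter
from operator import add

def toCounter(pairs):
    # Sum counts per user within a single (user, count) list
    c = Counter()
    for user, cnt in pairs:
        c[user] += cnt
    return c

rdd.mapValues(toCounter).reduceByKey(add).mapValues(lambda c: list(c.items())).collect()
# e.g. [('Group1', [('userA', 5), ('userB', 1), ('userC', 2)]), ('Group2', [('userB', 1), ('userA', 2), ('userC', 3)])]
The counts are the same as above; only the ordering of users inside each list may differ.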
Upvotes: 2