Max Willian

Reputation: 89

PySpark: Aggregate an RDD by key, then sum the list of tuple values also by key

I'm using PySpark and have pairs like this:

(GroupKey , [(userKey, count),...,(userKey, count)])

where the value is a list of tuples, as in the following example:

(Group1, [ (userA, 1), (userA, 1), (userB, 1), (userA, 1) ] )
(Group1, [ (userC, 1), (userA, 1), (userC, 1), (userA, 1) ] )
...
(Group2, [ (userB, 1), (userA, 1) ])
(Group2, [ (userA, 1), (userC, 1), (userC, 1), (userC, 1) ] )

I have to use RDDs. I need to group the pairs by key (GroupX) and also reduce the list values by key (userY), summing the counts. The result should look like this:

Group1: (userA, 5), (userB, 1), (userC, 2)
Group2: (userA, 2), (userB, 1), (userC, 3)

I tried groupByKey followed by reduceByKey, and also aggregateByKey, but couldn't figure out the proper way. How can I achieve this?

Upvotes: 2

Views: 653

Answers (1)

akuiper

Reputation: 215057

Create a helper function sumByUser as follows, then group by key and apply it to each group's values:

rdd = sc.parallelize(
    [("Group1", [("userA", 1), ("userA", 1), ("userB", 1), ("userA", 1)]),
     ("Group1", [("userC", 1), ("userA", 1), ("userC", 1), ("userA", 1)]),
     ("Group2", [("userB", 1), ("userA", 1)]),
     ("Group2", [("userA", 1), ("userC", 1), ("userC", 1), ("userC", 1)])]
)

from collections import Counter

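def sumByUser(it):
    # `it` iterates over every list of (user, count) tuples collected for one group
    count = Counter()
    for lst in it:
        for user, cnt in lst:
            count[user] += cnt
    return list(count.items())

# Gather all lists per group, then collapse them into per-user totals
rdd.groupByKey().mapValues(sumByUser).collect()
# [('Group1', [('userA', 5), ('userB', 1), ('userC', 2)]), ('Group2', [('userB', 1), ('userA', 2), ('userC', 3)])]
# (the order of groups and of users within a group may vary)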
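
Since the question mentions aggregateByKey, here is a minimal sketch of that route. It is an alternative to the groupByKey version, not a replacement. The names seq_op, comb_op and sum_by_group are hypothetical, and the rdd passed in is assumed to have the same (group, [(user, count), ...]) shape as above. The two combiner functions are plain Python, so they can be tried without a Spark session.

```python
from collections import Counter

def seq_op(acc, pairs):
    """Fold one list of (user, count) pairs into the running Counter for a group."""
    for user, cnt in pairs:
        acc[user] += cnt
    return acc

def comb_op(left, right):
    """Merge two partial Counters built for the same group on different partitions."""
    left.update(right)  # Counter.update adds counts rather than overwriting them
    return left

def sum_by_group(rdd):
    """Hypothetical helper: assumes rdd holds (group, [(user, count), ...]) pairs."""
    return (rdd.aggregateByKey(Counter(), seq_op, comb_op)
               .mapValues(lambda c: sorted(c.items())))
```

Calling sum_by_group(rdd).collect() on the sample rdd should give the same per-user totals, with each group's users sorted by name. The potential benefit is that each partition is folded into a single Counter per group before the shuffle, instead of shipping every raw list across the network as groupByKey does.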

Upvotes: 2
