Karhunen

Reputation: 185

Apache Spark combineByKey with a list of elements in Python

I have an issue with Apache Spark in Python. I have this dataset:

data = sc.parallelize([('a','u'), ('a', 'v'),
    ('b', 'w'), ('b', 'x'), ('b', 'x')] )

What I want to do is count the number of elements per key and create a list of the elements. If I do

a = data.combineByKey(lambda value: (value, 1),
                      lambda x, value: (x[0]+'/'+value, x[1] + 1),
                      lambda x, y: (x[0]+'/'+y[0], x[1] + y[1]))

I get this result:

[('a', ('u/v', 2)), ('b', ('w/x/x', 3))]

What I would like to have is

[('a', (['u','v'], 2)), ('b', (['w','x','x'], 3))]

How can I do this?

Upvotes: 2

Views: 1155

Answers (1)

zero323

Reputation: 330063

If you want to keep all values as a list, there is no reason to use combineByKey at all. It is more efficient to simply groupByKey:

aggregated = data.groupByKey().mapValues(lambda vs: (list(vs), len(vs)))
aggregated.collect()
## [('a', (['u', 'v'], 2)), ('b', (['w', 'x', 'x'], 3))]
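
(For reference, here is a minimal combineByKey sketch that builds the requested lists directly; the name aggregated_cbk is just illustrative, and the order of values within each list may depend on partitioning. Each combiner starts as a one-element list and new values are appended to it.)

aggregated_cbk = data.combineByKey(
    lambda value: ([value], 1),                         # createCombiner
    lambda acc, value: (acc[0] + [value], acc[1] + 1),  # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1]))            # mergeCombiners

aggregated_cbk.collect()
## [('a', (['u', 'v'], 2)), ('b', (['w', 'x', 'x'], 3))]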

A slightly more efficient approach is to keep per-value counts instead of all the values, so the shuffle moves (value, count) pairs rather than every occurrence:

from operator import add

aggregated_counts = (data
    .map(lambda kv: (kv, 1))                            # ((key, value), 1)
    .reduceByKey(add)                                   # ((key, value), count)
    .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))      # (key, (value, count))
    .groupByKey()
    .mapValues(lambda xs: (list(xs), sum(x[1] for x in xs))))

aggregated_counts.collect()
## [('a', ([('v', 1), ('u', 1)], 2)), ('b', ([('w', 1), ('x', 2)], 3))]

or

from collections import Counter

def merge_value(acc, x):
    # Add a single value to the per-key Counter. Wrapping x in a list
    # avoids counting the individual characters of a multi-character string.
    acc.update([x])
    return acc

def merge_combiners(acc1, acc2):
    # Merge two per-key Counters by adding their counts.
    acc1.update(acc2)
    return acc1

aggregated_counts_ = (data
    .combineByKey(lambda value: Counter([value]), merge_value, merge_combiners)
    .mapValues(lambda cnt: (cnt, sum(cnt.values()))))

aggregated_counts_.collect()
## [('a', (Counter({'u': 1, 'v': 1}), 2)), ('b', (Counter({'w': 1, 'x': 2}), 3))]
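
(If you need the exact list form from the question, the Counter can be expanded back with Counter.elements(); a sketch, where as_lists is an illustrative name and element order within each list is not guaranteed.)

as_lists = aggregated_counts_.mapValues(
    lambda kv: (list(kv[0].elements()), kv[1]))

as_lists.collect()
## [('a', (['u', 'v'], 2)), ('b', (['w', 'x', 'x'], 3))]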

Upvotes: 3
