Reputation: 185
I have an issue on Apache Spark in Python. I have this set
data = sc.parallelize([('a','u'), ('a', 'v'),
('b', 'w'), ('b', 'x'), ('b', 'x')] )
What I want to do is counting the number of elements by key and creating a list with the elements. If I do
a = data.combineByKey(lambda value: (value, 1),
lambda x, value: (value, x[1] + 1),
lambda x, y: (x[0]+'/'+y[0], x[1] + y[1]))
I have this result :
[('a', ('u/v', 2)), ('b', ('w/x/x', 3))]
What I would like to have is
[('a', (['u','v'], 2)), ('b', (['w','x','x'], 3))]
How can I do this ?
Upvotes: 2
Views: 1155
Reputation: 330063
If you want to keep all values as a list there is no reason to use combineByKey
at all. It is more efficient to simply groupBy
:
aggregated = data.groupByKey().mapValues(lambda vs: (list(vs), len(vs)))
aggregated.collect()
## [('a', (['u', 'v'], 2)), ('b', (['w', 'x', 'x'], 3))]
A little bit more efficient approach is to keep counts instead of all values:
aggregated_counts = (data
.map(lambda kv: (kv, 1))
.reduceByKey(add)
.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
.groupByKey()
.mapValues(lambda xs: (list(xs), sum(x[1] for x in xs))))
aggregated_counts.collect()
## [('a', ([('v', 1), ('u', 1)], 2)), ('b', ([('w', 1), ('x', 2)], 3))]
or
from collections import Counter
def merge_value(acc, x):
acc.update(x)
return acc
def merge_combiners(acc1, acc2):
acc1.update(acc2)
return acc1
aggregated_counts_ = (data
.combineByKey(Counter, merge_value, merge_combiners)
.mapValues(lambda cnt: (cnt, sum(cnt.values()))))
aggregated_counts_.collect()
## [('a', (Counter({'u': 1, 'v': 1}), 2)), ('b', (Counter({'w': 1, 'x': 2}), 3))]
Upvotes: 3