Reputation: 347
I have a dataset like this.
rdd = sc.parallelize((('A',('a',1)),('B',('b',3)),('A',('c',3))))
What I want to do is:
Count how many entries belong to each group (A/B).
Within each group (A/B), count how many entries belong to each subgroup ('a', 'b', 'c').
For the example above, the answer I want is: A has 2 entries (a: 1, c: 1) and B has 1 entry (b: 1).
I can get the first-level result by
rdd.countByKey()
which returns
defaultdict(<type 'int'>, {'A': 2, 'B': 1})
But how could I get the second-level results?
If I group the data by
rdd.groupByKey()
how can I then apply a function, such as another groupByKey or a map, to the data in each group? I notice the values in the result are
pyspark.resultiterable.ResultIterable
objects, which do not support groupBy or map directly.
Upvotes: 0
Views: 1903
Reputation: 2108
This is a step-by-step solution.
from collections import Counter
rdd = sc.parallelize((('A',('a',1)),('B',('b',3)),('A',('c',3))))
# [('A', ('a', 1)), ('B', ('b', 3)), ('A', ('c', 3))]
a = rdd.groupByKey().mapValues(list)
#[('A', [('a', 1), ('c', 3)]), ('B', [('b', 3)])]
b = a.map(lambda line: line[1])
# [[('a', 1), ('c', 3)], [('b', 3)]]
c = b.map(lambda line: [x[0] for x in line])
# [['a', 'c'], ['b']]
d = c.map(lambda line: Counter(line))
# [Counter({'a': 1, 'c': 1}), Counter({'b': 1})]
You can use mapValues(list) if you want a list of the values after the groupByKey() application.
If you want to map over the information stored in the RDD named d (in this case [Counter({'a': 1, 'c': 1}), Counter({'b': 1})]), you can check the Counter docs and do:
e = d.map(lambda line: list(line.elements()))
# [['a', 'c'], ['b']]
f = d.map(lambda line: list(line.values()))
# [[1, 1], [1]]
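Without a Spark cluster at hand, the same per-group logic can be sketched in plain Python on the sample data; this is only a sketch in which groupByKey is simulated with an ordinary dict:

```python
from collections import Counter

# Sample data matching the RDD above
data = [('A', ('a', 1)), ('B', ('b', 3)), ('A', ('c', 3))]

# Simulate groupByKey with a plain dict: key -> list of values
grouped = {}
for key, value in data:
    grouped.setdefault(key, []).append(value)

# First level: entries per group (what countByKey returns)
top_level = {key: len(values) for key, values in grouped.items()}
# {'A': 2, 'B': 1}

# Second level: subgroup counts within each group
sub_level = {key: Counter(sub for sub, _ in values)
             for key, values in grouped.items()}
# {'A': Counter({'a': 1, 'c': 1}), 'B': Counter({'b': 1})}
```

This collapses steps a through d into two dict comprehensions while keeping the group keys, which the intermediate RDDs b, c, and d above discard.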
Upvotes: 1
Reputation: 2594
Like this?
val rdd = sc.parallelize(Seq(('A', ('a', 1)), ('B', ('b', 3)), ('A', ('c', 3))))
val grouped = rdd.groupByKey
grouped.map(x => s"\nTopLevel ${x._1} : ${x._2.size}\nSubLevel ${x._2.groupBy(i => i._1).mapValues(_.size)}").collect
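The same idea also works without grouping at all: count composite (group, subgroup) keys in one pass, then derive the top-level totals from them. A plain-Python sketch (in pyspark this would correspond to mapping each record to a composite key and calling countByKey, which is an assumption about how you'd port it, not code from the answers above):

```python
from collections import Counter

# Sample data matching the RDD above
data = [('A', ('a', 1)), ('B', ('b', 3)), ('A', ('c', 3))]

# Count composite (group, subgroup) keys in one pass
pair_counts = Counter((group, sub) for group, (sub, _) in data)
# Counter({('A', 'a'): 1, ('A', 'c'): 1, ('B', 'b'): 1})

# Top-level counts fall out by summing over the group element
group_counts = Counter(group for group, _ in pair_counts.elements())
# Counter({'A': 2, 'B': 1})
```

Counting pairs directly avoids materializing per-group value lists, which is also why a composite-key count tends to scale better than groupByKey on real clusters.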
Upvotes: 0