Reputation: 9865
I have a list of members, each of which has many attributes, two of them being a name and an ID. I want to get an RDD of tuples, with the ID as the first element and the count of unique names associated with that ID as the second element, i.e. tuples of the form (ID, <# of unique names associated with ID>).
Here's the code that I have written to accomplish this:
IDnametuple = members.map(lambda a: (a.ID, a.name))  # extract only ID and name
idnamelist = IDnametuple.groupByKey()  # group the names by ID
idnameunique_count = (idnamelist
                      # set(tup[1]) extracts the unique elements,
                      # and len counts them
                      .map(lambda tup: (tup[0], len(set(tup[1])))))
It is incredibly slow, much slower than similar operations that count unique attributes for each member.
Is there a quicker way to do this? I tried to use as many built-ins as possible, since that is, from what I've heard, the right way to speed things up.
Upvotes: 1
Views: 2938
Reputation: 330093
Without any details we can only guess, but the obvious suspect is groupByKey. If each ID is associated with a large number of names, it can be pretty expensive due to extensive shuffling. The simplest improvement is to use aggregateByKey or combineByKey:
def create_combiner(x):
    # start a new set holding the first name seen for a key
    # (plain `set` would not work here: set("john") splits the string into characters)
    return {x}

def merge_value(acc, x):
    # add a name to the set built so far within a partition
    acc.add(x)
    return acc

def merge_combiners(acc1, acc2):
    # merge the per-partition sets for the same key
    acc1.update(acc2)
    return acc1

id_name_unique_count = (id_name_tuple  # keep a consistent naming convention
                        .combineByKey(create_combiner, merge_value, merge_combiners)
                        .mapValues(len))
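For comparison, a minimal sketch of the aggregateByKey variant mentioned above, reusing merge_value and merge_combiners from the snippet; the empty set serves as the zero value, which PySpark copies per key:

# aggregateByKey equivalent of the combineByKey pipeline above
id_name_unique_count = (id_name_tuple
                        .aggregateByKey(set(), merge_value, merge_combiners)
                        .mapValues(len))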
If the expected number of unique values is large, you may prefer to replace the exact approach with an approximation. One possibility is to use a Bloom filter to keep track of unique values instead of a set.
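A very rough sketch of that idea, assuming hashlib-based bit positions and hypothetical sizing parameters M and K. Note that merging two Bloom-filter accumulators can double-count names that appear in more than one partition, which is one reason mergeable sketches such as HyperLogLog are often preferred for distinct counts:

import hashlib

M = 2 ** 16  # filter size in bits (hypothetical; tune to expected cardinality)
K = 4        # number of hash positions per value (hypothetical)

def bloom_positions(value):
    # derive K bit positions from one MD5 digest of the value
    digest = hashlib.md5(str(value).encode("utf-8")).digest()
    return [int.from_bytes(digest[4 * i:4 * i + 4], "big") % M for i in range(K)]

def bloom_seq(acc, value):
    # count the value only if the filter has probably not seen it yet
    bits, count = acc
    positions = bloom_positions(value)
    if not all(bits[p] for p in positions):
        for p in positions:
            bits[p] = 1
        count += 1
    return (bits, count)

def bloom_comb(acc1, acc2):
    # OR the bit arrays; summing the counts may overestimate values
    # seen in both partitions, hence "approximation"
    bits1, count1 = acc1
    bits2, count2 = acc2
    merged = bytearray(a | b for a, b in zip(bits1, bits2))
    return (merged, count1 + count2)

zero = (bytearray(M), 0)  # empty filter, zero count
approx_unique_count = (id_name_tuple
                       .aggregateByKey(zero, bloom_seq, bloom_comb)
                       .mapValues(lambda acc: acc[1]))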
For additional information regarding groupByKey vs aggregateByKey (reduceByKey, combineByKey) see:
Upvotes: 3
Reputation: 323
That is basically the word count example from https://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs, but counting distinct key-value pairs:
from operator import add

IDnametuple = sc.parallelize([(0, "a"), (0, "a"), (0, "b"),
                              (1, "a"), (1, "b"), (1, "c"), (2, "z")])

idnameunique_count = (IDnametuple.distinct()
                      .map(lambda idName: (idName[0], 1))
                      .reduceByKey(add))
So idnameunique_count.collect() returns [(0, 2), (1, 3), (2, 1)], where (0, "a") is counted only once. As @zero323 mentioned, the key here is replacing groupByKey with reduceByKey in order to avoid creating the intermediate lists of names. All you need is the name count, which is a much smaller object than a potentially huge list. Also, your version uses set() to eliminate duplicates sequentially in the closure code, while distinct is executed as a distributed parallel RDD transformation.
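Plugged into your original pipeline, a minimal sketch (assuming the same members RDD with ID and name attributes from the question) would look like:

from operator import add

idnameunique_count = (members.map(lambda m: (m.ID, m.name))  # extract (ID, name) pairs
                      .distinct()                            # drop duplicate pairs in parallel
                      .map(lambda kv: (kv[0], 1))            # one count per unique name
                      .reduceByKey(add))                     # sum the counts per ID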
Upvotes: 1