makansij

Reputation: 9865

Efficient way to count unique values for each key

I have a list of members, which have many attributes, two of which are a name and an ID. I want to get an RDD of tuples, where each tuple contains an ID as the first element and the count of unique names associated with that ID as the second element.

e.g.: (ID, <# of unique names associated with ID>).

Here's the code I've written to accomplish this:

IDnametuple = members.map(lambda a: (a.ID, a.name))   # extract only ID and name
idnamelist = IDnametuple.groupByKey()                 # group the names by ID
idnameunique_count = (idnamelist
     # set(tup[1]) should extract unique elements, 
     # and len should tell the number of them
    .map(lambda tup: (tup[0], len(set(tup[1]))))) 

It is incredibly slow, and much slower than similar operations that count unique attributes for each member.

Is there a quicker way to do this? I tried to use as many built-ins as possible, since that's the recommended way to speed things up, from what I've heard.

Upvotes: 1

Views: 2938

Answers (2)

zero323

Reputation: 330093

Without any details we can only guess, but the obvious culprit is groupByKey. If each ID is associated with a large number of names, it can be pretty expensive due to extensive shuffling. The simplest improvement is to use aggregateByKey or combineByKey:

create_combiner = set

def merge_value(acc, x):
    acc.add(x)
    return acc

def merge_combiners(acc1, acc2):
    acc1.update(acc2)
    return acc1

id_name_unique_count = (id_name_tuple  # Keep consistent naming convention
  .combineByKey(create_combiner, merge_value, merge_combiners)
  .mapValues(len))
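
The same computation can also be expressed with aggregateByKey; a minimal, untested sketch using the same names as above (the zero value is an empty set, the sequence function adds a single name, the combiner unions partial sets):

id_name_unique_count = (id_name_tuple
  .aggregateByKey(set(),
                  lambda acc, name: acc | {name},  # per partition: add one name
                  lambda a, b: a | b)              # across partitions: union the sets
  .mapValues(len))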

If the expected number of unique values is large, you may prefer to replace the exact approach with an approximation. One possible approach is to use a Bloom filter to keep track of unique values instead of a set.
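
As a point of comparison, here is a hedged sketch of an approximate count using the DataFrame API; it assumes Spark >= 2.1, where pyspark.sql.functions.approx_count_distinct is available (based on HyperLogLog++ rather than a Bloom filter):

from pyspark.sql import functions as F

# Convert the (id, name) pair RDD to a DataFrame and count distinct names
# per id approximately; rsd bounds the relative standard deviation of the estimate.
id_name_df = id_name_tuple.toDF(["id", "name"])
approx_unique_count = (id_name_df
  .groupBy("id")
  .agg(F.approx_count_distinct("name", rsd=0.05).alias("unique_names")))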

For additional information regarding groupByKey vs aggregateByKey (reduceByKey, combineByKey) see:

Upvotes: 3

juanrh0011

Reputation: 323

That is basically the word count example from https://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs, but counting distinct key-value pairs:

from operator import add
IDnametuple = sc.parallelize([(0, "a"), (0, "a"), (0, "b"), (1, "a"), (1, "b"), (1, "c"), (2, "z")])
idnameunique_count = (IDnametuple.distinct()
                                  .map(lambda idName: (idName[0], 1))
                                  .reduceByKey(add))

So idnameunique_count.collect() returns [(0, 2), (1, 3), (2, 1)], where (0, "a") is counted only once. As @zero323 mentioned, the key here is replacing groupByKey with reduceByKey in order to avoid creating the intermediate list of names. All you need is the name count, which is a much smaller object than a potentially huge list. Also, your version uses set() to eliminate duplicates sequentially in the closure code, while distinct is executed as a distributed, parallel RDD transformation.
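
Applied to the members RDD from the question, the same pipeline would look roughly like this (a sketch; members, ID, and name come from the question):

from operator import add

idnameunique_count = (members
  .map(lambda a: (a.ID, a.name))   # keep only the (ID, name) pair
  .distinct()                      # drop duplicate (ID, name) pairs cluster-wide
  .map(lambda pair: (pair[0], 1))  # one count per remaining distinct name
  .reduceByKey(add))               # sum the counts per ID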

Upvotes: 1
