Katya Willard

Reputation: 2182

Find distinct values for each column in an RDD in PySpark

I have an RDD that is both very long (a few billion rows) and decently wide (a few hundred columns). I want to create sets of the unique values in each column (these sets don't need to be parallelized, as they will contain no more than 500 unique values per column).

Here is what I have so far:

data = sc.parallelize([["a", "one", "x"], ["b", "one", "y"], ["a", "two", "x"], ["c", "two", "x"]])
num_columns = len(data.first())
empty_sets = [set() for index in xrange(num_columns)]
d2 = data.aggregate((empty_sets), (lambda a, b: a.add(b)), (lambda x, y: x.union(y)))

What I am doing here is trying to initialize a list of empty sets, one for each column in my RDD. For the first part of the aggregation, I want to iterate row by row through data, adding the value in column n to the nth set in my list of sets. If the value already exists, nothing happens. Then, across partitions, it performs the union of the sets so only distinct values are returned.

When I try to run this code, I get the following error:

AttributeError: 'list' object has no attribute 'add'

I believe the issue is that I am not making it clear that I want to iterate through the list of sets (empty_sets) and through the columns of each row in data. I believe that in (lambda a, b: a.add(b)), a is empty_sets and b is data.first() (the entire row, not a single value). That obviously doesn't work, and isn't my intended aggregation.
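
For what it's worth, the same error reproduces outside of Spark, since a here is a plain list:

a = [set(), set(), set()]
a.add(["a", "one", "x"])  # AttributeError: 'list' object has no attribute 'add'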

How can I iterate through my list of sets, and through each row of my RDD, to add each value to its corresponding set object?

The desired output would look like:

[set(['a', 'b', 'c']), set(['one', 'two']), set(['x', 'y'])]


P.S. I've looked at this example here, which is extremely similar to my use case (it's where I got the idea to use aggregate in the first place). However, I find the code very difficult to convert into PySpark, and I'm very unclear on what the case statement and zip call are doing.

Upvotes: 1

Views: 3109

Answers (1)

Galen Long

Reputation: 3891

There are two problems. One, your functions assume the accumulated value is a single set, but it's actually a list of sets. Two, add doesn't return anything (try a = set(); b = a.add('1'); print b), so your first lambda returns None instead of the updated list of sets. To fix this, make your first function non-anonymous and have both functions loop over the lists of sets:

def set_plus_row(sets, row):
    for i in range(len(sets)):
        sets[i].add(row[i])
    return sets


unique_values_per_column = data.aggregate(
    empty_sets, 
    set_plus_row, # can't be lambda b/c add doesn't return anything
    lambda x, y: [a.union(b) for a, b in zip(x, y)]
)
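
For reference, running this on the sample data from the question should give exactly the desired output (the element order within each set may vary):

print unique_values_per_column
# [set(['a', 'b', 'c']), set(['one', 'two']), set(['x', 'y'])]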

I'm not sure what zip does in Scala, but in Python, it takes two lists and pairs up their corresponding elements into tuples (try x = [1, 2, 3]; y = ['a', 'b', 'c']; print zip(x, y)) so you can loop over two lists simultaneously.
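
Concretely, here's what that last lambda does with the partial results from two partitions (a standalone sketch, no Spark needed):

x = [set(['a', 'b']), set(['one'])]   # per-column sets from one partition
y = [set(['a', 'c']), set(['two'])]   # per-column sets from another partition
print [a.union(b) for a, b in zip(x, y)]
# [set(['a', 'b', 'c']), set(['one', 'two'])]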

Upvotes: 2
