Reputation: 626
Is there a way in spark to filter elements in one RDD on the basis of another RDD even if they don't share the same key?
I have two RDDs - abc and xyz
abc.collect() looks like this
[[a,b,c],[a,c,g,e],[a,e,b,x],[b,c]]
xyz.collect() looks like this
[a,b,c]
Now I want to filter out all elements from RDD abc which are not present in xyz.
After the said operation, RDD Abc should look like this:
[[a,b,c],[a,c],[a,b],[b,c]]
I wrote a code that looks something like this :
def prune(eachlist):
for i in eachlist:
if i in xyz:
return i
abc = abc.map(prune)
However, that gives me this error :
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation
I have since tried filters, lookups instead of map to no avail. I keep getting the same error.
I know I can do a collect operation on xyz and resolve this error, but I am running this on a large dataset and doing a .collect() kills my AWS server for exceeding too much memory. Therefore I need to do this without using .collect() or any such equivalent expensive operations.
Upvotes: 0
Views: 327
Reputation:
You can:
# Add index
abc.zipWithIndex() \
# Flatten values
.flatMap(lambda x: [(k, x[1]) for k in x[0]]) \
# Join with xyz (works as filter)
.join(xyz.map(lambda x: (x, None))) \
# Group back by index
.map(lambda x: (x[1][0], x[0])) \
.groupByKey() \
.map(lambda x: list(x[1]))
or you can create Bloom filter on xyz
and use it to map abc
.
Upvotes: 2