Magic

Reputation: 557

Filtering data in an RDD

I have managed to preprocess my data in PySpark to get something like this:

[(u'key1', u'1'), (u'key2', u'1'), (u'key1', u'2'), (u'key3', u'2'), (u'key4', u'1'), (u'key1', u'4'), (u'key5', u'1'), (u'key6', u'2'), (u'key2', u'4'), (u'key8', u'5'), (u'key9', u'6'), (u'key10', u'7')]

Now I need to filter based on these conditions:

1) filter values associated with at least 2 keys

output - only those (k, v) pairs which have '1', '2' or '4' as values should be present, since those values are associated with at least 2 keys:

 [(u'key1', u'1'), (u'key2', u'1'), (u'key1', u'2'), (u'key3', u'2'), (u'key4', u'1'), (u'key1', u'4'), (u'key5', u'1'), (u'key6', u'2'), (u'key2', u'4')]

2) filter keys associated with at least 2 values

output - only those (k, v) pairs which have key1 or key2 as keys should be present, since those keys are associated with at least 2 values:

[(u'key1', u'1'), (u'key2', u'1'), (u'key1', u'2'), (u'key1', u'4'), (u'key2', u'4')]

Any suggestions would be of great help.

Update: I used groupBy and a filter to group keys with multiple values:

[(u'key1', [u'1', u'2', u'4']), (u'key2', [u'1', u'4'])]

Now how do I split these (key, list(values)) pairs back into individual (k, v) pairs to apply further transformations?
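
I suspect flatMapValues might do this split, something like:

grouped = sc.parallelize([(u'key1', [u'1', u'2', u'4']), (u'key2', [u'1', u'4'])])
pairs = grouped.flatMapValues(lambda vals: vals)  # one (key, value) pair per list element

pairs.collect()
# [(u'key1', u'1'), (u'key1', u'2'), (u'key1', u'4'), (u'key2', u'1'), (u'key2', u'4')]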

Upvotes: 3

Views: 29811

Answers (2)

sau

Reputation: 1356

my_rdd = sc.parallelize([
    (u'key1', u'1'), (u'key2', u'1'), (u'key1', u'2'), (u'key2', u'3'),
    (u'key4', u'1'), (u'key1', u'4'), (u'key4', u'1'), (u'key6', u'2'),
    (u'key7', u'4'), (u'key8', u'5'), (u'key9', u'6'), (u'key10', u'7')])

# filter keys which are associated with at least 2 values
filter2_rdd = my_rdd.groupByKey() \
                    .mapValues(lambda x: list(x)) \
                    .filter(lambda x: len(x[1]) >= 2) \
                    .flatMap(lambda x: [(x[0], item) for item in x[1]])

# filter values which are associated with at least 2 keys
filter1_rdd = filter2_rdd.map(lambda x: (x[1], x[0])) \
                         .groupByKey().mapValues(lambda x: list(x)) \
                         .filter(lambda x: len(x[1]) >= 2) \
                         .flatMap(lambda x: [(item, x[0]) for item in x[1]])

This will work!!
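
A quick sanity check on the sample data above (collect() order may vary, and len(x[1]) counts occurrences rather than distinct values, so the duplicated (u'key4', u'1') pair survives both filters):

print(sorted(filter1_rdd.collect()))
# [(u'key1', u'1'), (u'key2', u'1'), (u'key4', u'1'), (u'key4', u'1')]

If you need distinct values instead, replace list(x) with list(set(x)) in the groupByKey steps.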

Upvotes: 4

user6022341

Reputation:

Reduce by key, filter and join:

result = (rdd
          .mapValues(lambda _: 1)           # replace each value with 1
          .reduceByKey(lambda x, y: x + y)  # count pairs per key
          .filter(lambda x: x[1] >= 2)      # keep only keys counted at least twice
          .join(rdd)                        # join with the original (serves as a filter)
          .mapValues(lambda x: x[1]))       # drop the count, keep the original value
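
The same count-filter-join pattern handles the other condition (values associated with at least 2 keys) after swapping each pair; a minimal sketch, assuming rdd holds the original (key, value) pairs:

swapped = rdd.map(lambda kv: (kv[1], kv[0]))        # (value, key)
value_filtered = (swapped
                  .mapValues(lambda _: 1)           # replace each key with 1
                  .reduceByKey(lambda x, y: x + y)  # count pairs per value
                  .filter(lambda x: x[1] >= 2)      # keep values seen at least twice
                  .join(swapped)                    # (value, (count, key))
                  .map(lambda vck: (vck[1][1], vck[0])))  # back to (key, value)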

Upvotes: 0
