Reputation: 1
Could you please help me achieve the following?
Consider an input that is a list of key-value pairs, where each key is a tuple and each value is a list. If the same key occurs more than once in the input, the values should be intersected. If a key has no matching pair, that key should be omitted from the output.
Example:
>>> data = [((1, 2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]), ((1, 2), [1, 3, 4, 5, 6, 7, 10, 11])]
>>> rdd = sc.parallelize(data)
>>> rdd.reduceByKey(lambda x,y : list(set(x).intersection(y))).collect()
Output: [((1, 2), [3, 4, 5, 6, 7, 10, 11])]
Since the same key occurs twice, the values were intersected.
>>> data = [((1, 2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]), ((1, 3), [1, 3, 4, 5, 6, 7, 10, 11])]
>>> rdd = sc.parallelize(data)
>>> rdd.reduceByKey(lambda x,y : list(set(x).intersection(y))).collect()
Output that I get: [((1, 2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]), ((1, 3), [1, 3, 4, 5, 6, 7, 10, 11])]
Intended result: the output should be empty, since no key occurs more than once.
Upvotes: 0
Views: 1437
Reputation: 41
I implemented the logic as follows (datardd is the input RDD):
datardd.map(lambda x:(x[0],(x[1],1))).reduceByKey(lambda x,y:(set(x[0]).intersection(y[0]),x[1]+y[1])).filter(lambda x:x[1][1]>1).map(lambda x:(x[0],list(x[1][0]))).collect()
First, map each record so that a counter of 1 is attached to the existing list value, giving records of the form
[(key, (value, counter))]
e.g. [((1, 2), ([2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], 1))]
Then use reduceByKey to intersect the values and add up the counters for each key.
Finally, filter for the records whose counter is greater than 1 and map them back to (key, list) pairs.
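To see why the counter is needed: reduceByKey never calls the reducing function for a key that has only one value, so an unmatched key passes through unchanged unless something records how many values were merged. The following is a minimal sketch of the same map / reduce / filter logic in plain Python, so it can be run without a Spark installation. The helpers `reduce_by_key` and `intersect_repeated_keys` are hypothetical names used only for illustration and are not part of the PySpark API; the result lists are sorted here only to make the output deterministic.

```python
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, func):
    """Local stand-in (assumption) for RDD.reduceByKey: fold each key's values with func."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    # As with reduceByKey, func is never called for a key that has a single value.
    return [(key, reduce(func, values)) for key, values in grouped.items()]


def intersect_repeated_keys(pairs):
    # Attach a counter of 1 to every value: (key, (set_of_values, 1)).
    counted = [(key, (set(values), 1)) for key, values in pairs]
    # Intersect the value sets and add up the counters per key.
    merged = reduce_by_key(counted, lambda a, b: (a[0] & b[0], a[1] + b[1]))
    # Keep only keys seen more than once and convert the sets back to lists.
    return [(key, sorted(vals)) for key, (vals, count) in merged if count > 1]


matching = [((1, 2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]),
            ((1, 2), [1, 3, 4, 5, 6, 7, 10, 11])]
distinct = [((1, 2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]),
            ((1, 3), [1, 3, 4, 5, 6, 7, 10, 11])]

print(intersect_repeated_keys(matching))  # [((1, 2), [3, 4, 5, 6, 7, 10, 11])]
print(intersect_repeated_keys(distinct))  # []
```

With the two examples from the question, the first input yields the intersected list for key (1, 2) and the second yields an empty result, which matches the intended behaviour.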
Upvotes: 1