Mansi

Reputation: 1

Pyspark - Finding intersection of key-value pairs

Could you please help me to achieve the following:

Consider an input which is a list of key-value pairs, where each key is a tuple and each value is a list. If the same key occurs twice in the input, the values should be intersected. If a key has no matching pair, it should be ignored in the output.

Example:

>>> data = [((1, 2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]), ((1, 2), [1, 3, 4, 5, 6, 7, 10, 11])]
>>> rdd = sc.parallelize(data)
>>> rdd.reduceByKey(lambda x,y : list(set(x).intersection(y))).collect()

Output: [((1, 2), [3, 4, 5, 6, 7, 10, 11])]

Since there are two occurrences of the same key, the values were intersected.

>>> data = [((1, 2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]), ((1, 3), [1, 3, 4, 5, 6, 7, 10, 11])]
>>> rdd = sc.parallelize(data)
>>> rdd.reduceByKey(lambda x,y : list(set(x).intersection(y))).collect()

Output that I get: [((1, 2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]), ((1, 3), [1, 3, 4, 5, 6, 7, 10, 11])]

Intended result: the output should be empty, since there is no matching key pair.

Upvotes: 0

Views: 1437

Answers (1)

krishna rachur

Reputation: 41

I implemented the logic as follows:

datardd.map(lambda x: (x[0], (x[1], 1))) \
       .reduceByKey(lambda x, y: (set(x[0]).intersection(y[0]), x[1] + y[1])) \
       .filter(lambda x: x[1][1] > 1) \
       .map(lambda x: (x[0], list(x[1][0]))) \
       .collect()

  1. Map a counter variable alongside the existing list value for each key, in the form [(key, (value, counter))], e.g. [((1, 2), ([2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], 1))]

  2. Use reduceByKey to perform the intersection and increment the counter.

  3. Filter and publish the values whose counter is > 1 (a runnable sketch follows below).
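
For completeness, here is a minimal, self-contained sketch of the same approach, wrapped in a helper function and run against both datasets from the question. The function name intersect_repeated_keys is mine, not from the original post, and a live SparkContext named sc is assumed, as in the question:

def intersect_repeated_keys(rdd):
    # Attach a counter of 1 to every value, intersect the values per key,
    # sum the counters, and keep only keys that occurred more than once.
    return (rdd.map(lambda x: (x[0], (x[1], 1)))
               .reduceByKey(lambda x, y: (set(x[0]).intersection(y[0]), x[1] + y[1]))
               .filter(lambda x: x[1][1] > 1)
               .map(lambda x: (x[0], list(x[1][0]))))

# Repeated key: the values are intersected
rdd1 = sc.parallelize([((1, 2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]),
                       ((1, 2), [1, 3, 4, 5, 6, 7, 10, 11])])
print(intersect_repeated_keys(rdd1).collect())  # [((1, 2), [3, 4, 5, 6, 7, 10, 11])]

# No repeated key: every entry is filtered out
rdd2 = sc.parallelize([((1, 2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]),
                       ((1, 3), [1, 3, 4, 5, 6, 7, 10, 11])])
print(intersect_repeated_keys(rdd2).collect())  # []

One caveat: list(set(...)) makes no ordering guarantee, so the intersected values may come back in a different order than shown.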

Upvotes: 1
