Manivannan Dharman


How to store the duplicates of a Spark RDD in another RDD

I have two Spark RDDs: RDD1: RDD[(String, String, Int)] and RDD2: RDD[(String, String, Int)].

RDD1 is the original data and RDD2 is the distinct of RDD1 (RDD1.distinct()).

I need to create an RDD3 that is RDD1 - RDD2, i.e. RDD1 with one copy of each distinct record removed.

For example:

RDD1: [("one","one",23), ("one","one",23), ("two","two",28), ("one","one",23)]
RDD2: [("one","one",23), ("two","two",28)]

Expected:

RDD3: [("one","one",23), ("one","one",23)]

That is, RDD3 should contain only the duplicates, with each record's count reduced by 1.

RDD3 is the collection of only the duplicates. For example, if there are 10 identical transactions, 1 is kept as the unique copy, so I should collect the remaining 9 transactions in RDD3.
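The required operation is a multiset difference. For local collections, Scala's standard Seq.diff does exactly that: for each occurrence of x in the argument it removes one occurrence of x from the receiver. Spark's RDD.subtract behaves differently, since it drops every element of rdd1 that appears in rdd2, so a plain rdd1.subtract(rdd2) would leave RDD3 empty for this data. A minimal plain-Scala sketch of the expected semantics on the example data (no Spark involved; the object name MultisetDiffExample is illustrative only):

```scala
object MultisetDiffExample {
  // Example data from the question; rdd2 plays the role of RDD1.distinct()
  val rdd1: Seq[(String, String, Int)] =
    Seq(("one", "one", 23), ("one", "one", 23), ("two", "two", 28), ("one", "one", 23))
  val rdd2: Seq[(String, String, Int)] = rdd1.distinct

  // diff removes one occurrence per element of rdd2, keeping only the extra copies
  val rdd3: Seq[(String, String, Int)] = rdd1.diff(rdd2)

  def main(args: Array[String]): Unit = rdd3.foreach(println)
}
```

On an RDD there is no such one-copy-per-match removal through subtract, so the extra copies have to be isolated another way, for example by tagging or counting records.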


Answers (1)

vivesh saladi


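// Tag every record with a unique index so identical records become distinguishable
val rdd1 = sc.parallelize(List(("A", "ANT", 1), ("A", "ANT", 1), ("B", "BALL", 2), ("C", "CAT", 3), ("C", "CAT", 3))).zipWithIndex()
rdd1.collect().foreach(r => print(r))

// Keep one (record, index) pair per distinct record; min makes the kept copy deterministic
val rdd2 = rdd1.reduceByKey((a, b) => math.min(a, b))
rdd2.collect().foreach(r => print(r))

// Removing the kept copies leaves only the extra occurrences; then drop the index
val rdd3 = rdd1.subtract(rdd2).map(p => p._1)
rdd3.collect().foreach(r => print(r))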

Result:

RDD1: ((A,ANT,1),0)((A,ANT,1),1)((B,BALL,2),2)((C,CAT,3),3)((C,CAT,3),4)
RDD2: ((A,ANT,1),0)((C,CAT,3),3)((B,BALL,2),2)
RDD3: (C,CAT,3)(A,ANT,1)
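An alternative that skips zipWithIndex and subtract is to count each distinct record and emit count - 1 copies of it; this needs only the single shuffle performed by reduceByKey. A minimal sketch, assuming Spark is on the classpath and run in local mode; the names DuplicatesSketch and extraCopies are hypothetical:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object DuplicatesSketch {
  // Returns RDD1 minus RDD1.distinct(): (count - 1) copies of every record
  def extraCopies(rdd1: RDD[(String, String, Int)]): RDD[(String, String, Int)] =
    rdd1
      .map(r => (r, 1L))                 // pair every record with a count of 1
      .reduceByKey(_ + _)                // total occurrences per distinct record
      .filter { case (_, n) => n > 1 }   // unique records contribute nothing
      .flatMap { case (r, n) => Iterator.fill((n - 1).toInt)(r) } // emit n - 1 copies

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("duplicates").setMaster("local[*]"))
    try {
      val rdd1 = sc.parallelize(Seq(
        ("one", "one", 23), ("one", "one", 23), ("two", "two", 28), ("one", "one", 23)))
      extraCopies(rdd1).collect().foreach(println)
    } finally sc.stop()
  }
}
```

Unlike the index-based version, this does not track which physical copy is dropped, but since the copies are identical that makes no difference to the contents of RDD3.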

