Reputation: 19
I have two Spark RDDs: RDD1: RDD[(String, String, Int)] and RDD2: RDD[(String, String, Int)].
RDD1 is the original data and RDD2 is the distinct of RDD1.
I need to create an RDD3 which is RDD1 - RDD2.
For example:
RDD1: [("one","one",23)],[("one","one",23)],[("two","two",28)],[("one","one",23)]
RDD2: [("one","one",23)],[("two","two",28)]
Expected
RDD3: [("one","one",23)],[("one","one",23)]
That is, only the duplicates, with the count of each duplicate reduced by 1. RDD3 is a collection of only the duplicates: for example, if there are 10 identical transactions, 1 is kept as unique, so I should collect the other 9 transactions in RDD3.
Upvotes: 0
Views: 79
Reputation: 1
Note that subtract removes every occurrence of a matching record, so subtracting the distinct RDD directly would leave nothing. Pairing each record with a unique index via zipWithIndex first makes every row distinct, so subtract removes exactly one indexed copy per record:

// Pair each record with a unique index so duplicate records become distinguishable
val rdd1 = sc.parallelize(List(("A", "ANT", 1), ("A", "ANT", 1), ("B", "BALL", 2), ("C", "CAT", 3), ("C", "CAT", 3))).zipWithIndex()
rdd1.collect().foreach(r => print(r))

// Keep exactly one (record, index) pair per distinct record (the lowest index)
val rdd2 = rdd1.reduceByKey((a, b) => math.min(a, b))
rdd2.collect().foreach(r => print(r))

// Remove the one kept copy of each record; what remains are the extra duplicates
val rdd3 = rdd1.subtract(rdd2).map(p => p._1)
rdd3.collect().foreach(r => print(r))
Result:
RDD1: ((A,ANT,1),0)((A,ANT,1),1)((B,BALL,2),2)((C,CAT,3),3)((C,CAT,3),4)
RDD2: ((A,ANT,1),0)((C,CAT,3),3)((B,BALL,2),2)
RDD3: (C,CAT,3)(A,ANT,1)
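An alternative sketch of the same idea, without zipWithIndex: count the occurrences of each record with reduceByKey, then re-emit each record count - 1 times (everything except the one copy that distinct would keep). The object name and the local master setting are just for this example:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DuplicatesOnly {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("dups").setMaster("local[*]"))

    val rdd1 = sc.parallelize(List(
      ("one", "one", 23), ("one", "one", 23), ("two", "two", 28), ("one", "one", 23)))

    // Count each distinct record, then re-emit it (count - 1) times,
    // i.e. all copies except the single one kept by distinct.
    val rdd3 = rdd1
      .map(rec => (rec, 1))
      .reduceByKey(_ + _)
      .flatMap { case (rec, n) => List.fill(n - 1)(rec) }

    // Yields ("one","one",23) twice, matching the expected RDD3
    rdd3.collect().foreach(println)

    sc.stop()
  }
}
```

This avoids a subtract (which triggers a shuffle of the full indexed data set) at the cost of a flatMap that materializes the repeated records.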
Upvotes: 0