Reputation: 33
I have cogrouped two RDDs and need to compare the Iterable[String] values in the two CompactBuffers of each entry in the resulting RDD. The comparison rules are:
1) If the right CompactBuffer is empty, keep the left CompactBuffer value.
2) If the left CompactBuffer is empty, keep the right buffer only when there is an 'I' in it.
3) If the left and right CompactBuffers are both non-empty and the right one has an "I" in it, take the right buffer.
4) If the left and right CompactBuffers are both non-empty and the right one has no "I" in it, discard both values so that the entire entry is removed from the RDD.
Input:
res4: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[25] at cogroup at <console>:57
Array[(String, (Iterable[String], Iterable[String]))] =
Array((6,(CompactBuffer(6,surya,1003,null),CompactBuffer(6,surya,1030,D))),
(5,(CompactBuffer(5,karun,1007,null),CompactBuffer(5,nirav,1023,I))),
(9,(CompactBuffer(9,pranav,1010,null),CompactBuffer())),
(3,(CompactBuffer(3,riahana,1006,null),CompactBuffer(3,rohit,1020,I))),
(1,(CompactBuffer(1,shubham,1001,null),CompactBuffer(1,yuvraj,1070,I))))
I need the following output, please help:
Output:
org.apache.spark.rdd.RDD[String]
5,nirav,1023,I
9,pranav,1010,null
3,rohit,1020,I
1,yuvraj,1070,I
Upvotes: 0
Views: 524
Reputation: 30320
Not sure I follow, but this will hopefully give you an idea:
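import org.apache.spark.rdd.RDD

// `data` is the cogrouped RDD[(String, (Iterable[String], Iterable[String]))] from the question
val result: RDD[(String, Iterable[String])] = data
  // Keep keys whose right buffer is empty (rule 1) or contains an "I" record (rules 2 and 3)
  .filter { case (_, (_, right)) => right.isEmpty || right.exists(_.endsWith("I")) }
  .mapValues {
    case (left, right) => (left, right.filter(_.endsWith("I")))
  }.mapValues {
    // Take the right buffer when it has "I" records, otherwise fall back to the left one
    case (left, right) => right.headOption.map(_ => right).getOrElse(left)
  }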
Or you could do a simple right.isEmpty check with an if/else if you prefer.
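For illustration, here is a minimal sketch of the four rules from the question written as a plain if/else, using ordinary Scala collections in place of the RDD so that it runs without Spark. The names CogroupRules, resolve, sample and output are hypothetical, and it assumes a record counts as an insert when the string ends with "I" (the same check as in the snippet above). Rule 3 is read literally here, so the whole right buffer is kept when it contains an "I" record.

```scala
object CogroupRules {
  // Hypothetical helper: decides what survives for one cogrouped entry.
  // Assumption: a record is an insert when its string ends with "I".
  def resolve(left: Iterable[String], right: Iterable[String]): Iterable[String] =
    if (right.isEmpty) left                          // rule 1: nothing on the right, keep the left
    else if (right.exists(_.endsWith("I"))) right    // rules 2 and 3: right has an "I", take the right
    else Seq.empty[String]                           // rule 4 (and rule 2 without an "I"): drop the entry

  // The sample entries from the question, as a local Seq standing in for the RDD.
  val sample: Seq[(String, (Iterable[String], Iterable[String]))] = Seq(
    ("6", (Seq("6,surya,1003,null"), Seq("6,surya,1030,D"))),
    ("5", (Seq("5,karun,1007,null"), Seq("5,nirav,1023,I"))),
    ("9", (Seq("9,pranav,1010,null"), Seq.empty[String])),
    ("3", (Seq("3,riahana,1006,null"), Seq("3,rohit,1020,I"))),
    ("1", (Seq("1,shubham,1001,null"), Seq("1,yuvraj,1070,I")))
  )

  // flatMap drops the key and flattens the surviving buffers into plain strings;
  // entries resolved to an empty buffer disappear entirely.
  val output: Seq[String] = sample.flatMap { case (_, (left, right)) => resolve(left, right) }

  def main(args: Array[String]): Unit = output.foreach(println)
}
```

On the cogrouped RDD itself, the same function should slot into data.flatMap { case (_, (left, right)) => CogroupRules.resolve(left, right) }, which would give an RDD[String] as asked for in the question. That part is untested here and depends on the Spark and Scala versions in use.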
Upvotes: 1