Reputation: 611
I've been trying to solve a problem with cogroup, but I can't figure out how to do it.
If there are two RDDs with different keys, as in the example below, is it possible to use cogroup to extract only the records of data1 whose key starts with the same character as a key in data2?
import org.apache.spark.rdd.RDD // needed for the RDD type annotation below

val data1 = sc.parallelize(Seq(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))
val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = data1.cogroup(data2)
/* List(
(ba,(CompactBuffer(2),CompactBuffer())),
(bc,(CompactBuffer(2),CompactBuffer())),
(a,(CompactBuffer(),CompactBuffer(3))),
(b,(CompactBuffer(3),CompactBuffer(5))),
(c,(CompactBuffer(1),CompactBuffer())),
(aa,(CompactBuffer(1),CompactBuffer()))
) */
The result should be Array(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3))
I solved this problem by using broadcast(), as @mrsrinivas suggested, but broadcast() is not appropriate for large data:
val bcast = sc.broadcast(data2.map(_._1).collect())
val result = data1.filter(r => bcast.value.contains(myFuncOper(r._1)))
Is there a way to solve this problem using cogroup together with a functional operation?
Upvotes: 1
Views: 533
Reputation: 35434
Short:
val result = data1
  .flatMap(x => x._1.split("").map(y => (y, x)))
  .join(data2)
  .map(x => x._2._1)
  .distinct
Detailed:
flatMap(x => x._1.split("").map(y => (y, x))) yields
List(
(a, (aa, 1)),
(a, (aa, 1)),
(b, (ba, 2)),
(a, (ba, 2)),
(b, (bc, 2)),
(c, (bc, 2)),
(b, (b, 3)),
(c, (c, 1))
)
After join(data2), this becomes
List(
(a, ((aa, 1), 3)),
(a, ((aa, 1), 3)),
(a, ((ba, 2), 3)),
(b, ((ba, 2), 5)),
(b, ((bc, 2), 5)),
(b, ((b, 3), 5))
)
Now all we are interested in is the distinct original data1 pairs, i.e. the first element of each joined value, which can be obtained with map(x => x._2._1).distinct
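Note that split("") emits every character of the key, so the join above keeps a data1 record if any of its characters is a key of data2. For the sample data this happens to coincide with the first-character rule from the question. If only the first character should count, a variant that keys each record by that character alone might look like the sketch below. It assumes Spark is on the classpath and runs in local mode; the object name FirstCharJoin, the method name keepMatching and the app name are made up for illustration. It also assumes the keys of data2 are unique, since an inner join would otherwise repeat data1 records.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, not part of the original answer.
object FirstCharJoin {
  // Keep the records of data1 whose key starts with a character that is a key of data2.
  // Assumes data2 has unique keys; duplicate keys would duplicate the joined records.
  def keepMatching(data1: RDD[(String, Int)], data2: RDD[(String, Int)]): RDD[(String, Int)] =
    data1
      .filter { case (key, _) => key.nonEmpty }                 // empty keys have no first character
      .map { case (key, value) => (key.take(1), (key, value)) } // (first character, original record)
      .join(data2)                                              // inner join drops unmatched records
      .map { case (_, (record, _)) => record }                  // back to the original (key, value)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("first-char-join").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val data1 = sc.parallelize(Seq(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3), ("c", 1)))
      val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))
      // Prints the four matching pairs; the order is not guaranteed.
      keepMatching(data1, data2).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Because each record is emitted once per first character rather than once per character, no distinct step is needed here as long as the keys of data2 are unique.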
Upvotes: 1
Reputation: 37842
You can use cogroup after extracting a key that would match data2's keys, and then use filter and flatMap to remove values without matches and "restructure" the data:
val result: RDD[(String, Int)] = data1
  .keyBy(_._1.substring(0, 1)) // key by first character
  .cogroup(data2)
  .filter { case (_, (_, data2Values)) => data2Values.nonEmpty }
  .flatMap { case (_, (data1Values, _)) => data1Values }
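For trying this out without a cluster, the same steps can be wrapped in a small local-mode program. This is only a sketch: it assumes Spark is on the classpath, the object name FirstCharCogroup, the method name keepMatching and the app name are invented for illustration, and it uses take(1) in place of substring(0, 1) so that an empty key does not throw an exception.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical wrapper object, not part of the original answer.
object FirstCharCogroup {
  // Keep the records of data1 whose first key character is a key of data2.
  def keepMatching(data1: RDD[(String, Int)], data2: RDD[(String, Int)]): RDD[(String, Int)] =
    data1
      .keyBy(_._1.take(1))                                           // key by first character ("" for an empty key)
      .cogroup(data2)                                                // one row per key with both value groups
      .filter { case (_, (_, data2Values)) => data2Values.nonEmpty } // drop keys absent from data2
      .flatMap { case (_, (data1Values, _)) => data1Values }         // emit the original data1 records

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("first-char-cogroup").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val data1 = sc.parallelize(Seq(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3), ("c", 1)))
      val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))
      // Prints the four matching pairs; the order is not guaranteed.
      keepMatching(data1, data2).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Since cogroup gathers all data2 values for a key into a single group, a key that appears several times in data2 does not duplicate the data1 records, which an inner join would do.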
Upvotes: 1