S.Kang

Reputation: 611

How do I use the cogroup method with functional operations in Spark?

I've been trying to solve this with cogroup, but I haven't been able to figure it out.

Given two RDDs with different keys, as in the example below, is it possible to use cogroup to keep only the entries of data1 whose key starts with a key from data2 (i.e. the first character matches)?

val data1 = sc.parallelize(Seq(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))

val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = data1.cogroup(data2)
/* List(
    (ba,(CompactBuffer(2),CompactBuffer())), 
    (bc,(CompactBuffer(2),CompactBuffer())), 
    (a,(CompactBuffer(),CompactBuffer(3))), 
    (b,(CompactBuffer(3),CompactBuffer(5))), 
    (c,(CompactBuffer(1),CompactBuffer())), 
    (aa,(CompactBuffer(1),CompactBuffer()))
) */

The result should be Array(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3))

I solved this problem by using broadcast() as @mrsrinivas suggested, but broadcast() is not appropriate for large data.

val bcast = sc.broadcast(data2.map(_._1).collect())
val result = data1.filter(r => bcast.value.contains(myFuncOper(r._1)))
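For reference, the broadcast-style filter can be dry-run with plain Scala collections (no Spark needed). Here `myFuncOper` is assumed to extract the first character of the key, which is what the expected result implies:

```scala
val data1 = Seq(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3), ("c", 1))
val data2Keys = Set("a", "b") // stands in for bcast.value, i.e. data2.map(_._1).collect()

// Assumption: myFuncOper takes the first character of the key
def myFuncOper(s: String): String = s.substring(0, 1)

val result = data1.filter(r => data2Keys.contains(myFuncOper(r._1)))
println(result) // List((aa,1), (ba,2), (bc,2), (b,3))
```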

Is there a method to solve this problem using cogroup with the functional operation?

Upvotes: 1

Views: 533

Answers (2)

mrsrinivas

Reputation: 35434

Short:

val result = data1
  .flatMap(x => x._1.split("").map(y => (y, x)))
  .join(data2)
  .map(x => x._2._1)
  .distinct

Detailed:

flatMap(x => x._1.split("").map(y => (y, x))) yields

 List(
  (a, (aa, 1)),
  (a, (aa, 1)),
  (b, (ba, 2)),
  (a, (ba, 2)),
  (b, (bc, 2)),
  (c, (bc, 2)),
  (b, (b, 3)),
  (c, (c, 1))
)

after join(data2)

List(
  (a, ((aa, 1), 3)),
  (a, ((aa, 1), 3)),
  (a, ((ba, 2), 3)),
  (b, ((ba, 2), 5)),
  (b, ((bc, 2), 5)),
  (b, ((b, 3), 5))
)

Now all we are interested in is the distinct first elements of each value pair, which can be done by map(x => x._2._1).distinct
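The whole pipeline can be dry-run without a Spark cluster using plain Scala collections (a sketch: the join against data2 is simulated with a Map lookup):

```scala
val data1 = Seq(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3), ("c", 1))
val data2 = Map("a" -> 3, "b" -> 5)

val result = data1
  .flatMap(x => x._1.split("").map(y => (y, x)))     // one (char, pair) per character of the key
  .collect { case (y, x) if data2.contains(y) => x } // simulated inner join on data2's keys
  .distinct                                          // drop duplicates from repeated characters

println(result) // List((aa,1), (ba,2), (bc,2), (b,3))
```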

Upvotes: 1

Tzach Zohar

Reputation: 37842

You can use cogroup after extracting a key that would match data2's keys, and then use filter and map to remove values without matches and "restructure" the data:

val result: RDD[(String, Int)] = data1
  .keyBy(_._1.substring(0, 1)) // key by first character
  .cogroup(data2)
  .filter { case (_, (_, data2Values)) => data2Values.nonEmpty }
  .flatMap { case (_, (data1Values, _)) => data1Values }
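The same keyBy/cogroup/filter/flatMap flow can be sanity-checked with plain Scala collections, where cogroup is approximated by a groupBy on each side (a sketch, not the Spark API):

```scala
val data1 = Seq(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3), ("c", 1))
val data2 = Seq(("a", 3), ("b", 5))

val left  = data1.groupBy(_._1.substring(0, 1)) // keyBy first character
val right = data2.groupBy(_._1)

val result = left.toList
  .filter { case (k, _) => right.contains(k) }      // keep keys that also appear in data2
  .flatMap { case (_, data1Values) => data1Values } // restore the original data1 pairs

println(result.sortBy(_._1)) // Map ordering is unspecified, so sort for display
// List((aa,1), (b,3), (ba,2), (bc,2))
```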

Upvotes: 1
