mlee_jordan

Reputation: 842

How to process cogroup values?

I am cogrouping two RDDs and want to process the grouped values:

rdd1.cogroup(rdd2)

As a result of this cogroup, I get output like the following:

(ion,(CompactBuffer(100772C121, 100772C111, 6666666666),CompactBuffer(100772C121)))

From this result I would like to obtain all distinct pairs. For example, for the key 'ion':

100772C121 - 100772C111
100772C121 - 6666666666
100772C111 - 6666666666

How can I do this in Scala?

Upvotes: 1

Views: 2232

Answers (1)

Rohan Aletty

Reputation: 2442

You could try something like the following:

(l1 ++ l2).distinct.combinations(2).map { case Seq(x, y) => (x, y) }.toList

Replace l1 and l2 with your two CompactBuffer values. cogroup exposes them as Iterable, so call .toSeq on each first, because combinations is a Seq method. When I tried this locally, I got the following (which I believe is what you want):

scala> val l1 = List("100772C121", "100772C111", "6666666666")
l1: List[String] = List(100772C121, 100772C111, 6666666666)

scala> val l2 = List("100772C121")
l2: List[String] = List(100772C121)

scala> val combine = (l1 ++ l2).distinct.combinations(2).map { case Seq(x, y) => (x, y) }.toList
combine: List[(String, String)] = List((100772C121,100772C111), (100772C121,6666666666), (100772C111,6666666666))
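For reference, here is a self-contained sketch of the same idea on plain Scala collections, so it can be tried without Spark. The two Vector values stand in for the CompactBuffers from the question (cogroup hands them over as Iterable), and the object and method names are hypothetical. It also shows why distinct matters: without it, a value present in both groups gets paired with itself.

```scala
// Plain-Scala sketch (no Spark). DistinctPairs and distinctPairs are hypothetical names.
object DistinctPairs {
  // Union both groups, drop duplicates, then emit every unordered pair exactly once.
  def distinctPairs(l1: Iterable[String], l2: Iterable[String]): List[(String, String)] =
    (l1.toSeq ++ l2.toSeq).distinct
      .combinations(2)
      .map { case Seq(x, y) => (x, y) }
      .toList

  def main(args: Array[String]): Unit = {
    // Stand-ins for the two CompactBuffers in the question.
    val l1: Iterable[String] = Vector("100772C121", "100772C111", "6666666666")
    val l2: Iterable[String] = Vector("100772C121")
    distinctPairs(l1, l2).foreach(println)

    // combinations treats its input as a multiset, so without .distinct
    // the value shared by both groups is paired with itself.
    val noDistinct = (l1.toSeq ++ l2.toSeq).combinations(2).map { case Seq(x, y) => (x, y) }.toList
    println(noDistinct.contains(("100772C121", "100772C121"))) // true
  }
}
```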

If you want each pair as a separate record in the resulting RDD, wrap this logic in a flatMap over the cogrouped RDD.
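The flatMap step can also be illustrated on local collections. This is a sketch that assumes the cogroup output is modeled as a plain Seq with the same element shape, (key, (Iterable, Iterable)); the object, type alias and method names are hypothetical. As a design choice, it keeps the key next to each pair so rows coming from different keys stay distinguishable.

```scala
// Local stand-in for cogroup output; FlatMapPairs, Grouped and pairsPerKey are hypothetical.
object FlatMapPairs {
  // Same element shape as RDD[(K, (Iterable[V], Iterable[W]))] with String keys and values.
  type Grouped = (String, (Iterable[String], Iterable[String]))

  // flatMap turns each grouped row into zero or more output rows, one per distinct pair.
  def pairsPerKey(grouped: Seq[Grouped]): Seq[(String, (String, String))] =
    grouped.flatMap { case (key, (l1, l2)) =>
      (l1.toSeq ++ l2.toSeq).distinct
        .combinations(2)
        .map { case Seq(x, y) => (key, (x, y)) } // keep the key with each pair
    }

  def main(args: Array[String]): Unit = {
    val grouped: Seq[Grouped] = Seq(
      ("ion", (Vector("100772C121", "100772C111", "6666666666"), Vector("100772C121")))
    )
    pairsPerKey(grouped).foreach(println)
  }
}
```

The lambda returns an Iterator, which RDD.flatMap also accepts, so the same function body should carry over to the real cogroup result.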

EDIT: Added the full steps for your example above.

scala> val rdd1 = sc.parallelize(Array(("ion", "100772C121"), ("ion", "100772C111"), ("ion", "6666666666")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> val rdd2 = sc.parallelize(Array(("ion", "100772C121")))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[1] at parallelize at <console>:12

scala> val cgrp = rdd1.cogroup(rdd2).flatMap {
     |   case (key: String, (l1: Iterable[String], l2: Iterable[String])) =>
     |     (l1.toSeq ++ l2.toSeq).distinct.combinations(2).map { case Seq(x, y) => (x, y) }.toList
     | }
cgrp: org.apache.spark.rdd.RDD[(String, String)] = FlatMappedRDD[4] at flatMap at <console>:16

scala> cgrp.foreach(println)

...

(100772C121,100772C111)
(100772C121,6666666666)
(100772C111,6666666666)

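EDIT 2: Updated again for your use case: each element of the first group is paired with each element of the second (skipping identical values), each pair is put in a fixed order, and reduceByKey counts how often each pair occurs.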

scala> val cgrp = rdd1.cogroup(rdd2).flatMap {
     |   case (key: String, (l1: Iterable[String], l2: Iterable[String])) =>
     |     for { e1 <- l1.toSeq; e2 <- l2.toSeq; if (e1 != e2) }
     |       yield if (e1 > e2) ((e1, e2), 1) else ((e2, e1), 1)
     | }.reduceByKey(_ + _)

...

((6666666666,100772C121),2)
((6666666666,100772C111),1)
((100772C121,100772C111),1)
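The EDIT 2 logic can also be traced on plain collections. In this sketch (hypothetical names and sample data), groupBy followed by size stands in for reduceByKey(_ + _), since summing a 1 per occurrence is the same as counting occurrences. Putting the larger string first makes (a, b) and (b, a) land on the same key, so they are counted together.

```scala
// Local sketch of EDIT 2; CrossPairCounts, crossPairs and countPairs are hypothetical names.
object CrossPairCounts {
  // Every element of the first group crossed with every element of the second,
  // skipping identical values and ordering each pair larger-string-first.
  def crossPairs(l1: Iterable[String], l2: Iterable[String]): Seq[(String, String)] =
    for {
      e1 <- l1.toSeq
      e2 <- l2.toSeq
      if e1 != e2
    } yield if (e1 > e2) (e1, e2) else (e2, e1)

  // groupBy + size plays the role of reduceByKey(_ + _) over ((pair), 1) records.
  def countPairs(groups: Seq[(Iterable[String], Iterable[String])]): Map[(String, String), Int] =
    groups
      .flatMap { case (l1, l2) => crossPairs(l1, l2) }
      .groupBy(identity)
      .map { case (pair, occurrences) => pair -> occurrences.size }

  def main(args: Array[String]): Unit = {
    // Hypothetical data: two groups that both produce the ordered pair (b, a).
    val groups = Seq(
      (Vector("a", "b"), Vector("a")),
      (Vector("b"), Vector("a"))
    )
    countPairs(groups).foreach(println)
  }
}
```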

Upvotes: 2
