Reputation: 135
rdd_1 = [(k1, (v1, v2)), (k2, (v3, v4, v5))]
rdd_2 = [(v1, (w1)), (v3, (w2, w3)), (v5, (w4))]
I want to get a new RDD like this: rdd = [(k1, (w1)), (k2, (w2, w3, w4))]
How can I do this in Spark with Python?
Upvotes: 1
Views: 1642
Reputation: 657
Here is the full working code in Scala. It basically uses three transformations: flatMap, join and groupBy. The problem here is that the join key and the groupBy key have to be different. So first we apply flatMap to rdd_1 to get an RDD of (v, k) pairs. Now we have RDDs of type (v, k) and (v, List(w)), so we join them on v. The type of the joined RDD is (v, (k, List(w))). Finally we group by k by passing the function x => x._2._1 to the groupBy transformation. Below is the entire code:
import scala.collection.mutable.ListBuffer

val rdd1 = sc.parallelize(Seq(("k1", List("v1", "v2")), ("k2", List("v3", "v4", "v5"))))
val rdd2 = sc.parallelize(Seq(("v1", List("w1")), ("v3", List("w2", "w3")), ("v5", List("w4"))))

// Flatten rdd1 into (v, k) pairs so that v becomes the join key
val flattenedRdd1 = rdd1 flatMap {
  case (x, y) =>
    val lb = new ListBuffer[(String, String)]
    y.foreach { v => lb += ((v, x)) }
    lb
}

// Join on v: each record is now (v, (k, List(w)))
val joined = flattenedRdd1 join rdd2

// Group by k and collect all the w values for each key
val result = joined.groupBy { x => x._2._1 }.map {
  case (x, itr) =>
    val lb = new ListBuffer[String]
    itr.foreach {
      case (f, s) => lb ++= s._2
    }
    (x, lb)
}

result.foreach(println)
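For the sample data above this prints (k1, ListBuffer(w1)) and (k2, ListBuffer(w2, w3, w4)), which matches the requested output (element order within each group is not guaranteed).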
Upvotes: 0
Reputation: 13926
flatMap, join and groupByKey should do the job (used in this order).
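Since the question asks for Python, here is a minimal PySpark sketch of that flatMap / join / groupByKey pipeline against the sample data. The variable names and the explicit one-element tuples are my own reading of the question, not anything confirmed by it:

from pyspark import SparkContext

sc = SparkContext("local", "rdd-join-example")

# Sample data from the question; note the trailing commas so that
# one-element values like (w1) really are tuples in Python.
rdd_1 = sc.parallelize([("k1", ("v1", "v2")), ("k2", ("v3", "v4", "v5"))])
rdd_2 = sc.parallelize([("v1", ("w1",)), ("v3", ("w2", "w3")), ("v5", ("w4",))])

# 1. flatMap rdd_1 into (v, k) pairs so the join key matches rdd_2
flattened = rdd_1.flatMap(lambda kv: [(v, kv[0]) for v in kv[1]])

# 2. join on v, giving records of shape (v, (k, (w, ...)))
joined = flattened.join(rdd_2)

# 3. re-key by k, group, and flatten the grouped w-tuples
result = (joined.map(lambda x: (x[1][0], x[1][1]))
                .groupByKey()
                .mapValues(lambda groups: tuple(w for g in groups for w in g)))

print(result.collect())
# [('k1', ('w1',)), ('k2', ('w2', 'w3', 'w4'))]  (order within a group may vary)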
Upvotes: 2