祝方泽

Reputation: 135

How to concat two RDDs in Spark

rdd_1 = [(k1, (v1, v2)), (k2, (v3, v4, v5))]
rdd_2 = [(v1, (w1)), (v3, (w2, w3)), (v5, (w4))]

I want to get a new RDD like this:

rdd = [(k1, (w1)), (k2, (w2, w3, w4))]

How do I do this in Spark with Python?

Upvotes: 1

Views: 1642

Answers (2)

mrnakumar

Reputation: 657

Here is the full working code in Scala. It uses three transformations: flatMap, join and groupBy. The catch is that the join key and the groupBy key have to be different. So first we flatMap rdd_1 to get an RDD of (v, k) pairs. Now we have RDDs of type (v, k) and (v, List(w)), so we join on v. The joined RDD has type (v, (k, List(w))). Finally, we group by k by passing the function x => x._2._1 to the groupBy transformation. Below is the entire code:

import scala.collection.mutable.ListBuffer

val rdd1 = sc.parallelize(Seq(("k1", List("v1", "v2")), ("k2", List("v3", "v4", "v5"))))
val rdd2 = sc.parallelize(Seq(("v1", List("w1")), ("v3", List("w2", "w3")), ("v5", List("w4"))))

// Invert rdd1 from (k, List(v)) to (v, k) pairs so that v becomes the join key.
val flattenedRdd1 = rdd1 flatMap {
  case (k, vs) => vs.map(v => (v, k))
}

// Join on v; the joined RDD has elements of type (v, (k, List(w))).
val joined = flattenedRdd1 join rdd2

// Group by k (the first element of each value pair) and collect all the w's.
val result = joined.groupBy { x => x._2._1 }.map {
  case (k, itr) =>
    val lb = new ListBuffer[String]
    itr.foreach { case (_, (_, ws)) => lb ++= ws }
    (k, lb)
}

result.foreach(println)
// Prints (k1,ListBuffer(w1)) and (k2,ListBuffer(w2, w3, w4)), in no particular order.

Upvotes: 0

Mariusz

Reputation: 13926

flatMap, join and groupByKey should do the job (used in this order).
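
For illustration, here is a minimal PySpark sketch of that pipeline (not the answerer's code; it assumes an existing SparkContext named sc, with the rdd_1/rdd_2 contents taken from the question):

# A minimal sketch of the flatMap -> join -> groupByKey pipeline
# (assumes an existing SparkContext named sc; data taken from the question).
rdd_1 = sc.parallelize([("k1", ("v1", "v2")), ("k2", ("v3", "v4", "v5"))])
rdd_2 = sc.parallelize([("v1", ("w1",)), ("v3", ("w2", "w3")), ("v5", ("w4",))])

# 1. Invert rdd_1 so each v becomes a key: (v, k)
inverted = rdd_1.flatMap(lambda kv: [(v, kv[0]) for v in kv[1]])

# 2. Join on v: elements now look like (v, (k, (w, ...)))
joined = inverted.join(rdd_2)

# 3. Re-key by k, flatten out the w's, and group them per k
result = (joined
          .flatMap(lambda x: [(x[1][0], w) for w in x[1][1]])
          .groupByKey()
          .mapValues(tuple))

print(result.collect())
# [('k1', ('w1',)), ('k2', ('w2', 'w3', 'w4'))]  (ordering not guaranteed)

Note that groupByKey makes no ordering guarantee, so the w values within each key may come back in a different order.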

Upvotes: 2
