eli

Reputation: 81

Spark Multiple Joins

Using Spark, I would like to perform multiple joins between RDDs, where the number of RDDs to be joined should be dynamic. I would like the result to be unfolded. For example:

val rdd1  = sc.parallelize(List((1,1.0),(11,11.0), (111,111.0)))
val rdd2  = sc.parallelize(List((1,2.0),(11,12.0), (111,112.0)))
val rdd3  = sc.parallelize(List((1,3.0),(11,13.0), (111,113.0)))
val rdd4  = sc.parallelize(List((1,4.0),(11,14.0), (111,114.0)))
val rdd11 = rdd1.join(rdd2).join(rdd3).join(rdd4)
rdd11.foreach(println)

generates the following output:

(11,(((11.0,12.0),13.0),14.0))
(111,(((111.0,112.0),113.0),114.0))
(1,(((1.0,2.0),3.0),4.0))

I would like to:

  1. Unfold the values, e.g. the first line should read: (11, 11.0, 12.0, 13.0, 14.0).

  2. Do it dynamically, so that it works for any number of RDDs to be joined.

Any ideas would be appreciated,

Eli.

Upvotes: 2

Views: 1847

Answers (2)

wkf

Reputation: 1

Others with this problem may find groupWith helpful. From the docs:

>>> w = sc.parallelize([("a", 5), ("b", 6)])
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> z = sc.parallelize([("b", 42)])
>>> [(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))]
[('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]

Upvotes: 0

Wesley Miao

Reputation: 861

Instead of using join, I would use union followed by groupByKey to achieve what you desire.

Here is what I would do -

val emptyRdd = sc.emptyRDD[(Int, Double)]
val listRdds = List(rdd1, rdd2, rdd3, rdd4) // any dynamic number of RDDs
val unioned = listRdds.fold(emptyRdd)(_.union(_))
val grouped = unioned.groupByKey
grouped.collect().foreach(println(_))

This yields the result:

(1,CompactBuffer(1.0, 2.0, 3.0, 4.0))
(11,CompactBuffer(11.0, 12.0, 13.0, 14.0))
(111,CompactBuffer(111.0, 112.0, 113.0, 114.0))
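The grouping step can be sketched with plain Scala collections, without Spark (a note of mine, not from the original answer: on a real cluster, groupByKey makes no ordering guarantee for the grouped values, so the neat per-key order above is incidental, not contractual):

```scala
// Plain-Scala sketch of the union + groupByKey idea (no Spark required).
val rdd1 = List((1, 1.0), (11, 11.0), (111, 111.0))
val rdd2 = List((1, 2.0), (11, 12.0), (111, 112.0))

val unioned = rdd1 ++ rdd2                        // union
val grouped = unioned
  .groupBy { case (k, _) => k }                   // groupByKey
  .map { case (k, kvs) => k -> kvs.map(_._2) }

println(grouped(1))   // List(1.0, 2.0)
```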

Updated:

If you would still like to use join, here is how to do it with a somewhat more involved foldLeft -

val joined = listRdds match {
  case head::tail => tail.foldLeft(head.mapValues(Array(_)))(_.join(_).mapValues {
    case (arr: Array[Double], d: Double) => arr :+ d
  })
  case Nil => sc.emptyRDD[(Int, Array[Double])]
}

And joined.collect will yield

res14: Array[(Int, Array[Double])] = Array((1,Array(1.0, 2.0, 3.0, 4.0)), (11,Array(11.0, 12.0, 13.0, 14.0)), (111,Array(111.0, 112.0, 113.0, 114.0)))
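If the flat form from the question, e.g. (11, 11.0, 12.0, 13.0, 14.0), is wanted as printed output, the joined arrays can be rendered with mkString (a small plain-Scala sketch; `unfold` is a hypothetical helper name, not a Spark API):

```scala
// Format a key and its joined values as the flat tuple asked for in the
// question, e.g. (11, 11.0, 12.0, 13.0, 14.0). Pure Scala, no Spark needed.
def unfold(k: Int, vs: Array[Double]): String =
  (k.toString +: vs.map(_.toString)).mkString("(", ", ", ")")

println(unfold(11, Array(11.0, 12.0, 13.0, 14.0)))
// (11, 11.0, 12.0, 13.0, 14.0)
```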

Upvotes: 2
