Reputation: 81
Using a SparkContext, I would like to perform multiple joins between RDDs, where the number of RDDs to be joined is dynamic. I would like the result to be unfolded (flattened). For example:
val rdd1 = sc.parallelize(List((1,1.0),(11,11.0), (111,111.0)))
val rdd2 = sc.parallelize(List((1,2.0),(11,12.0), (111,112.0)))
val rdd3 = sc.parallelize(List((1,3.0),(11,13.0), (111,113.0)))
val rdd4 = sc.parallelize(List((1,4.0),(11,14.0), (111,114.0)))
val rdd11 = rdd1.join(rdd2).join(rdd3).join(rdd4)
rdd11.foreach(println)
This generates the following output:
(11,(((11.0,12.0),13.0),14.0))
(111,(((111.0,112.0),113.0),114.0))
(1,(((1.0,2.0),3.0),4.0))
I would like to:
Unfold the values, e.g. the first line should read: (11, 11.0, 12.0, 13.0, 14.0).
Do it dynamically, so that it works for any number of RDDs to be joined.
Any ideas would be appreciated,
Eli.
Upvotes: 2
Views: 1847
Reputation: 1
Others with this problem may find groupWith helpful. From the PySpark docs:
>>> w = sc.parallelize([("a", 5), ("b", 6)])
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> z = sc.parallelize([("b", 42)])
>>> [(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))]
[('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]
Upvotes: 0
Reputation: 861
Instead of using join, I would use union followed by groupByKey to achieve what you desire.
Here is what I would do:
val emptyRdd = sc.emptyRDD[(Int, Double)]
val listRdds = List(rdd1, rdd2, rdd3, rdd4) // satisfy your dynamic number of rdds
val unioned = listRdds.fold(emptyRdd)(_.union(_))
val grouped = unioned.groupByKey
grouped.collect().foreach(println(_))
This yields the following result (the order of the keys, and of the values within each buffer, is not guaranteed):
(1,CompactBuffer(1.0, 2.0, 3.0, 4.0))
(11,CompactBuffer(11.0, 12.0, 13.0, 14.0))
(111,CompactBuffer(111.0, 112.0, 113.0, 114.0))
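One thing worth checking before relying on this: union followed by groupByKey is not an inner join, so a key that appears in only some of the RDDs is still kept. Below is a minimal sketch of that behaviour using plain Scala collections in place of RDDs, so it runs without Spark. The names UnionGroupSketch and groupAll are made up for illustration, and flatten/groupBy are only local stand-ins for union/groupByKey.

```scala
// Plain-Scala model of "union, then groupByKey".
// Assumption: each List[(Int, Double)] stands in for one pair RDD.
object UnionGroupSketch {
  // groupAll is a hypothetical helper name, not a Spark API.
  def groupAll(inputs: List[List[(Int, Double)]]): Map[Int, List[Double]] =
    inputs.flatten                                    // stand-in for union
      .groupBy(_._1)                                  // stand-in for groupByKey
      .map { case (k, pairs) => k -> pairs.map(_._2) } // keep only the values

  def main(args: Array[String]): Unit = {
    val l1 = List(1 -> 1.0, 11 -> 11.0)
    val l2 = List(1 -> 2.0) // key 11 is deliberately missing here
    // Key 11 survives with a single value; an inner join would drop it.
    groupAll(List(l1, l2)).toList.sortBy(_._1).foreach(println)
  }
}
```

If every key is guaranteed to be present in every RDD, as in the example data, the two approaches should give the same groups.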
Updated:
If you would still like to use join, this is how to do it with a somewhat more complicated foldLeft:
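// Seed the fold with the first RDD, wrapping each value in a one-element array,
// then join each remaining RDD and append its value to the array.
val joined = listRdds match {
  case head :: tail =>
    tail.foldLeft(head.mapValues(Array(_)))(_.join(_).mapValues {
      case (arr: Array[Double], d: Double) => arr :+ d
    })
  case Nil => sc.emptyRDD[(Int, Array[Double])]
}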
joined.collect then yields:
res14: Array[(Int, Array[Double])] = Array((1,Array(1.0, 2.0, 3.0, 4.0)), (11,Array(11.0, 12.0, 13.0, 14.0)), (111,Array(111.0, 112.0, 113.0, 114.0)))
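To make the fold easier to reason about, here is a rough model of the same idea on plain Scala Maps, so it can be tried without Spark. It is only a sketch under assumptions: each Map stands in for a pair RDD with unique keys, and FoldJoinSketch and joinAll are hypothetical names, not part of any Spark API. Like join, it keeps only the keys present in every input.

```scala
// Plain-Scala model of the fold-based inner join.
// Assumption: each Map[Int, Double] stands in for one pair RDD with unique keys.
object FoldJoinSketch {
  // joinAll is a hypothetical helper name, not a Spark API.
  def joinAll(inputs: List[Map[Int, Double]]): Map[Int, Vector[Double]] =
    inputs match {
      case head :: tail =>
        // Seed: wrap each value of the first input in a one-element Vector.
        val seed: Map[Int, Vector[Double]] =
          head.map { case (k, v) => k -> Vector(v) }
        tail.foldLeft(seed) { (acc, next) =>
          // Inner join step: keep keys present on both sides, append the new value.
          acc.collect { case (k, vs) if next.contains(k) => k -> (vs :+ next(k)) }
        }
      case Nil => Map.empty
    }

  def main(args: Array[String]): Unit = {
    val m1 = Map(1 -> 1.0, 11 -> 11.0, 111 -> 111.0)
    val m2 = Map(1 -> 2.0, 11 -> 12.0, 111 -> 112.0)
    val m3 = Map(1 -> 3.0, 11 -> 13.0) // key 111 is deliberately missing
    // Prints (1,Vector(1.0, 2.0, 3.0)) and (11,Vector(11.0, 12.0, 13.0))
    joinAll(List(m1, m2, m3)).toList.sortBy(_._1).foreach(println)
  }
}
```

Because the accumulator is a flat sequence rather than nested tuples, the result stays unfolded no matter how many inputs are folded in.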
Upvotes: 2