Reputation: 3587
I am joining a large number of RDDs and I was wondering whether there is a generic way of removing the nested parentheses that are being created on each join.
Here is a small sample:
val rdd1 = sc.parallelize(Array((1,2),(2,4),(3,6)))
val rdd2 = sc.parallelize(Array((1,7),(2,8),(3,6)))
val rdd3 = sc.parallelize(Array((1,2),(2,4),(3,6)))
val result = rdd1.join(rdd2).join(rdd3)
result: org.apache.spark.rdd.RDD[(Int, ((Int, Int), Int))] = Array((1,((2,7),2)), (2,((4,8),4)), (3,((6,6),6)))
I know I can use map:
result.map((x) => (x._1,(x._2._1._1,x._2._1._2,x._2._2))).collect
Array[(Int, (Int, Int, Int))] = Array((1,(2,7,2)), (2,(4,8,4)), (3,(6,6,6)))
but with a large number of RDDs, each join adding another level of nesting, this method quickly becomes unwieldy.
Upvotes: 1
Views: 152
Reputation: 330413
With a large number of RDDs this approach simply won't work, because the largest built-in tuple is still Tuple22. If you join homogeneous RDDs, it is better to merge the values into some type of sequence:
def joinAndMerge(rdd1: RDD[(Int, Seq[Int])], rdd2: RDD[(Int, Seq[Int])]) =
  rdd1.join(rdd2).mapValues { case (x, y) => x ++ y }
Seq(rdd1, rdd2, rdd3).map(_.mapValues(Seq(_))).reduce(joinAndMerge)
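The same fold-and-merge pattern can be sketched on plain Scala maps, without a Spark cluster (toy data taken from the question; the Map-based joinAndMerge here is an illustrative stand-in, not Spark API). An inner join over the shared keys concatenates the value sequences just as mapValues { case (x, y) => x ++ y } does:

```scala
// Plain-Scala sketch of the joinAndMerge pattern above (no Spark needed).
// Each "RDD" is modelled as a Map[Int, Int]; values are wrapped in Seq(_)
// and an inner join concatenates the sequences key by key.
def joinAndMerge(m1: Map[Int, Seq[Int]], m2: Map[Int, Seq[Int]]): Map[Int, Seq[Int]] =
  (m1.keySet & m2.keySet).map(k => k -> (m1(k) ++ m2(k))).toMap

val m1 = Map(1 -> 2, 2 -> 4, 3 -> 6)
val m2 = Map(1 -> 7, 2 -> 8, 3 -> 6)
val m3 = Map(1 -> 2, 2 -> 4, 3 -> 6)

val merged = Seq(m1, m2, m3)
  .map(_.map { case (k, v) => k -> Seq(v) })  // lift each value into a Seq
  .reduce(joinAndMerge)
// merged == Map(1 -> Seq(2, 7, 2), 2 -> Seq(4, 8, 4), 3 -> Seq(6, 6, 6))
```

Because every value is a flat Seq[Int], the result never nests, no matter how many datasets you fold in.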
If you have only three RDDs it can be cleaner to use cogroup:
rdd1.cogroup(rdd2, rdd3).flatMapValues {
  case (xs, ys, zs) => for {
    x <- xs; y <- ys; z <- zs
  } yield (x, y, z)
}
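The per-key cross product that flatMapValues builds can likewise be sketched on plain collections (toy data from the question; the byKey helper is illustrative, not Spark API):

```scala
// Plain-Scala sketch of the cogroup + flatMapValues step above: group each
// dataset's values by key, then emit every (x, y, z) combination per key.
def byKey(s: Seq[(Int, Int)]): Map[Int, Seq[Int]] =
  s.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }

val xs = Seq(1 -> 2, 2 -> 4, 3 -> 6)
val ys = Seq(1 -> 7, 2 -> 8, 3 -> 6)
val zs = Seq(1 -> 2, 2 -> 4, 3 -> 6)

val (gx, gy, gz) = (byKey(xs), byKey(ys), byKey(zs))
val cogrouped = (gx.keySet & gy.keySet & gz.keySet).toSeq.sorted.map { k =>
  k -> (for { x <- gx(k); y <- gy(k); z <- gz(k) } yield (x, y, z))
}
// cogrouped == Seq(1 -> Seq((2,7,2)), 2 -> Seq((4,8,4)), 3 -> Seq((6,6,6)))
```

Note that a real cogroup keeps keys present in any input, so the intersection here mirrors the inner-join semantics of the for-comprehension (a key with an empty side yields nothing).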
If values are heterogeneous it makes more sense to use DataFrames:
def joinByKey(df1: DataFrame, df2: DataFrame) = df1.join(df2, Seq("k"))
Seq(rdd1, rdd2, rdd3).map(_.toDF("k", "v")).reduce(joinByKey)
Upvotes: 1