Reputation: 51
I have 4 RDDs of type RDD[((Int, Int, Int), Value)], and my RDDs are
rdd1: ((a,b,c), value)
rdd2: ((a,d,e), valueA)
rdd3: ((f,b,g), valueB)
rdd4: ((h,i,c), valueC)
How can I join the RDDs like rdd1 join rdd2 on "a", rdd1 join rdd3 on "b", and rdd1 join rdd4 on "c",
so that the output is finalRdd: ((a,b,c), (value, valueA, valueB, valueC))
in Scala?
I tried doing this with collectAsMap, but it didn't work well and throws an exception. Here is the code just for rdd1 join rdd2:
val newrdd2 = rdd2.map { case ((a, b, c), d) => (a, d) }.collectAsMap
val joined = rdd1.map { case ((a, b, c), d) => (newrdd2.get(a).get, b, c, d) } // .get on a None throws NoSuchElementException when a is missing from the map
Example:
rdd1: ((1,2,3), animals)
rdd2: ((1,anyInt,anyInt), cat)
rdd3: ((anyInt,2,anyInt), cow)
rdd4: ((anyInt,anyInt,3), parrot)
The output should be ((1,2,3), (animals, cat, cow, parrot))
Upvotes: 4
Views: 5210
Reputation: 5999
There is a handy join
method on RDDs, but you need your RDD to be keyed by your particular join key, which is what Spark uses for partitioning and shuffling.
From the docs:
join(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
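As a minimal sketch of what that means (assuming sc is your SparkContext, with made-up sample data), join pairs up values that share a key:
val left = sc.parallelize(Seq((1, "animals"), (2, "plants"))) // RDD[(Int, String)], keyed by the join key
val right = sc.parallelize(Seq((1, "cat")))                   // RDD[(Int, String)]
val joined = left.join(right)                                 // RDD[(Int, (String, String))]
// joined.collect() would give Array((1, (animals, cat))); key 2 is dropped since join is an inner join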
I can't compile where I am, but by hand it goes something like this:
val rdd1KeyA = rdd1.map(x => (x._1._1, (x._1._2, x._1._3, x._2))) // RDD(a, (b, c, value))
val rdd2KeyA = rdd2.map(x => (x._1._1, x._2)) // RDD(a, valueA)
val joined1 = rdd1KeyA.join(rdd2KeyA) // RDD(a, ((b, c, value), valueA))
val rdd3KeyB = rdd3.map(x => (x._1._2, x._2)) // RDD(b, valueB)
val joined1KeyB = joined1.map(x => (x._2._1._1, (x._1, x._2._1._2, x._2._1._3, x._2._2))) // RDD(b, (a, c, value, valueA))
val joined2 = joined1KeyB.join(rdd3KeyB) // RDD(b, ((a, c, value, valueA), valueB))
...and so on
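Sketching out the rest under the same pattern (again untested: rekey by c, join rdd4, then map everything back under the (a, b, c) key):
val rdd4KeyC = rdd4.map(x => (x._1._3, x._2)) // RDD(c, valueC)
val joined2KeyC = joined2.map(x => (x._2._1._2, (x._2._1._1, x._1, x._2._1._3, x._2._1._4, x._2._2))) // RDD(c, (a, b, value, valueA, valueB))
val joined3 = joined2KeyC.join(rdd4KeyC) // RDD(c, ((a, b, value, valueA, valueB), valueC))
val finalRdd = joined3.map { case (c, ((a, b, value, valueA, valueB), valueC)) =>
  ((a, b, c), (value, valueA, valueB, valueC))
} // RDD((a, b, c), (value, valueA, valueB, valueC))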
Avoid the collect*
functions: they do not use the distributed nature of your data and are prone to fail on big loads, since they pull all the data of an RDD into an in-memory collection on the driver, probably blowing everything up.
Upvotes: 3