luis

Reputation: 51

Join multiple rdds

I have 4 RDDs of type RDD[((Int, Int, Int), value)], and my RDDs are

rdd1: ((a,b,c), value)
rdd2: ((a,d,e), valueA)
rdd3: ((f,b,g), valueB)
rdd4: ((h,i,c), valueC)

How can I join the RDDs so that rdd1 joins rdd2 on "a", rdd1 joins rdd3 on "b", and rdd1 joins rdd4 on "c",

so that the output is finalRdd: ((a,b,c),valueA,valueB,valueC,value) in Scala?

I tried doing this with collectAsMap, but it didn't work well and throws an exception.

Code for just rdd1 join rdd2:

val newrdd2=rdd2.map{case( (a,b,c),d)=>(a,d)}.collectAsMap
val joined=rdd1.map{case( (a,b,c),d)=>(newrdd2.get(a).get,b,c,d)} 

example

rdd1: ((1,2,3), animals)
rdd2: ((1,anyInt,anyInt), cat)
rdd3: ((anyInt,2,anyInt), cow)
rdd4: ((anyInt,anyInt,3), parrot)

the output should be ((1,2,3),animals,cat,cow,parrot )

Upvotes: 4

Views: 5210

Answers (1)

Daniel Langdon

Reputation: 5999

There is a handy join method on RDDs, but you need the RDD to be keyed by your particular join key, which is what Spark uses for partitioning and shuffling.

From the docs:

join(otherDataset, [numTasks]) : When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

I can't compile where I am, but by hand it goes something like this:

val rdd1KeyA = rdd1.map(x => (x._1._1, (x._1._2, x._1._3, x._2))) // RDD(a, (b, c, value))
val rdd2KeyA = rdd2.map(x => (x._1._1, x._2)) // RDD(a, valueA)
val joined1 = rdd1KeyA.join(rdd2KeyA) // RDD(a, ((b, c, value), valueA))

val rdd3KeyB = rdd3.map(x => (x._1._2, x._2)) // RDD(b, valueB)
val joined1KeyB = joined1.map(x => (x._2._1._1, (x._1, x._2._1._2, x._2._1._3, x._2._2))) // RDD(b, (a, c, value, valueA))
val joined2 = joined1KeyB.join(rdd3KeyB) // RDD(b, ((a, c, value, valueA), valueB))

...and so on
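For completeness, here is a rough sketch of the whole chain, including the third join on "c". It is untested against a real cluster and makes a few assumptions that are not in the question: all values are Strings, the key fields are Ints, and the names JoinFourRdds, joinAll, byA, byB and byC are made up for illustration. It carries the full (a,b,c) key along through each step instead of unpacking it, which keeps the tuple accessors shorter. The output order follows the finalRdd shape from the question.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object JoinFourRdds {
  // Assumption: keys are (Int, Int, Int) and every value is a String.
  type Key = (Int, Int, Int)

  // Joins rdd1 with rdd2 on the 1st key field, rdd3 on the 2nd, rdd4 on the 3rd.
  def joinAll(
      rdd1: RDD[(Key, String)],
      rdd2: RDD[(Key, String)],
      rdd3: RDD[(Key, String)],
      rdd4: RDD[(Key, String)]
  ): RDD[(Key, String, String, String, String)] = {
    // Re-key each lookup RDD by the single field it is joined on.
    val byA = rdd2.map { case ((a, _, _), valueA) => (a, valueA) }
    val byB = rdd3.map { case ((_, b, _), valueB) => (b, valueB) }
    val byC = rdd4.map { case ((_, _, c), valueC) => (c, valueC) }

    rdd1
      .map { case (key, value) => (key._1, (key, value)) } // keyed by a
      .join(byA)
      .map { case (_, ((key, value), valueA)) => (key._2, (key, value, valueA)) } // keyed by b
      .join(byB)
      .map { case (_, ((key, value, valueA), valueB)) =>
        (key._3, (key, value, valueA, valueB)) // keyed by c
      }
      .join(byC)
      .map { case (_, ((key, value, valueA, valueB), valueC)) =>
        (key, valueA, valueB, valueC, value)
      }
  }

  def main(args: Array[String]): Unit = {
    // Local master only for trying this out; drop setMaster when submitting to a cluster.
    val conf = new SparkConf().setAppName("join-four-rdds").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val rdd1 = sc.parallelize(Seq(((1, 2, 3), "animals")))
      val rdd2 = sc.parallelize(Seq(((1, 7, 8), "cat")))
      val rdd3 = sc.parallelize(Seq(((9, 2, 8), "cow")))
      val rdd4 = sc.parallelize(Seq(((9, 7, 3), "parrot")))
      // take, not collect: only a handful of rows are brought back to the driver.
      joinAll(rdd1, rdd2, rdd3, rdd4).take(10).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Two things to keep in mind. join is an inner join, so any rdd1 row without a match in one of the other RDDs is dropped, and a key that appears several times on either side produces every combination; leftOuterJoin is the alternative if unmatched rows should survive. Also, on Spark versions before 1.3 you would probably need import org.apache.spark.SparkContext._ for join to be available on pair RDDs.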

Avoid the collect* functions: they do not use the distributed nature of your data and are prone to fail on big loads. They pull all the data in an RDD into an in-memory collection on the driver (master) node, which will probably blow everything up.

Upvotes: 3
