Reputation: 1192
I have three RDDs of the same size: rdd1 contains a String identifier, rdd2 contains a vector, and rdd3 contains an integer value.
Essentially I want to zip those three together to get an RDD[(String, Vector, Int)], but I keep getting "can't zip RDDs with unequal number of partitions". How can I bypass zip completely and still get this result?
Upvotes: 2
Views: 1839
Reputation:
Try:
rdd1.zipWithIndex.map(_.swap).join(rdd2.zipWithIndex.map(_.swap)).values
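The line above pairs two RDDs; the question has three. A minimal sketch of the same idea extended to three inputs follows. The helper name zip3ByIndex is hypothetical, and it assumes all three RDDs have the same number of elements and a stable element order, since zipWithIndex assigns indices by partition order.

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Hypothetical helper (not part of Spark): pairs up three RDDs by element position.
// Assumes equal element counts; positions missing from any input are dropped by join.
def zip3ByIndex[A: ClassTag, B: ClassTag, C: ClassTag](
    a: RDD[A],
    b: RDD[B],
    c: RDD[C]): RDD[(A, B, C)] = {
  // zipWithIndex yields (element, index); swap to key each element by its index.
  val aByIdx = a.zipWithIndex.map(_.swap)
  val bByIdx = b.zipWithIndex.map(_.swap)
  val cByIdx = c.zipWithIndex.map(_.swap)

  aByIdx.join(bByIdx).join(cByIdx)
    .sortByKey() // optional: join does not preserve order, so sort by index to restore it
    .values
    .map { case ((x, y), z) => (x, y, z) } // flatten ((A, B), C) into (A, B, C)
}
```

With the RDDs from the question, zip3ByIndex(rdd1, rdd2, rdd3) would give the three-element tuples asked for. Unlike zip, the joins shuffle data, so this is likely slower on large inputs.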
Upvotes: 7
Reputation: 3725
Do they all have the same number of elements? zip only works in the special case that the RDDs have exactly the same number of partitions and exactly the same number of elements in each partition (zipPartitions relaxes this to the same number of partitions only).
Your case has no such guarantees. What do you want to do in the case that rdd3 is actually empty? Should you get a resulting RDD with no elements?
Edit: If you know that the lengths are exactly the same, LostInOverflow's answer will work.
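To see whether two RDDs actually meet the layout that zip expects, one option is to compare their per-partition element counts. This is only a sketch: partitionSizes and canZip are hypothetical helper names, and each call runs a Spark job over the data.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical helper: number of elements in each partition, in partition order.
def partitionSizes[T](rdd: RDD[T]): Array[Int] =
  rdd.mapPartitions(it => Iterator(it.size)).collect()

// Hypothetical helper: true only if both RDDs have the same number of partitions
// and the same number of elements in each corresponding partition.
def canZip[A, B](a: RDD[A], b: RDD[B]): Boolean =
  partitionSizes(a).sameElements(partitionSizes(b))
```

If canZip(rdd1, rdd2) is false, a plain zip would fail, and a key-based join is the safer route.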
Upvotes: 1
Reputation: 8529
Before splitting up your original RDD, assign each row a unique id with RDD.zipWithUniqueId. Then make sure to include the id field in each of the RDDs you split from the original and use it as the key for those rows (use keyBy if the id is not already the key), then use RDD.join to recombine the rows.
An example might look like:
// Attach a unique Long id to every row and make it the key: (id, row)
val rddWithKey = originalRdd.zipWithUniqueId().map(_.swap)
val rdd1 = rddWithKey.map { case (key, value) => key -> value.stringField }
val rdd2 = rddWithKey.map { case (key, value) => key -> value.intField }
/* transformations on rdd1 and rdd2 that keep the key intact */
val recombined = rdd1.join(rdd2)
Upvotes: 1