Mnemosyne

Reputation: 1192

Can't Zip RDDs with unequal number of partitions. What can I use as an alternative to zip?

I have three RDDs of the same size: rdd1 contains a String identifier, rdd2 contains a vector and rdd3 contains an integer value.

Essentially I want to zip those three together to get an RDD[(String, Vector, Int)], but I keep getting "can't zip RDDs with unequal number of partitions". How can I bypass zip completely to do the above?
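To see the failure in isolation, here is a minimal sketch assuming Spark's Scala API and a local master; ZipMismatchRepro and zipFails are hypothetical names, and the small Seq values stand in for the real data. Both RDDs hold four elements, but one has 2 partitions and the other 3, which is enough to make zip fail.

```scala
import scala.util.Try
import org.apache.spark.{SparkConf, SparkContext}

object ZipMismatchRepro {
  // Same element count, different partition counts (2 vs 3).
  // Try covers the failure whether Spark raises it at zip() or when the job runs.
  def zipFails(sc: SparkContext): Boolean = {
    val ids  = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)
    val nums = sc.parallelize(Seq(1, 2, 3, 4), numSlices = 3)
    Try(ids.zip(nums).collect()).isFailure
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("zip-repro"))
    try {
      println(s"zip failed: ${zipFails(sc)}")
    } finally {
      sc.stop()
    }
  }
}
```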

Upvotes: 2

Views: 1839

Answers (3)

user6022341

Reputation:

Try:

rdd1.zipWithIndex.map(_.swap).join(rdd2.zipWithIndex.map(_.swap)).values
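The one-liner above pairs two RDDs. A minimal sketch extending the same zipWithIndex-and-join idea to all three RDDs from the question, assuming Spark's Scala API; Zip3ByIndex, byIndex and zip3 are hypothetical names, and Seq[Double] is only a stand-in for the question's vector type. Because the pairing happens through a join on the position, the partition counts of the inputs no longer matter.

```scala
import scala.reflect.ClassTag
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Zip3ByIndex {
  // Key every element by its position: zipWithIndex numbers elements 0, 1, 2, ...
  // by partition index first, then by order within each partition.
  def byIndex[T: ClassTag](rdd: RDD[T]): RDD[(Long, T)] =
    rdd.zipWithIndex().map(_.swap)

  // Pair the i-th elements of three RDDs, whatever their partition counts.
  // Inner joins drop any position that is missing from one of the inputs.
  def zip3[A: ClassTag, B: ClassTag, C: ClassTag](
      a: RDD[A], b: RDD[B], c: RDD[C]): RDD[(A, B, C)] =
    byIndex(a)
      .join(byIndex(b))
      .join(byIndex(c))
      .sortByKey() // join shuffles, so restore the original order by index
      .values
      .map { case ((x, y), z) => (x, y, z) }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("zip3"))
    try {
      val ids    = sc.parallelize(Seq("a", "b", "c"), numSlices = 3)
      val vecs   = sc.parallelize(Seq(Seq(1.0), Seq(2.0), Seq(3.0)), numSlices = 2)
      val counts = sc.parallelize(Seq(10, 20, 30), numSlices = 1)
      zip3(ids, vecs, counts).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

If the order of the result does not matter, dropping the sortByKey step saves an extra shuffle.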

Upvotes: 7

Tim

Reputation: 3725

Do they all have the same number of elements? zip (which is built on zipPartitions) only works in the special case that the RDDs have exactly the same number of partitions and exactly the same number of elements in each partition.

Your case has no such guarantees. What do you want to do in the case that rdd3 is actually empty? Should you get a resulting RDD with no elements?

Edit: If you know that the lengths are exactly the same, LostInOverflow's answer will work.
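The empty-rdd3 question comes down to which join you use. With an inner join on the position, any missing position drops the whole row, so an empty rdd3 gives an empty result. A minimal sketch of the alternative, assuming you want to keep every row of the first RDD: a left outer join on the position turns missing values into None. PositionalLeftJoin and zipKeepLeft are hypothetical names.

```scala
import scala.reflect.ClassTag
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PositionalLeftJoin {
  // Keep every element of `left`; attach the element at the same position in
  // `right` as Some(value), or None when `right` is shorter (or empty).
  def zipKeepLeft[A: ClassTag, B: ClassTag](left: RDD[A], right: RDD[B]): RDD[(A, Option[B])] = {
    val l = left.zipWithIndex().map(_.swap)
    val r = right.zipWithIndex().map(_.swap)
    l.leftOuterJoin(r).sortByKey().values
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("left-zip"))
    try {
      val ids   = sc.parallelize(Seq("a", "b", "c"), numSlices = 2)
      val empty = sc.parallelize(Seq.empty[Int], numSlices = 1)
      zipKeepLeft(ids, empty).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```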

Upvotes: 1

puhlen

Reputation: 8529

Before splitting up your original RDD, assign each row a unique id with RDD.zipWithUniqueId. Then include the id field in each of the RDDs you split from the original and use it as the key for those rows (use keyBy if the id is not already the key), then use RDD.join to recombine the rows.

An example might look like:

val rddWithKey = originalRdd.zipWithUniqueId().map(_.swap) // (id, row)
val rdd1 = rddWithKey.map { case (key, value) => key -> value.stringField }
val rdd2 = rddWithKey.map { case (key, value) => key -> value.intField }

/* transformations on rdd1 and rdd2 */

val recombined = rdd1.join(rdd2) // RDD[(Long, (String, Int))]
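A self-contained version of the same split-then-rejoin approach, written as a sketch under stated assumptions: Record, LabelPart, ScorePart and SplitAndRejoin are hypothetical names standing in for your own row type, and the toUpperCase and "* 2" steps are placeholder transformations. Here each split carries the id as a field, so keyBy is used to turn it back into the join key.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical row types: the original record, and one per split that keeps the id.
case class Record(label: String, score: Int)
case class LabelPart(id: Long, label: String)
case class ScorePart(id: Long, score: Int)

object SplitAndRejoin {
  def run(original: RDD[Record]): RDD[(String, Int)] = {
    // Tag each row once, before splitting; ids are unique but not necessarily contiguous.
    val withId = original.zipWithUniqueId()
    // Split into two RDDs that each carry the id, with a placeholder transformation on each.
    val labels = withId.map { case (r, id) => LabelPart(id, r.label.toUpperCase) }
    val scores = withId.map { case (r, id) => ScorePart(id, r.score * 2) }
    // keyBy re-keys each part by its id field so join can match the rows up again.
    labels.keyBy(_.id)
      .join(scores.keyBy(_.id))
      .values
      .map { case (l, s) => (l.label, s.score) }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("split-rejoin"))
    try {
      val original = sc.parallelize(Seq(Record("a", 1), Record("b", 2)), numSlices = 2)
      run(original).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Because withId feeds two lineages, calling cache() on it avoids recomputing the original RDD. The ids only stay consistent across recomputation if the order of elements within each partition is deterministic; Spark's documentation suggests sorting or saving the RDD first when it is not.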

Upvotes: 1
