himanshuIIITian

Reputation: 6085

How to zip an RDD with its corresponding elements in another RDD?

I have 2 RDDs, like this:

RDD1

scala> val rdd1 = spark.sparkContext.parallelize(List(1,1,2,3,4,4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101] at parallelize at <console>:23

It contains repeated values.

RDD2

scala> val rdd2 = spark.sparkContext.parallelize(List(1,2,3,4))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[102] at parallelize at <console>:23

It contains all unique values that are present in RDD1.

Now, I am applying zip over RDD1 and RDD2 like this:

scala> rdd1.distinct.coalesce(rdd2.getNumPartitions).zip(rdd2).collect
res22: Array[(Int, Int)] = Array((4,1), (1,2), (2,3), (3,4))

Here it is zipping 4 with 1, 1 with 2, and so on, whereas I want the result in the following format:

Array((1,1), (2,2), (3,3), (4,4))

How should I apply the zip operation on them so that I get the expected output?

Upvotes: 1

Views: 834

Answers (1)

Ramesh Maharjan

Reputation: 41957

Your first RDD gets shuffled when you call distinct, so the values go out of order.

What you can do is create a pair RDD from your rdd1, sort it, and carry on with the rest:

val rdd1 = sc.parallelize(List(1,1,2,3,4,4)).map(x => ("a", x)).distinct.sortBy(_._2).values
val rdd2 = sc.parallelize(List(1,2,3,4))
rdd1.coalesce(rdd2.getNumPartitions).zip(rdd2)
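
Note that zip also requires both RDDs to have the same number of elements in each partition, which coalesce alone does not guarantee. A sketch of an alternative that sidesteps that requirement entirely (assuming a SparkContext named sc, as above): key both RDDs by their values and join them, so matching happens by key rather than by position.

    val rdd1 = sc.parallelize(List(1, 1, 2, 3, 4, 4))
    val rdd2 = sc.parallelize(List(1, 2, 3, 4))

    val result = rdd1.distinct
      .keyBy(identity)             // (1,1), (2,2), (3,3), (4,4) in any order
      .join(rdd2.keyBy(identity))  // match on the shared key
      .values                      // drop the key, keep the (v1, v2) pairs
      .sortBy(_._1)                // sort for a deterministic order

    result.collect

Since the join pairs elements by key instead of by partition position, the dummy "a" key and the coalesce step are not needed here; distinct.sortBy(identity) would also work in the zip-based version above, as sortBy accepts any RDD of ordered values directly.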

Upvotes: 1
