Reputation: 6085
I have two RDDs, like this:
RDD1
scala> val rdd1 = spark.sparkContext.parallelize(List(1,1,2,3,4,4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101] at parallelize at <console>:23
It contains repeated values.
RDD2
scala> val rdd2 = spark.sparkContext.parallelize(List(1,2,3,4))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[102] at parallelize at <console>:23
It contains all unique values that are present in RDD1.
Now, I am applying zip over RDD1 and RDD2 like this:
scala> rdd1.distinct.coalesce(rdd2.getNumPartitions).zip(rdd2).collect
res22: Array[(Int, Int)] = Array((4,1), (1,2), (2,3), (3,4))
Here it is zipping 4 with 1, 2 with 3, and so on, whereas I want the result in the following format:
Array((1,1), (2,2), (3,3), (4,4))
How should I apply the zip operation on them so that I can achieve the expected output?
Upvotes: 1
Views: 834
Reputation: 41957
Your first RDD gets shuffled when you call distinct, so the values go out of order. What you can do is turn rdd1 into a pair RDD, sort it back into order, and carry on with the rest:
// Pair each value with a dummy key so we can deduplicate, then sort by the value
val rdd1 = sc.parallelize(List(1,1,2,3,4,4)).map(x => ("a", x)).distinct.sortBy(_._2).values
val rdd2 = sc.parallelize(List(1,2,3,4))
// zip requires both RDDs to have the same number of partitions (and elements per partition)
rdd1.coalesce(rdd2.getNumPartitions).zip(rdd2)
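As a side note, you can sanity-check the pairing logic itself with plain Scala collections (no Spark needed), since the fix boils down to "deduplicate, sort, then zip". A minimal sketch:

```scala
// Local analogue of the Spark fix: distinct may reorder in Spark,
// so sorting before zipping guarantees matching positions.
object ZipCheck extends App {
  val left  = List(1, 1, 2, 3, 4, 4).distinct.sorted // List(1, 2, 3, 4)
  val right = List(1, 2, 3, 4)
  val zipped = left.zip(right)
  println(zipped) // List((1,1), (2,2), (3,3), (4,4))
}
```

On RDDs, `rdd1.distinct.sortBy(identity)` should achieve the same ordering without the dummy-key pair step, at the cost of the same extra sort shuffle.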
Upvotes: 1