Omegaspard

Reputation: 1960

How to sum corresponding elements of two RDD[Int]s?

I want to perform a very simple operation: element-wise addition of two vectors. For {1,2,3,4,5} and {6,7,8,9,10}, I want to obtain the vector {7,9,11,13,15}. The two vectors are represented in my code by two RDD[Int]s.

val v1 = sc.parallelize(List(1,2,3,4,5))
val v2 = sc.parallelize(List(6,7,8,9,10))

I know that I can't iterate over one RDD while mapping over another, so I have no idea how to implement such a simple operation. How should I proceed?

Upvotes: 0

Views: 951

Answers (2)

Jean Logeart

Reputation: 53829

Assuming your two RDDs have the same number of partitions and the same number of elements in each partition (which zip requires), you can do:

val res: RDD[Int] = v1.zip(v2).map { case (a, b) => a + b }
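
If the two RDDs are not partitioned identically, zip fails at runtime. Here is a minimal sketch of one possible workaround, assuming a local SparkSession; the object and helper names (SumByIndex, sumByIndex) are hypothetical and not part of Spark. It keys each element by its position with zipWithIndex, joins on that index, and restores the original order with sortByKey:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object SumByIndex {
  // Hypothetical helper: pairs elements by position, independent of partition layout.
  def sumByIndex(a: RDD[Int], b: RDD[Int]): RDD[Int] = {
    // zipWithIndex yields (value, index); swap so the index becomes the join key.
    val left  = a.zipWithIndex().map { case (value, idx) => (idx, value) }
    val right = b.zipWithIndex().map { case (value, idx) => (idx, value) }
    left.join(right)                       // RDD[(Long, (Int, Int))]
      .sortByKey()                         // restore the original element order
      .map { case (_, (x, y)) => x + y }
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master, for illustration only.
    val spark = SparkSession.builder().master("local[2]").appName("sum-by-index").getOrCreate()
    val sc = spark.sparkContext
    // Deliberately different partition counts, which plain zip would reject.
    val v1 = sc.parallelize(List(1, 2, 3, 4, 5), numSlices = 2)
    val v2 = sc.parallelize(List(6, 7, 8, 9, 10), numSlices = 3)
    println(sumByIndex(v1, v2).collect().mkString(", "))
    spark.stop()
  }
}
```

Note that the join shuffles data, so this is slower than zip, and it silently drops positions that exist in only one of the two RDDs.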

Upvotes: 1

Jacek Laskowski

Reputation: 74669

TL;DR: Use the zip operator.

Quoting RDD.zip:

zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)] Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc.

In your case, it'd be as follows:

val zipped = v1 zip v2 // <-- you may want to use v1.zip(v2) instead to keep types
zipped.map { case (x, y) => x + y }
// foreach runs on the executors, so the order of the printed lines is not deterministic
scala> zipped.map { case (x, y) => x + y }.foreach(println)
11
13
9
15
7
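
The output above is shuffled because foreach(println) runs per partition. As a rough sketch, assuming a small result that fits in driver memory and a local SparkSession, you could collect the sums first so they print in element order; the names ZipSum and zipSum are hypothetical:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object ZipSum {
  // Hypothetical helper: zips two identically partitioned RDDs and
  // returns the element-wise sums to the driver in their original order.
  def zipSum(a: RDD[Int], b: RDD[Int]): Array[Int] =
    a.zip(b).map { case (x, y) => x + y }.collect()

  def main(args: Array[String]): Unit = {
    // Assumption: a local master, for illustration only.
    val spark = SparkSession.builder().master("local[*]").appName("zip-sum").getOrCreate()
    val sc = spark.sparkContext
    // Same length and same default partitioning, so zip is applicable.
    val v1 = sc.parallelize(List(1, 2, 3, 4, 5))
    val v2 = sc.parallelize(List(6, 7, 8, 9, 10))
    // collect() returns the partitions in order, so printing happens in sequence on the driver.
    zipSum(v1, v2).foreach(println)
    spark.stop()
  }
}
```

For large RDDs, collecting everything to the driver is not advisable; take(n) or writing the result out would be safer.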

As a bonus, consider Spark SQL, where simple operations such as printing the result look as follows:

// toDF needs import spark.implicits._ (spark-shell imports it automatically)
val sums = zipped.map { case (x, y) => x + y }.toDF("sum")
scala> sums.show
+---+
|sum|
+---+
|  7|
|  9|
| 11|
| 13|
| 15|
+---+

And the rows come out in order "for free" (!), since show reads the partitions in sequence; no actual sort takes place.

Upvotes: 2
