KyBe

Reputation: 842

How to avoid for loop with Spark?

I'm new to Spark and don't understand how the MapReduce mechanism works with it. I have one CSV file containing only doubles. What I want is to perform an operation (compute the Euclidean distance) between the first vector and the rest of the RDD, then iterate with the other vectors. Is there a better way than this one? Maybe use the cartesian product wisely...

import org.apache.spark.mllib.linalg.Vectors

val rdd = sc.parallelize(Array((1, Vectors.dense(1,2)), (2, Vectors.dense(3,4)), ...))
val array_vects = rdd.collect
val size = rdd.count.toInt

// trick to get an empty RDD with the right element type
val emptyArray = Array((0, Vectors.dense(0))).tail
var rdd_rez = sc.parallelize(emptyArray)

for (ind <- 0 to size - 1) {
   val (key, vector) = array_vects(ind)
   // all vectors except the current one (keys are 1-based, ind is 0-based)
   val rest = rdd.filter(x => x._1 != key)
   val rdd_dist = rest.map(x => (x._1, Vectors.sqdist(x._2, vector)))
   rdd_rez = rdd_rez ++ rdd_dist
}

Thank you for your support.

Upvotes: 2

Views: 3586

Answers (2)

Shyamendra Solanki

Reputation: 8851

The distances (between all pairs of vectors) can be calculated using rdd.cartesian:

val rdd = sc.parallelize(Array((1,Vectors.dense(1,2)),
                               (2,Vectors.dense(3,4)),...))
val product = rdd.cartesian(rdd)

val result = product.filter{ case ((a, b), (c, d)) => a != c }
                    .map   { case ((a, b), (c, d)) => 
                                   (a, Vectors.sqdist(b, d)) }
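Note that cartesian materialises all n × n ordered pairs, so this computes every pairwise distance in one pass, but the cost grows quadratically with the number of vectors.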

Upvotes: 4

sarveshseri

Reputation: 13985

I don't see why you were trying to do something like that. You can simply do it as follows.

val initialArray = Array((1, Vectors.dense(1,2)), (2, Vectors.dense(3,4)), ...)

// keep only the vector, dropping the index from the tuple
val firstVector = initialArray(0)._2

val initialRdd = sc.parallelize(initialArray)

val euclideanRdd = initialRdd.map { case (i, vec) => (i, euclidean(firstVector, vec)) }

Here euclidean is a function that takes two dense vectors and returns the Euclidean distance between them.
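The answer leaves euclidean undefined; a minimal sketch, assuming MLlib dense vectors (the name euclidean is just chosen to match the call above), could be:

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Euclidean distance is the square root of MLlib's squared distance
def euclidean(v1: Vector, v2: Vector): Double =
  math.sqrt(Vectors.sqdist(v1, v2))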

Upvotes: 0
