Reputation: 842
I'm new to Spark and don't understand how the MapReduce mechanism works with it. I have one CSV file containing only doubles. What I want is to perform an operation (compute the Euclidean distance) between the first vector and the rest of the RDD, then iterate with the other vectors. Is there another way than this one? Maybe by using the Cartesian product wisely...
import org.apache.spark.mllib.linalg.Vectors

val rdd = sc.parallelize(Array((1, Vectors.dense(1,2)), (2, Vectors.dense(3,4)), ...))
val array_vects = rdd.collect
val size = rdd.count.toInt
// start from an empty RDD with the same element type as the results
var rdd_rez = sc.parallelize(Array.empty[(Int, Double)])
for (ind <- 0 until size) {
  val (key, vector) = array_vects(ind)
  // every vector except the current one
  val rest = rdd.filter(x => x._1 != key)
  val rdd_dist = rest.map(x => (x._1, Vectors.sqdist(x._2, vector)))
  rdd_rez = rdd_rez ++ rdd_dist
}
Thank you for your support.
Upvotes: 2
Views: 3586
Reputation: 8851
The squared distances between all pairs of vectors can be calculated using rdd.cartesian:
val rdd = sc.parallelize(Array((1, Vectors.dense(1,2)),
                               (2, Vectors.dense(3,4)), ...))

// every ordered pair of (index, vector) tuples
val product = rdd.cartesian(rdd)

// drop each vector paired with itself, then compute the squared distance
val result = product.filter { case ((a, b), (c, d)) => a != c }
                    .map    { case ((a, b), (c, d)) => (a, Vectors.sqdist(b, d)) }
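If you need to know which pair each distance belongs to, you can key the result by both indices instead of only the first one; since sqdist is symmetric, filtering with a < c keeps each unordered pair once. A minimal sketch (the three sample vectors here are made up for illustration):

import org.apache.spark.mllib.linalg.Vectors

val rdd = sc.parallelize(Array((1, Vectors.dense(1, 2)),
                               (2, Vectors.dense(3, 4)),
                               (3, Vectors.dense(5, 6))))

// key each squared distance by the pair of indices it belongs to
val pairDists = rdd.cartesian(rdd)
                   .filter { case ((a, _), (c, _)) => a < c }
                   .map    { case ((a, b), (c, d)) => ((a, c), Vectors.sqdist(b, d)) }

pairDists.collect.foreach(println)   // ((1,2),8.0), ((1,3),32.0), ((2,3),8.0)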
Upvotes: 4
Reputation: 13985
I don't see why you were trying to do something like that. You can simply do it as follows.
val initialArray = Array((1, Vectors.dense(1,2)), (2, Vectors.dense(3,4)), ...)
// keep just the vector, not the whole (index, vector) tuple
val firstVector = initialArray(0)._2
val initialRdd = sc.parallelize(initialArray)
val euclideanRdd = initialRdd.map { case (i, vec) => (i, euclidean(firstVector, vec)) }
where euclidean is a function that takes two dense vectors and returns the Euclidean distance.
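The answer leaves euclidean undefined; a minimal sketch, assuming MLlib's Vectors.sqdist (which returns the squared distance), could be:

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Euclidean distance as the square root of MLlib's squared distance
def euclidean(v1: Vector, v2: Vector): Double =
  math.sqrt(Vectors.sqdist(v1, v2))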
Upvotes: 0