fluency03

Reputation: 2697

Creating a custom accumulator for vector addition

I am following the Spark Programming Guide but am running into problems with an accumulator and Vector.

I have the following object defining the VectorAccumulatorParam:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
    def zeros(initialValue: Vector): Vector = {
        Vectors.zeros( initialValue.size )
    }
    def addInPlace(v1: Vector, v2: Vector): Vector = {
        v1 += v2
    }
}

It is used like:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.AccumulatorParam
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object SimpleSpark {
    def main(arg: Array[String]) = {
        /* some code */

        val valAccum = sc.accumulator( Vector(1, 2, 3) )(VectorAccumulatorParam)

        /* some code */
    }
}

But I got two errors:

[error] /home/cliu/Documents/github/Apache-Spark/src/main/scala/SimpleSpark.scala:125: type mismatch;
[error]  found   : VectorAccumulatorParam.type
[error]  required: org.apache.spark.AccumulatorParam[scala.collection.immutable.Vector[Int]]
[error]         val valAccum = sc.accumulator( Vector(1, 2, 3) )(VectorAccumulatorParam) 
[error]                                                          ^
[error] /home/cliu/Documents/github/Apache-Spark/src/main/scala/SimpleSpark.scala:170: value += is not a member of org.apache.spark.mllib.linalg.Vector
[error]         v1 += v2
[error]            ^

First: why is there a type mismatch? This is what the programming guide shows, and it should return a Vector.

Second: why does the guide show this if the operation v1 += v2 does not work?

Upvotes: 1

Views: 747

Answers (2)

Alberto Bonsanto

Reputation: 18022

The second one first, because it is easier. If you read the documentation of Vector, you will see that it has no += method, so the element-wise addition you want can't be done this way. Some other classes do support that method (take a look at Accumulator), so my suggestion is to write your own method to achieve this.
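A minimal sketch of such a hand-written method, in plain Scala so it runs without Spark on the classpath. It adds the underlying arrays element by element. The names VectorAddition and add are made up for illustration and are not part of any library.

```scala
object VectorAddition {
  // Element-wise sum of two equal-length arrays.
  // Returns a new array and leaves both inputs untouched.
  def add(a: Array[Double], b: Array[Double]): Array[Double] = {
    require(a.length == b.length, s"size mismatch: ${a.length} vs ${b.length}")
    Array.tabulate(a.length)(i => a(i) + b(i))
  }
}
```

With mllib vectors, addInPlace could then be written as Vectors.dense(VectorAddition.add(v1.toArray, v2.toArray)), at the cost of allocating a new vector on every call.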

Upvotes: 1

zero323

Reputation: 330063

  1. You get a type error because Vector(1, 2, 3) creates a scala.collection.immutable.Vector, which is the immutable counterpart of Array, not an mllib.linalg.Vector. mllib.linalg.Vector has no companion object; instances are created through the Vectors factory instead, e.g. Vectors.dense(1.0, 2.0, 3.0).

  2. As already stated by Alberto Bonsanto, mllib.linalg.Vector doesn't support the addition operation.

  3. There is also a typo in the param definition. The correct name of the method is zero, not zeros.

  4. Regarding the programming guide, I guess you missed the following statement:

    supposing we had a Vector class representing mathematical vectors, we could write

You could create an accumulator using breeze.linalg.Vector:

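import org.apache.spark.AccumulatorParam
import breeze.linalg.{DenseVector => BDV}

object VectorAccumulatorParam extends AccumulatorParam[BDV[Double]] {
  // Identity element: a zero vector of the same length as the initial value
  def zero(v: BDV[Double]): BDV[Double] = BDV.zeros[Double](v.length)
  // Breeze's += updates v1 in place and returns it
  def addInPlace(v1: BDV[Double], v2: BDV[Double]): BDV[Double] = v1 += v2
}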

val valAccum = sc.accumulator(BDV.zeros[Double](3))(VectorAccumulatorParam) 
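For intuition about what zero and addInPlace are expected to do, here is a plain-Scala sketch that needs neither Spark nor Breeze. SimpleParam is a hypothetical stand-in that only mirrors the two method names of AccumulatorParam, and the partition handling is a simplification of what Spark does as I understand it: each task starts from zero(initialValue), and the partial results are then merged with addInPlace.

```scala
// Hypothetical stand-in mirroring the two methods of AccumulatorParam.
trait SimpleParam[T] {
  def zero(initialValue: T): T
  def addInPlace(t1: T, t2: T): T
}

object ArraySumParam extends SimpleParam[Array[Double]] {
  // Identity element: zeros with the same length as the initial value.
  def zero(initialValue: Array[Double]): Array[Double] =
    Array.fill(initialValue.length)(0.0)

  // Adds t2 into t1 element by element, mutating and returning t1.
  def addInPlace(t1: Array[Double], t2: Array[Double]): Array[Double] = {
    require(t1.length == t2.length, "size mismatch")
    var i = 0
    while (i < t1.length) { t1(i) += t2(i); i += 1 }
    t1
  }
}

object MergeDemo {
  // Two simulated partitions holding 3-element records.
  val partitions: Seq[Seq[Array[Double]]] = Seq(
    Seq(Array(1.0, 2.0, 3.0), Array(1.0, 1.0, 1.0)),
    Seq(Array(0.5, 0.5, 0.5))
  )

  val initial: Array[Double] = Array(0.0, 0.0, 0.0)

  // Each partition accumulates locally, starting from a fresh zero vector.
  val partials: Seq[Array[Double]] =
    partitions.map(_.foldLeft(ArraySumParam.zero(initial))(ArraySumParam.addInPlace))

  // The partial sums are merged into the initial value.
  val total: Array[Double] = partials.foldLeft(initial)(ArraySumParam.addInPlace)
}
```

If zero returned a non-zero value, every partition would start from that value and it would be counted once per partition in the total.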

See also: Difference between spark Vectors and scala immutable Vector?
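To see the name clash from point 1 in isolation, here is a small self-contained sketch. fakelinalg is a hypothetical stand-in, not a real library: like mllib.linalg, it exposes a Vector trait and a Vectors factory but no Vector companion object. Importing it changes what the type name Vector means, while the expression Vector(...) still builds the Scala collection.

```scala
// Hypothetical stand-in for a library shaped like mllib.linalg:
// a Vector trait plus a Vectors factory, and no Vector companion object.
object fakelinalg {
  trait Vector { def toArray: Array[Double] }

  object Vectors {
    def dense(values: Double*): Vector = new Vector {
      val toArray: Array[Double] = values.toArray
    }
  }
}

object NameResolutionDemo {
  import fakelinalg.{Vector, Vectors}

  // The type name Vector now refers to fakelinalg.Vector,
  // and instances come from the Vectors factory.
  val fromFactory: Vector = Vectors.dense(1.0, 2.0, 3.0)

  // The term Vector(...) still resolves to Scala's immutable Vector,
  // because the import brought in no value named Vector.
  val fromScala: scala.collection.immutable.Vector[Int] = Vector(1, 2, 3)
}
```

Writing val wrong: Vector = Vector(1, 2, 3) inside NameResolutionDemo would fail to compile with the same kind of type mismatch as in the question.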

Upvotes: 3
