nagSumanth

Reputation: 91

Spark accumulator value is different inside and outside the RDD

My accumulator is an Array[Array[Int]]. After updating the accumulator in a foreach operation on the RDD, accumulator(0) is as expected, whereas accumulator(1) is Array(0,0,0): its values are completely lost.

Inside the RDD, the accumulator value is Array(Array(4,5,6),Array(4,5,6)); outside the RDD, the accumulator value is Array(Array(4,5,6),Array(0,0,0)).

Below is the code:

import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object acc {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val a = Array(Array(1, 2, 3), Array(4, 5, 6))
    val rdd = sc.parallelize(a)
    // 2 x 3 matrix with every cell set to 1
    val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1))
    val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
    rdd.foreach { x =>
      // Each update is a (value, row, column) tuple
      accumulator += ((x(0), 0, 0))
      accumulator += ((x(1), 0, 1))
      accumulator += ((x(2), 0, 2))
      accumulator += ((x(0), 1, 0))
      accumulator += ((x(1), 1, 1))
      accumulator += ((x(2), 1, 2))
      // mkString prints the cells instead of the array reference
      println("accumulator value in rdd is " +
        accumulator.localValue.map(_.mkString("Array(", ",", ")")).mkString("Array(", ",", ")"))
    }

    println("accumulator value out of rdd is: " +
      accumulator.value.map(_.mkString("Array(", ",", ")")).mkString("Array(", ",", ")"))
  }
}
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int, Int)] {

  def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = {
    initialValue
  }

  // Overwrites cell (row value._2, column value._3) with value._1
  def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {
    acc(value._2)(value._3) = value._1
    acc
  }

  // Merges two partial results by taking the element-wise maximum
  def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
    val columnLength: Int = m1.length
    val rowLength: Int = m1(0).length
    val updatedMatrix = Array.ofDim[Int](columnLength, rowLength)

    var j: Int = 0
    while (j < columnLength) {
      var i = 0
      while (i < rowLength) {
        val a = Math.max(m1(j)(i), m2(j)(i))
        updatedMatrix(j)(i) = a
        i += 1
      }
      j += 1
    }

    updatedMatrix
  }
}

Results: inside the RDD, the accumulator value is Array(Array(4,5,6),Array(4,5,6)); outside the RDD, it is Array(Array(4,5,6),Array(0,0,0)).

But what I expect outside the RDD is Array(Array(4,5,6),Array(4,5,6)).

Upvotes: 2

Views: 2264

Answers (3)

Tim

Reputation: 659

I found no difference between declaring var i = 0 inside the outer loop and declaring i once outside it and resetting it with i = 0; either way, the final result is Array(Array(4,5,6),Array(4,5,6)).

I fetched the application's output with yarn logs -applicationId.

The code is :

import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object acc {
  def main(args: Array[String]): Unit = {
    //val conf = new SparkConf().setAppName("Simple Application")
    val conf = new SparkConf()
    conf.setSparkHome("/usr/lib/spark")
    conf.setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val a = Array(Array(1, 2, 3), Array(4, 5, 6))
    val rdd = sc.parallelize(a)
    val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1))
    val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
    rdd.foreach { x =>
      accumulator += ((x(0), 0, 0))
      accumulator += ((x(1), 0, 1))
      accumulator += ((x(2), 0, 2))
      accumulator += ((x(0), 1, 0))
      accumulator += ((x(1), 1, 1))
      accumulator += ((x(2), 1, 2))
      // Print every cell of this task's local value, one per line
      val columnLength: Int = accumulator.localValue.length
      val rowLength: Int = accumulator.localValue(0).length
      var j: Int = 0
      var i: Int = 0
      println("accumulator")
      while (j < columnLength) {
        i = 0
        while (i < rowLength) {
          println(accumulator.localValue(j)(i))
          i += 1
        }
        j += 1
      }
      println("accumulator value in rdd is" + accumulator.localValue)
    }
    // Print every cell of the merged value on the driver, one per line
    val columnLength: Int = accumulator.value.length
    val rowLength: Int = accumulator.value(0).length
    var j: Int = 0
    var i: Int = 0
    println("total")
    while (j < columnLength) {
      i = 0
      while (i < rowLength) {
        println(accumulator.value(j)(i))
        i += 1
      }
      j += 1
    }

    println("accumulator value out of rdd is :" + accumulator.value)
  }
}
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int, Int)] {

  def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = {
    initialValue
  }

  def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {
    acc(value._2)(value._3) = value._1
    acc
  }

  def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
    val columnLength: Int = m1.length
    val rowLength: Int = m1(0).length
    val updatedMatrix = Array.ofDim[Int](columnLength, rowLength)

    var j: Int = 0
    var i: Int = 0
    while (j < columnLength) {
      i = 0
      while (i < rowLength) {
        println("m1(" + j + ")(" + i + ")=" + m1(j)(i) + " m2(" + j + ")(" + i + ")=" + m2(j)(i))
        val a = Math.max(m1(j)(i), m2(j)(i))
        updatedMatrix(j)(i) = a
        i += 1
      }
      j += 1
    }

    updatedMatrix
  }
}

And the result is:

accumulator
4
5
6
4
5
6

accumulator
1
2
3
1
2
3

m1(0)(0)=1 m2(0)(0)=1
m1(0)(1)=1 m2(0)(1)=2
m1(0)(2)=1 m2(0)(2)=3
m1(1)(0)=1 m2(1)(0)=1
m1(1)(1)=1 m2(1)(1)=2
m1(1)(2)=1 m2(1)(2)=3
m1(0)(0)=1 m2(0)(0)=4
m1(0)(1)=2 m2(0)(1)=5
m1(0)(2)=3 m2(0)(2)=6
m1(1)(0)=1 m2(1)(0)=4
m1(1)(1)=2 m2(1)(1)=5
m1(1)(2)=3 m2(1)(2)=6

total
4
5
6
4
5
6

Then I modified the code to this:

    //var i: Int = 0
    while (j < columnLength) {
      var i = 0

And the result is:

m1(0)(0)=1 m2(0)(0)=1
m1(0)(1)=1 m2(0)(1)=2
m1(0)(2)=1 m2(0)(2)=3
m1(1)(0)=1 m2(1)(0)=1
m1(1)(1)=1 m2(1)(1)=2
m1(1)(2)=1 m2(1)(2)=3
m1(0)(0)=1 m2(0)(0)=4
m1(0)(1)=2 m2(0)(1)=5
m1(0)(2)=3 m2(0)(2)=6
m1(1)(0)=1 m2(1)(0)=4
m1(1)(1)=2 m2(1)(1)=5
m1(1)(2)=3 m2(1)(2)=6
total
4
5
6
4
5
6

accumulator
1
2
3
1
2
3

accumulator
4
5
6
4
5
6

The final result is the same.
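A minimal pure-Scala check (no Spark involved) of why the result does not change: both loop forms set the column index back to 0 before every row and differ only in where i is declared. The object and method names below are hypothetical; the loop bodies follow the addInPlace code above without the println calls.

```scala
object LoopForms {
  // Form 1: `i` is declared inside the outer loop, so it starts at 0 for every row.
  def maxMergeVarInside(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
    val out = Array.ofDim[Int](m1.length, m1(0).length)
    var j = 0
    while (j < m1.length) {
      var i = 0
      while (i < m1(0).length) {
        out(j)(i) = math.max(m1(j)(i), m2(j)(i))
        i += 1
      }
      j += 1
    }
    out
  }

  // Form 2: `i` is declared once, outside both loops, and reset at the start of each row.
  def maxMergeReset(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
    val out = Array.ofDim[Int](m1.length, m1(0).length)
    var j = 0
    var i = 0
    while (j < m1.length) {
      i = 0
      while (i < m1(0).length) {
        out(j)(i) = math.max(m1(j)(i), m2(j)(i))
        i += 1
      }
      j += 1
    }
    out
  }
}
```

Called with the all-ones initial value and the task value Array(Array(4,5,6),Array(4,5,6)), both methods return Array(Array(4,5,6),Array(4,5,6)).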

But I have two questions:

  • I do not know why the order of the output differs between the two runs.
  • Why is the addInPlace function called twice?
    • I think I know why it is called twice, but I'm not sure:
      • initial value: Array(Array(1,1,1),Array(1,1,1))
      • output from one task: Array(Array(1,2,3),Array(1,2,3))
      • output from the other task: Array(Array(4,5,6),Array(4,5,6))
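A hedged sketch of the reasoning in the list above, in plain Scala with no Spark: mergeMax is a hypothetical stand-in for addInPlace, and the task values are the ones from the log. Folding one partial result per task into the initial value takes exactly one merge per task, which would account for two addInPlace calls if parallelize produced two partitions here (an assumption the log is consistent with). Because an element-wise max is commutative and associative, the merge order does not change the final value, so it does not matter which task finishes first. The printed lines also come from the tasks (the accumulator lines) and from the driver (the m1/m2 and total lines), so their relative order in the aggregated YARN log is presumably not meaningful.

```scala
object MergeOrder {
  type Matrix = Array[Array[Int]]

  // Hypothetical stand-in for addInPlace: element-wise max of two same-shaped matrices.
  def mergeMax(m1: Matrix, m2: Matrix): Matrix =
    m1.zip(m2).map { case (r1, r2) => r1.zip(r2).map { case (a, b) => math.max(a, b) } }

  val initial: Matrix = Array.fill(2, 3)(1)
  val taskA: Matrix = Array(Array(1, 2, 3), Array(1, 2, 3)) // one task's partial result
  val taskB: Matrix = Array(Array(4, 5, 6), Array(4, 5, 6)) // the other task's partial result

  // One merge per finished task, in both possible completion orders.
  val aThenB: Matrix = Seq(taskA, taskB).foldLeft(initial)(mergeMax)
  val bThenA: Matrix = Seq(taskB, taskA).foldLeft(initial)(mergeMax)
}
```

Both aThenB and bThenA are Array(Array(4,5,6),Array(4,5,6)).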

@Vijay Innamuri

Upvotes: 0

tribbloid

Reputation: 3848

localValue is supposed to be different, according to the documentation:

  This is NOT the global value of the accumulator. To get the global value after a completed operation on the dataset, call value.
  The typical use of this method is to directly mutate the local value, eg., to add an element to a Set.
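A minimal plain-Scala simulation (no Spark) of the distinction the documentation draws, assuming, as Tim's log suggests, that each task starts from its own copy of the initial value. Each task mutates only its local copy (what localValue shows inside foreach), and the driver-side value is built solely by merging the finished tasks' copies. runTask and the other names are hypothetical; the update and merge rules copy the ones in MatrixAccumulatorParam.

```scala
object LocalVsGlobal {
  type Matrix = Array[Array[Int]]

  // Same update rule as MatrixAccumulatorParam.addAccumulator: overwrite one cell.
  def addAccumulator(acc: Matrix, v: (Int, Int, Int)): Matrix = { acc(v._2)(v._3) = v._1; acc }

  // Same merge rule as MatrixAccumulatorParam.addInPlace: element-wise max.
  def addInPlace(m1: Matrix, m2: Matrix): Matrix =
    Array.tabulate(m1.length, m1(0).length)((j, i) => math.max(m1(j)(i), m2(j)(i)))

  // A task applies the six updates from the question to its own copy of the initial value.
  def runTask(x: Array[Int]): Matrix = {
    val local = Array.fill(2, 3)(1)
    for (row <- 0 until 2; col <- 0 until 3) addAccumulator(local, (x(col), row, col))
    local
  }

  // One local value per task; these differ from each other and from the global value.
  val localValues: Seq[Matrix] = Seq(Array(1, 2, 3), Array(4, 5, 6)).map(runTask)

  // The driver's value is only the merge of the task results into the initial value.
  val globalValue: Matrix = localValues.foldLeft(Array.fill(2, 3)(1))(addInPlace)
}
```

The two local values are Array(Array(1,2,3),Array(1,2,3)) and Array(Array(4,5,6),Array(4,5,6)), matching the two accumulator blocks in Tim's log, while globalValue is Array(Array(4,5,6),Array(4,5,6)).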

Upvotes: 0

Vijay Innamuri

Reputation: 4372

The addAccumulator method is called whenever the accumulator variable is updated.

In the above code, accumulator += (x(0),0,0) invokes the addAccumulator method.

As each task completes, the addInPlace method is called on the driver to merge the value accumulated by that task into the accumulator's value, so it runs once per task.

In the above code, addInPlace is invoked with the initial value Array(Array(1, 1, 1), Array(1, 1, 1)) and a task's accumulator value Array(Array(4, 5, 6), Array(4, 5, 6)).

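In the above code, the variable i in the addInPlace method has to be reset to 0 every time the outer loop while (j < columnLength) { starts a new row. Otherwise i still equals rowLength after the first row, the inner loop never runs again, and every later row of updatedMatrix keeps its default value 0, which is exactly the Array(0,0,0) in the question.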
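A minimal pure-Scala sketch of that bug (no Spark; ResetBug and mergeWithoutReset are hypothetical names): the merge loop with i declared once outside both loops and never reset. Called with the all-ones initial value and the task value Array(Array(4,5,6),Array(4,5,6)), it returns the matrix the question reports outside the RDD.

```scala
object ResetBug {
  // Buggy merge: `i` is never reset, so after the first row i == rowLength
  // and the inner loop body does not run for any later row.
  def mergeWithoutReset(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
    val columnLength = m1.length
    val rowLength = m1(0).length
    val updatedMatrix = Array.ofDim[Int](columnLength, rowLength) // every cell starts at 0
    var j = 0
    var i = 0
    while (j < columnLength) {
      while (i < rowLength) {
        updatedMatrix(j)(i) = math.max(m1(j)(i), m2(j)(i))
        i += 1
      }
      j += 1
    }
    updatedMatrix
  }
}
```

The result is Array(Array(4,5,6),Array(0,0,0)). Merging both task values in sequence, as in Tim's log, gives the same matrix, because the second row stays at 0 after every call.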

The following code works like a charm:

    while (j < columnLength) {
      i = 0 // reset the column index at the start of every row
      while (i < rowLength) {
        println("m1(j)(i) = " + m1(j)(i))
        println("m2(j)(i) = " + m2(j)(i))
        val a = Math.max(m1(j)(i), m2(j)(i))
        updatedMatrix(j)(i) = a
        i += 1
      }
      j += 1
    }
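An alternative that avoids hand-managed loop counters altogether, sketched in plain Scala: Array.tabulate from the standard library computes each cell from its (row, column) indices, so there is no index to forget to reset. TabulateMerge is a hypothetical wrapper; the method keeps the signature of addInPlace from the question, so its body could presumably replace the while loops in MatrixAccumulatorParam.

```scala
object TabulateMerge {
  // Element-wise max of two matrices with the same shape; no mutable indices involved.
  def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] =
    Array.tabulate(m1.length, m1(0).length)((j, i) => math.max(m1(j)(i), m2(j)(i)))
}
```

For example, merging Array(Array(7,1,1),Array(1,1,9)) with Array(Array(4,5,6),Array(4,5,6)) gives Array(Array(7,5,6),Array(4,5,9)).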

Upvotes: 3
