Reputation: 91
my accumulator is an Array[Array[Int]] after updating accumalutor in foreach operation of RDD,accumulator(0) is as expected where as accumulator(1) is Array(0,0,0) which is completely lost
inside RDD ,accumulator value is Array(Array(4,5,6),Array(4,5,6)) outside RDD, accumulator value is Array(Array(4,5,6),Array(0,0,0))
below is the code
import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object acc {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val a =Array(Array(1,2,3),Array(4,5,6))
val rdd = sc.parallelize(a)
val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1))
val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
rdd.foreach{x=>
accumulator += (x(0),0,0)
accumulator += (x(1),0,1)
accumulator += (x(2),0,2)
accumulator += (x(0),1,0)
accumulator += (x(1),1,1)
accumulator += (x(2),1,2)
println("accumulator value in rdd is"+accumulator.localValue)
}
println("accumulator value out of rdd is :" + accumulator.value )
}
}
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int, Int)] {
def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = {
initialValue
}
def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {
acc(value._2)(value._3) = value._1
acc
}
def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
val columnLength: Int = m1.length
val rowLength: Int = m1(0).length
var updatedMatrix = Array.ofDim[Int](columnLength, rowLength)
var j: Int = 0
while (j < columnLength) {
var i =0
while (i < rowLength) {
val a = Math.max(m1(j)(i), m2(j)(i))
updatedMatrix(j)(i) = a
i += 1
}
j += 1
}
updatedMatrix
}
}
results: inside RDD ,accumalator value is Array(Array(4,5,6),Array(4,5,6)) outside RDD, accumalator value is Array(Array(4,5,6),Array(0,0,0))
but what i'm expecting outside RDD is Array(Array(4,5,6),Array(4,5,6))
Upvotes: 2
Views: 2264
Reputation: 659
I found this no difference with modify the var i=0 to i=0 and the final result is Array(Array(4,5,6),Array(4,5,6))
The output of the application is fetched by yarn logs -applicationId.
The code is :
import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object acc {
def main(args: Array[String]) {
//val conf = new SparkConf().setAppName("Simple Application")
val conf = new SparkConf()
conf.setSparkHome("/usr/lib/spark")
conf.setAppName("Simple Application")
val sc = new SparkContext(conf)
val a =Array(Array(1,2,3),Array(4,5,6))
val rdd = sc.parallelize(a)
val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1))
val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
rdd.foreach{x=>
accumulator += (x(0),0,0)
accumulator += (x(1),0,1)
accumulator += (x(2),0,2)
accumulator += (x(0),1,0)
accumulator += (x(1),1,1)
accumulator += (x(2),1,2)
val columnLength: Int = accumulator.localValue.length
val rowLength: Int = accumulator.localValue(0).length
var j: Int = 0
var i: Int = 0
println("accumulator")
while(j < columnLength){
i =0
while(i<rowLength){
println(accumulator.localValue(j)(i))
i += 1
}
j+=1
}
println("accumulator value in rdd is"+accumulator.localValue)
}
val columnLength: Int = accumulator.value.length
val rowLength: Int = accumulator.value(0).length
var j: Int = 0
var i: Int = 0
println("total")
while(j < columnLength){
i =0
while(i<rowLength){
println(accumulator.value(j)(i))
i += 1
}
j+=1
}
println("accumulator value out of rdd is :" + accumulator.value )
}
}
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int, Int)] {
def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = {
initialValue
}
def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {
acc(value._2)(value._3) = value._1
acc
}
def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
val columnLength: Int = m1.length
val rowLength: Int = m1(0).length
var updatedMatrix = Array.ofDim[Int](columnLength, rowLength)
var j: Int = 0
var i: Int = 0
while (j < columnLength) {
i =0
while (i < rowLength) {
println("m1("+j+")("+i+")="+ m1(j)(i) + " m2("+j+")("+i+")="+ m2(j)(i))
val a = Math.max(m1(j)(i), m2(j)(i))
updatedMatrix(j)(i) = a
i += 1
}
j += 1
}
updatedMatrix
}
}
And the result is:
accumulator
4
5
6
4
5
6
accumulator
1
2
3
1
2
3
m1(0)(0)=1 m2(0)(0)=1
m1(0)(1)=1 m2(0)(1)=2
m1(0)(2)=1 m2(0)(2)=3
m1(1)(0)=1 m2(1)(0)=1
m1(1)(1)=1 m2(1)(1)=2
m1(1)(2)=1 m2(1)(2)=3
m1(0)(0)=1 m2(0)(0)=4
m1(0)(1)=2 m2(0)(1)=5
m1(0)(2)=3 m2(0)(2)=6
m1(1)(0)=1 m2(1)(0)=4
m1(1)(1)=2 m2(1)(1)=5
m1(1)(2)=3 m2(1)(2)=6
total
4
5
6
4
5
6
And modify the code to this:
//var i: Int = 0
while (j < columnLength) {
var i =0
And the result is:
m1(0)(0)=1 m2(0)(0)=1
m1(0)(1)=1 m2(0)(1)=2
m1(0)(2)=1 m2(0)(2)=3
m1(1)(0)=1 m2(1)(0)=1
m1(1)(1)=1 m2(1)(1)=2
m1(1)(2)=1 m2(1)(2)=3
m1(0)(0)=1 m2(0)(0)=4
m1(0)(1)=2 m2(0)(1)=5
m1(0)(2)=3 m2(0)(2)=6
m1(1)(0)=1 m2(1)(0)=4
m1(1)(1)=2 m2(1)(1)=5
m1(1)(2)=3 m2(1)(2)=6
total
4
5
6
4
5
6
accumulator
1
2
3
1
2
3
accumulator
4
5
6
4
5
6
The final result is the same.
But I have two questions:
@Vijay Innamuri
Upvotes: 0
Reputation: 3848
localValue is supposed to be different, according to the document:
value
.
*Upvotes: 0
Reputation: 4372
addAccumulator method is called whenever there is an update to the accumulator.variable
In the above code accumulator += (x(0),0,0) invokes the addAccumulator method.
once all the tasks are completed addInPlace method is called to aggregate the accumulated values from all the tasks.
In the above code initialValue Array(1, 1, 1)Array(1, 1, 1) and task Accumulator value Array(4, 5, 6) Array(4, 5, 6) invokes the addInPlace method.
In the above code variable i in addInPlace method has to be reset whenever it enters the loop while (j < columnLength) {
Following code works like a charm.
while (j < columnLength) {
i=0
while (i < rowLength) {
println("m1(j)(i)"+ m1(j)(i))
println(" m2(j)(i))"+ m2(j)(i))
val a = Math.max(m1(j)(i), m2(j)(i))
updatedMatrix(j)(i) = a
i += 1
}
j += 1
}
Upvotes: 3