I want to do something like this.
val ac = sc.accumulator(0)
....
a = a.map(x => someFunction(x, the_accumulator_object))
....
What should be in the place of the_accumulator_object in the code above? Would writing ac there be just fine?
Also, in the function
def someFunction(x: TypeOfX, a: TypeOfAccumulator) : ReturnType =
{
.....
}
What should be in the place of TypeOfAccumulator in the function above?
Additional info about Spark accumulators can be found here
According to the scala-docs regarding the creation of the accumulator:
/**
 * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
 * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
 * driver can access the accumulator's `value`.
 */
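The accumulator's type is inferred from its initial value, so sc.accumulator(0) gives you an Accumulator[Int]. You can use your own type as well, but then you must supply an AccumulatorParam[MyOwnType] (normally as an implicit) that defines zero and addInPlace; the += method uses it to add values to your own accumulator type: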
val ac = sc.accumulator[MyOwnType](MyOwnTypeObject, "my own type object accumulator")
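A minimal sketch of such a custom type, assuming the Spark 1.x accumulator API used in this answer (SparkContext.accumulator and AccumulatorParam, deprecated in Spark 2.x in favor of AccumulatorV2) and a local master. The names Stats, StatsAccumulatorParam and StatsAccumulatorExample are hypothetical, chosen only for illustration:

```scala
import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

// Hypothetical custom accumulator type: a running count and sum.
case class Stats(count: Long, sum: Double)

// Tells Spark how to create an empty Stats and how to merge two of them.
// `ac += v` on an Accumulator[Stats] goes through addAccumulator, which defaults to addInPlace.
object StatsAccumulatorParam extends AccumulatorParam[Stats] {
  def zero(initialValue: Stats): Stats = Stats(0L, 0.0)
  def addInPlace(s1: Stats, s2: Stats): Stats =
    Stats(s1.count + s2.count, s1.sum + s2.sum)
}

object StatsAccumulatorExample {
  def run(): Stats = {
    val sc = new SparkContext(
      new SparkConf().setAppName("custom-accumulator").setMaster("local[2]"))
    try {
      // The AccumulatorParam is passed explicitly here; it could also be made implicit.
      val ac = sc.accumulator(Stats(0L, 0.0), "my own type object accumulator")(StatsAccumulatorParam)
      // foreach is an action, so the updates are applied as soon as it returns.
      sc.parallelize(Seq(1.0, 2.0, 3.0)).foreach(x => ac += Stats(1L, x))
      ac.value // only the driver may read the value
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run()) // Stats(3,6.0)
}
```

Because addInPlace is commutative and associative here, the result does not depend on the order in which Spark merges the per-task partial values.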
Your main code fragment would then look like:
val ac = sc.accumulator(0, "some accumulator")
....
a = a.map(x => someFunction(x, ac))
....
// map is lazy: an action on a (e.g. a.count()) must run before ac.value reflects the updates
println("My accumulator value is: " + ac.value)
where the someFunction method implementation would look like:
def someFunction(x: TypeOfX, ac: Accumulator[Int]) : ReturnType =
{
...
ac += 1
...
}
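Putting the pieces together, here is a small runnable sketch of the same pattern, again assuming the Spark 1.x accumulator API and a local master. TypeOfX and ReturnType are both taken to be Int, and the input data and the doubling inside someFunction are hypothetical placeholders:

```scala
import org.apache.spark.{Accumulator, SparkConf, SparkContext}

object AccumulatorInMapExample {
  // Stand-in for the question's someFunction (TypeOfX = Int, ReturnType = Int are assumptions).
  def someFunction(x: Int, ac: Accumulator[Int]): Int = {
    ac += 1 // executed inside tasks; tasks may add but cannot read ac.value
    x * 2
  }

  def run(): (Long, Int) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("accumulator-in-map").setMaster("local[2]"))
    try {
      val ac = sc.accumulator(0, "some accumulator") // Accumulator[Int], inferred from 0
      val a = sc.parallelize(1 to 10).map(x => someFunction(x, ac))
      println("Before any action: " + ac.value) // 0: map is lazy, nothing has run yet
      val n = a.count() // the action runs someFunction on every element
      println("My accumulator value is: " + ac.value)
      (n, ac.value)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run()
}
```

Note that updates made inside a transformation such as map can be applied more than once if Spark recomputes the RDD (for example, calling another action on an uncached a) or retries a task. Spark only guarantees that each task's update is applied once for accumulators updated inside actions such as foreach.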