pythonic

Reputation: 21589

How can I pass Spark's accumulator to a function?

I want to do something like this.

val ac = sc.accumulator(0)
....
a = a.map(x => someFunction(x, the_accumulator_object))
....

What should be in the place of the_accumulator_object in the code above? Would writing ac there be just fine?

Also, in the function

def someFunction(x: TypeOfX, a: TypeOfAccumulator) : ReturnType =
{
    .....
}

What should be in the place of TypeOfAccumulator in the function above?

Upvotes: 4

Views: 3829

Answers (1)

Avihoo Mamka

Reputation: 4786

Additional info about Spark accumulators can be found in the Spark programming guide.

According to the Scaladoc on SparkContext.accumulator:

/**
 * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
 * in the Spark UI. Tasks can "add" values to the accumulator using the += method. Only the
 * driver can access the accumulator's value.
 */

The default accumulator type is Int. You can use your own type, but then you have to supply an implicit AccumulatorParam for it, whose zero and addInPlace methods tell Spark how to merge the values that tasks add via +=:

val ac = sc.accumulator[MyOwnType](MyOwnTypeObject, "my own type object accumulator")
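As a minimal sketch of what that merge contract looks like, here is a local stand-in for Spark 1.x's AccumulatorParam trait (the trait name AccumulatorParamLike and the Counts type are hypothetical, used only so the example runs without a SparkContext):

```scala
// Local stand-in for Spark 1.x's org.apache.spark.AccumulatorParam[T],
// showing the two methods a custom accumulator type must provide.
trait AccumulatorParamLike[T] {
  // Identity value used to initialise per-task copies of the accumulator
  def zero(initialValue: T): T
  // How partial results added by tasks are merged (this is what backs +=)
  def addInPlace(r1: T, r2: T): T
}

// Hypothetical custom type: counting two kinds of events in one pass
case class Counts(errors: Long, warnings: Long)

implicit object CountsParam extends AccumulatorParamLike[Counts] {
  def zero(initial: Counts): Counts = Counts(0L, 0L)
  def addInPlace(a: Counts, b: Counts): Counts =
    Counts(a.errors + b.errors, a.warnings + b.warnings)
}

// With a real SparkContext and the real AccumulatorParam in implicit scope,
// you would then create the accumulator as (not runnable here):
//   val ac = sc.accumulator(Counts(0L, 0L), "counts accumulator")
//   rdd.foreach(x => ac += Counts(1L, 0L))
```

Because addInPlace must be associative and commutative (tasks may merge in any order), a commutative operation like element-wise addition is the typical choice.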

Your main code fragment will be like:

val ac = sc.accumulator(0, "some accumulator")
....
a = a.map(x => someFunction(x, ac))
....
println("My accumulator value is: " + ac.value)

Where the someFunction method implementation will look like:

def someFunction(x: TypeOfX, ac: Accumulator[Int]) : ReturnType =
{
    ...
    ac += 1
    ...
}

Upvotes: 5

Related Questions