Egor

Reputation: 1409

Create accumulator on executor dynamically

I want to use accumulators to count combinations of a few parameters of the objects in my RDD. E.g., I have an RDD of Obj with fields a and b. Both fields are enums that can take one of a few values. To achieve this, I have to create the accumulators on the driver and use them on the workers:

val acc1 = sc.longAccumulator("a1-b1")
val acc2 = sc.longAccumulator("a2-b1")
val acc3 = sc.longAccumulator("a1-b2")
...

I don't want to declare a lot of counters for all combinations of values in every Spark job that shares this logic. Is there any mechanism that allows creating an accumulator dynamically on an executor, or another way to solve this problem?

I am looking for something like this:

rdd.foreach { obj => getAccumulator(s"${obj.a}-${obj.b}").add(1) }

Upvotes: 1

Views: 737

Answers (2)

Egor

Reputation: 1409

The link in egordoe's answer is correct for old Spark versions. My implementation for modern Spark (Kotlin):

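import org.apache.spark.util.AccumulatorV2

// Counts how many times each String key was added; the value is a key -> count map.
class MapAccumulator(val counters: MutableMap<String, Long>) : AccumulatorV2<String, Map<String, Long>>() {

  constructor() : this(HashMap<String, Long>())

  // The accumulator is "zero" while nothing has been counted.
  override fun isZero() = counters.isEmpty()

  // Return an independent copy so the new accumulator does not share state with this one.
  override fun copy(): AccumulatorV2<String, Map<String, Long>> {
      val copy = HashMap<String, Long>()
      copy.putAll(counters)
      return MapAccumulator(copy)
  }

  override fun reset() {
      counters.clear()
  }

  // Increment the counter for key v, creating it on first use.
  override fun add(v: String) {
      counters.merge(v, 1L, Long::plus)
  }

  // Fold the counts of another accumulator into this one, summing per key.
  override fun merge(other: AccumulatorV2<String, Map<String, Long>>) {
      other.value().forEach { (k, v) -> counters.merge(k, v, Long::plus) }
  }

  override fun value(): Map<String, Long> = counters
}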

Upvotes: 0

egordoe

Reputation: 958

Answering your question literally: you can't dynamically register new accumulators on an executor. Accumulators must be created on the driver (sparkContext.accumulator()) before the job actually starts. This is how accumulators are designed in Spark.

But considering what you actually want to implement, the same functionality can be achieved with only one "static" accumulator: an accumulator that gathers Map<String, Long> entries rather than a single Long.

This blog post may give a more practical sense of what I mean here.
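
To make the idea concrete without pulling in Spark, here is a minimal plain-Kotlin sketch of the counting and merging logic that a map-valued accumulator relies on. The names MapCounter, countCombinations and the String-typed Obj are hypothetical and chosen only for illustration; the nested lists stand in for RDD partitions, and nothing here is Spark API.

```kotlin
// Hypothetical stand-in for the question's Obj; a and b are Strings here instead of enums.
data class Obj(val a: String, val b: String)

// Plain-Kotlin sketch of a map-valued counter (no Spark dependency).
class MapCounter {
    private val counters = HashMap<String, Long>()

    // Count one occurrence of the given key, creating the entry on first use.
    fun add(key: String) {
        counters.merge(key, 1L) { x, y -> x + y }
    }

    // Fold another counter's totals into this one, summing per key.
    fun merge(other: MapCounter) {
        other.counters.forEach { (k, v) -> counters.merge(k, v) { x, y -> x + y } }
    }

    // Return a read-only snapshot of the current counts.
    fun value(): Map<String, Long> = counters.toMap()
}

// Each inner list plays the role of one partition: it is counted locally,
// then its local result is merged into the overall total.
fun countCombinations(partitions: List<List<Obj>>): Map<String, Long> {
    val total = MapCounter()
    for (partition in partitions) {
        val local = MapCounter()
        partition.forEach { local.add("${it.a}-${it.b}") }
        total.merge(local)
    }
    return total.value()
}

fun main() {
    val partitions = listOf(
        listOf(Obj("a1", "b1"), Obj("a2", "b1")),
        listOf(Obj("a1", "b1"), Obj("a1", "b2"))
    )
    println(countCombinations(partitions))
}
```

The point of the sketch is that keys are discovered while counting, so no combination has to be declared up front; only the single counter object exists before the work starts.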

Upvotes: 2
