Reputation: 1409
I want to use accumulators to count combinations of a few parameters of the objects in my RDD.
For example, I have an RDD of Obj with fields a and b. Both fields are enums that can take one of a few values.
To do this, I have to create the accumulators on the driver and use them on the workers:
val acc1 = sc.longAccumulator("a1-b1")
val acc2 = sc.longAccumulator("a2-b1")
val acc3 = sc.longAccumulator("a1-b2")
...
I don't want to declare a counter for every combination of values in every Spark job that uses the same logic. Is there a mechanism that allows creating accumulators dynamically on an executor, or another way to solve this problem?
I'm looking for something like this:
rdd.foreach { obj => getAccumulator(s"${obj.a} - ${obj.b}").add(1) }
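To make the goal concrete: the result being asked for is simply a count per (a, b) combination key. The local sketch below, which does not use Spark, shows that result shape. The enums A and B, the class Obj and the helpers comboKey, allKeys and countCombos are hypothetical stand-ins, with values chosen to mirror the accumulator names a1-b1, a2-b1, a1-b2 above.

```kotlin
// Hypothetical stand-ins for the question's enum fields a and b.
enum class A { A1, A2 }
enum class B { B1, B2 }

data class Obj(val a: A, val b: B)

// Builds the combination key, e.g. "a1-b1".
fun comboKey(o: Obj): String = "${o.a.name.lowercase()}-${o.b.name.lowercase()}"

// Every possible key: |A| * |B| of them, i.e. one accumulator each
// if every combination gets its own longAccumulator.
fun allKeys(): List<String> =
    A.values().flatMap { a -> B.values().map { b -> comboKey(Obj(a, b)) } }

// The desired result: how many objects fall into each combination.
fun countCombos(objs: List<Obj>): Map<String, Long> =
    objs.groupingBy { comboKey(it) }.eachCount().mapValues { it.value.toLong() }
```

The number of keys grows as the product of the enum sizes, which is why declaring one accumulator per key by hand does not scale.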
Upvotes: 1
Views: 737
Reputation: 1409
The link in Egordoe's answer applies to older Spark versions. Here is my implementation for modern Spark (AccumulatorV2, available since Spark 2.0), in Kotlin:
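import org.apache.spark.util.AccumulatorV2

// Counts occurrences of String keys; value() maps each key to its count.
// Register it on the driver before the job runs, e.g. sc.register(acc, "combinations").
class MapAccumulator(val counters: MutableMap<String, Long>) : AccumulatorV2<String, Map<String, Long>>() {
    constructor() : this(HashMap<String, Long>())

    // Zero value: no keys counted yet.
    override fun isZero() = counters.isEmpty()

    // Independent copy, so each task updates its own instance.
    override fun copy(): AccumulatorV2<String, Map<String, Long>> {
        val copy = HashMap<String, Long>()
        copy.putAll(counters)
        return MapAccumulator(copy)
    }

    override fun reset() {
        counters.clear()
    }

    // Increments the count for key v, inserting 1 if the key is new.
    override fun add(v: String) {
        counters.merge(v, 1L, Long::plus)
    }

    // Adds another instance's counts into this one, key by key.
    override fun merge(other: AccumulatorV2<String, Map<String, Long>>) {
        other.value().forEach { (k, v) -> counters.merge(k, v, Long::plus) }
    }

    override fun value(): Map<String, Long> = counters
}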
Upvotes: 0
Reputation: 958
Answering your question literally: you can't dynamically register new accumulators on an executor. Accumulators must be created on the driver (sparkContext.accumulator()) before the job starts. This is how accumulators are designed in Spark.
But if you think about what you actually want to implement, the same functionality can be achieved with a single "static" accumulator: one that gathers Map<String, Long> entries rather than a single Long.
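A rough, Spark-free sketch of why one Map-valued accumulator is enough: each task counts the keys it sees in its own partial map, and the driver merges the partial maps afterwards, so a combination key that first appears on an executor needs no prior registration. The functions countPartition, mergeInto and runJob and the partition contents are hypothetical; they only imitate the per-task copy and driver-side merge that Spark performs for an accumulator.

```kotlin
// Task side: count the keys seen in one partition, starting from an empty map.
fun countPartition(keys: List<String>): MutableMap<String, Long> {
    val local = HashMap<String, Long>()
    for (k in keys) local.merge(k, 1L) { old, inc -> old + inc }
    return local
}

// Driver side: fold one task's partial counts into the running total.
fun mergeInto(total: MutableMap<String, Long>, partial: Map<String, Long>) {
    partial.forEach { (k, v) -> total.merge(k, v) { old, inc -> old + inc } }
}

// Simulates a job: one partial map per partition, merged into a single result.
fun runJob(partitions: List<List<String>>): Map<String, Long> {
    val total = HashMap<String, Long>()
    partitions.map { countPartition(it) }.forEach { mergeInto(total, it) }
    return total
}
```

For example, runJob(listOf(listOf("a1-b1", "a2-b1"), listOf("a1-b1", "a1-b2"), listOf("a1-b1"))) yields a1-b1 = 3, a2-b1 = 1 and a1-b2 = 1, without any key being declared in advance.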
This blog post may give a more practical sense of what I mean here.
Upvotes: 2