Reputation: 3591
I have a problem using accumulators in Spark. As shown on the Spark website, you can create a custom accumulator by extending the AccumulatorParam
trait with an object. The problem is that I want to make that object generic, like this, but can't:
// Won't compile: objects cannot take type parameters.
object SeqAccumulatorParam[B] extends AccumulatorParam[Seq[B]] {
  override def zero(initialValue: Seq[B]): Seq[B] = Seq[B]()
  override def addInPlace(s1: Seq[B], s2: Seq[B]): Seq[B] = s1 ++ s2
}
But this gives a compile error, because objects can't take type parameters. My situation doesn't really allow me to define a separate SeqAccumulatorParam
for each element type, since that would lead to a lot of ugly code duplication.
I do have an alternative: place all of the results in an RDD
and later iterate over them with an accumulator defined for that single concrete type (sketched below), but a generic accumulator would be much nicer.
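A minimal sketch of that workaround, assuming the intermediate results have already been gathered into a hypothetical rddOfSeqs: RDD[Seq[Int]] (that name is mine, for illustration only):

// Accumulator param fixed to one concrete element type.
object IntSeqAccumulatorParam extends AccumulatorParam[Seq[Int]] {
  override def zero(initialValue: Seq[Int]): Seq[Int] = Seq[Int]()
  override def addInPlace(s1: Seq[Int], s2: Seq[Int]): Seq[Int] = s1 ++ s2
}

val intSeqAccum = sc.accumulator(Seq[Int]())(IntSeqAccumulatorParam)
rddOfSeqs.foreach(s => intSeqAccum += s)  // fold every Seq into the accumulator
intSeqAccum.value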
My question is: is there any other way to create accumulators?
Upvotes: 2
Views: 1452
Reputation: 8851
Instead of a singleton object, you can simply use a class with a type parameter, and pass a new instance explicitly when creating each accumulator:
class SeqAccumulatorParam[B] extends AccumulatorParam[Seq[B]] {
  override def zero(initialValue: Seq[B]): Seq[B] = Seq[B]()
  override def addInPlace(s1: Seq[B], s2: Seq[B]): Seq[B] = s1 ++ s2
}

// Pass an instance explicitly to fill sc.accumulator's implicit parameter list.
val seqAccum = sc.accumulator(Seq[Int]())(new SeqAccumulatorParam[Int]())
val lists = (1 to 5).map(x => (0 to x).toList)
sc.parallelize(lists).foreach(x => seqAccum += x)
seqAccum.value
// Seq[Int] = List(0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 0, 1, 2, 0, 1)
// Note: elements may arrive in a different order on each run.
// The same class works for other element types, e.g. Double:
val seqAccumD = sc.accumulator(Seq[Double]())(new SeqAccumulatorParam[Double]())
sc.parallelize(lists.map(x => x.map(_.toDouble))).foreach(x => seqAccumD += x)
seqAccumD.value
// Seq[Double] = List(0.0, 1.0, 0.0, 1.0, 2.0, 0.0, 1.0, 2.0, 3.0, 0.0, 1.0, 2.0, 3.0, 4.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0)
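If you'd rather not pass the param at every call site, you could also expose it implicitly, so that sc.accumulator resolves it from its implicit second parameter list. A small sketch (the implicit def is my addition, not part of the original answer):

// The AccumulatorParam is now found implicitly for any element type.
implicit def seqAccumulatorParam[B]: AccumulatorParam[Seq[B]] =
  new SeqAccumulatorParam[B]

val strAccum = sc.accumulator(Seq[String]())  // param resolved implicitly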
Upvotes: 7