Johan S

Reputation: 3591

Spark AccumulatorParam Generic Parameters

I have a problem with using accumulators in Spark. As described on the Spark website, if you want a custom accumulator you can simply define an object that extends the AccumulatorParam trait. The problem is that I want to make that object generic, and I can't, for example:

object SeqAccumulatorParam[B] extends AccumulatorParam[Seq[B]] {

    override def zero(initialValue: Seq[B]): Seq[B] = Seq[B]()

    override def addInPlace(s1: Seq[B], s2: Seq[B]): Seq[B] = s1 ++ s2

}

This gives me a compile error, because a singleton object cannot take type parameters. My situation doesn't really allow me to define a separate SeqAccumulatorParam for each type, since that would lead to a lot of ugly code duplication.

One alternative would be to put all of the results in an RDD and later iterate over them with an accumulator defined for that single type, but a generic accumulator would be much nicer.

My question is: is there any other way to create accumulators?

Upvotes: 2


Answers (1)

Shyamendra Solanki

Reputation: 8851

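You can simply use a class instead of a singleton object. Unlike an object, a class can take type parameters, so a single definition can be instantiated for each element type you need: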

import org.apache.spark.AccumulatorParam

class SeqAccumulatorParam[B] extends AccumulatorParam[Seq[B]] {
    override def zero(initialValue: Seq[B]): Seq[B] = Seq[B]()
    override def addInPlace(s1: Seq[B], s2: Seq[B]): Seq[B] = s1 ++ s2
}

val seqAccum = sc.accumulator(Seq[Int]())(new SeqAccumulatorParam[Int]())  

val lists = (1 to 5).map(x => (0 to x).toList)
sc.parallelize(lists).foreach(x => seqAccum += x)

seqAccum.value
// Seq[Int] = List(0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 0, 1, 2, 0, 1)
// The order of the elements can differ between runs.
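Why the order can vary: roughly speaking, in the Spark 1.x AccumulatorParam model each task starts its own copy from zero(initialValue), adds to it locally, and the driver then folds the per-task results into its value with addInPlace as tasks finish. The sketch below simulates that flow locally so it runs without Spark. SimpleAccumulatorParam, SimpleSeqParam and simulate are hypothetical names; SimpleAccumulatorParam is only a stand-in for the real org.apache.spark.AccumulatorParam, and the merge order here is fixed, whereas in Spark it depends on task completion order.

```scala
// Hypothetical stand-in for org.apache.spark.AccumulatorParam (not the real trait),
// modelling only the two methods the answer overrides.
trait SimpleAccumulatorParam[T] {
  def zero(initialValue: T): T
  def addInPlace(r1: T, r2: T): T
}

// Generic class: one definition, instantiated once per element type.
class SimpleSeqParam[B] extends SimpleAccumulatorParam[Seq[B]] {
  override def zero(initialValue: Seq[B]): Seq[B] = Seq.empty[B]
  override def addInPlace(s1: Seq[B], s2: Seq[B]): Seq[B] = s1 ++ s2
}

// Rough local simulation: each "task" starts from zero and adds its own values,
// then the "driver" folds the per-task results into the initial value in order.
def simulate[T](param: SimpleAccumulatorParam[T])(initial: T, perTaskAdds: Seq[Seq[T]]): T = {
  val taskResults = perTaskAdds.map { adds =>
    adds.foldLeft(param.zero(initial))((acc, x) => param.addInPlace(acc, x))
  }
  taskResults.foldLeft(initial)((acc, x) => param.addInPlace(acc, x))
}

// Two simulated tasks: the first adds Seq(0, 1) then Seq(2), the second adds Seq(5).
val intResult = simulate(new SimpleSeqParam[Int])(Seq.empty[Int], Seq(Seq(Seq(0, 1), Seq(2)), Seq(Seq(5))))
// The same class works unchanged for another element type.
val strResult = simulate(new SimpleSeqParam[String])(Seq.empty[String], Seq(Seq(Seq("a")), Seq(Seq("b", "c"))))
```

If the two simulated tasks were merged in the opposite order, intResult would start with 5 instead, which is the kind of reordering you can see in the real output.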

// For Doubles.
val seqAccumD = sc.accumulator(Seq[Double]())(new SeqAccumulatorParam[Double]())
sc.parallelize(lists.map(x => x.map(_.toDouble))).foreach(x => seqAccumD += x)

seqAccumD.value
// Seq[Double] = List(0.0, 1.0, 0.0, 1.0, 2.0, 0.0, 1.0, 2.0, 3.0, 0.0, 1.0, 2.0, 3.0, 4.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0)
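If you want to avoid writing new SeqAccumulatorParam[...] at every call site, one option is a generic implicit def. In Spark 1.x, SparkContext.accumulator takes its AccumulatorParam as an implicit parameter, so with something like implicit def seqAccumulatorParam[B]: AccumulatorParam[Seq[B]] = new SeqAccumulatorParam[B] in scope, sc.accumulator(Seq[Int]()) should resolve the parameter for you. The sketch below demonstrates the same implicit-resolution pattern without Spark. ParamLike, SeqParamLike, seqParam and combine are hypothetical stand-ins, not Spark API.

```scala
// Hypothetical stand-in for Spark's AccumulatorParam (not the real trait).
trait ParamLike[T] {
  def zero(initialValue: T): T
  def addInPlace(r1: T, r2: T): T
}

class SeqParamLike[B] extends ParamLike[Seq[B]] {
  override def zero(initialValue: Seq[B]): Seq[B] = Seq.empty[B]
  override def addInPlace(s1: Seq[B], s2: Seq[B]): Seq[B] = s1 ++ s2
}

object ParamLike {
  // One generic implicit in the companion object covers Seq[Int], Seq[Double], ...
  implicit def seqParam[B]: ParamLike[Seq[B]] = new SeqParamLike[B]
}

// Stand-in for a method that, like Spark 1.x's sc.accumulator, takes the param implicitly.
def combine[T](initial: T, parts: Seq[T])(implicit param: ParamLike[T]): T =
  parts.foldLeft(initial)((acc, x) => param.addInPlace(acc, x))

// No explicit `new SeqParamLike[...]`: the compiler picks ParamLike.seqParam per type.
val ints = combine(Seq.empty[Int], Seq(Seq(0, 1), Seq(2)))
val doubles = combine(Seq.empty[Double], Seq(Seq(0.0), Seq(1.0, 2.0)))
```

Note that sc.accumulator and AccumulatorParam are deprecated as of Spark 2.0 in favour of AccumulatorV2, so on newer versions the same class-instead-of-object idea would apply to an AccumulatorV2 subclass instead.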

Upvotes: 7
