apolak

Reputation: 141

Extending Sparks Accumulator

I'm trying to extend the Accumulator class in Scala, but it fails on the constructor.

Here is the error from IntelliJ:

Error:(44, 24) overloaded method constructor Accumulator with alternatives:
  (initialValue: org.apache.spark.AccumulatorParam[T],param: org.apache.spark.AccumulatorParam[org.apache.spark.AccumulatorParam[T]])org.apache.spark.Accumulator[org.apache.spark.AccumulatorParam[T]]
  (initialValue: org.apache.spark.AccumulatorParam[T],param: org.apache.spark.AccumulatorParam[org.apache.spark.AccumulatorParam[T]],name: Option[String])org.apache.spark.Accumulator[org.apache.spark.AccumulatorParam[T]]
 cannot be applied to ()
class MyAccumulator[T] (initialValue: org.apache.spark.AccumulatorParam[T],param: org.apache.spark.AccumulatorParam[org.apache.spark.AccumulatorParam[T]])

Here is my code:

class MyAccumulator[T] (initialValue: AccumulatorParam[T],
                        param: AccumulatorParam[AccumulatorParam[T]])
  extends Accumulator[AccumulatorParam[T]] with Serializable {

  def this(initialValue: AccumulatorParam[T],
           param: AccumulatorParam[org.apache.spark.AccumulatorParam[T]], 
           name: Option[String]) = {
    this(initialValue,param,name)
  }


  override def setValue(newValue: AccumulatorParam[T]): Unit = super.setValue(newValue)

  override val id: Long = ???
  override val zero: AccumulatorParam[T] = ???

  override def +=(term: AccumulatorParam[T]): Unit = super.+=(term)

  override def add(term: AccumulatorParam[T]): Unit = super.add(term)

  override def ++=(term: AccumulatorParam[T]): Unit = super.++=(term)

  override def merge(term: AccumulatorParam[T]): Unit = super.merge(term)

  override def localValue: AccumulatorParam[T] = super.localValue

  override def value: AccumulatorParam[T] = super.value

  override def value_=(newValue: AccumulatorParam[T]): Unit = super.value_=(newValue)

  override def toString(): String = super.toString()
}

Since Scala creates its own constructor when you declare the class parameters, I don't understand why it thinks there is an empty constructor. I also tried declaring a constructor explicitly, and it failed on the same thing, plus an extra duplicated-constructor error.
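
To show what I mean by Scala creating its own constructor, here is a minimal, Spark-free sketch (the names Point and PointDemo are made up purely for illustration):

// Declaring class parameters already defines the primary constructor,
// so Point has a two-argument constructor without any extra declaration.
class Point(val x: Int, val y: Int)

object PointDemo extends App {
  val p = new Point(1, 2)      // calls the generated primary constructor
  println(s"${p.x}, ${p.y}")   // prints "1, 2"
}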

I'm new to Scala and Spark. Please assist!

Upvotes: 2

Views: 513

Answers (1)

Yuval Itzchakov

Reputation: 149518

You need to pass the parameters to the Accumulator class via the primary constructor:

class MyAccumulator[T] (initialValue: AccumulatorParam[T],
                        param: org.apache.spark.AccumulatorParam[AccumulatorParam[T]])
extends Accumulator[AccumulatorParam[T]](initialValue, param) with Serializable

The error message:

org.apache.spark.Accumulator[org.apache.spark.AccumulatorParam[T]] cannot be applied to ()

is because the Scala compiler tries to find a suitable Accumulator constructor that takes no arguments (arity-0). No such constructor exists, hence it can't apply it to ().
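
More generally, this is the usual Scala rule for extending a class whose only constructors take arguments: the subclass must forward those arguments in the extends clause. A minimal, Spark-free sketch (Base, Fixed and Demo are made-up names for illustration):

class Base(val x: Int, val label: String)

// Does not compile: "constructor Base ... cannot be applied to ()"
// class Broken(x: Int, label: String) extends Base

// Compiles: the primary constructor parameters are forwarded to Base.
class Fixed(x: Int, label: String) extends Base(x, label)

object Demo extends App {
  val f = new Fixed(42, "answer")
  println(s"${f.label} = ${f.x}")   // prints "answer = 42"
}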

Upvotes: 2
