I am trying to create some simple custom aggregate operators in Spark using Scala.
I have created a simple hierarchy of operators, with the following super-class:
import org.apache.spark.sql.Row

sealed abstract class Aggregator(val name: String) {
  type Key = Row // org.apache.spark.sql.Row
  type Value
  ...
}
I also have a companion object, which constructs the appropriate aggregator each time. Observe that each operator is allowed to specify the Value type it wants.
The problem appears when I try to call reduceByKey:
val agg = Aggregator("SUM")
val res = rdd
  .map(agg.mapper)
  .reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
The error is:
value reduceByKey is not a member of org.apache.spark.rdd.RDD[(agg.Key, agg.Value)]
For my needs, Value can be either a numeric type or a tuple, which is why it has no bounds. If I replace the Value type declaration in the Aggregator class with:
type Value = Double
then everything works fine. Therefore, I suppose the error is caused by reduceByKey not knowing the exact Value type at compile time.
Any ideas on how to get around this?
Your RDD cannot be implicitly converted into PairRDDFunctions (which is where reduceByKey is defined), because the implicit ClassTags for the key and value types are missing.
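To see the mechanism in isolation, here is a small plain-Scala sketch that runs without Spark. FakeRDD and FakePairFunctions are hypothetical stand-ins that imitate only the relevant part of Spark's design: reduceByKey lives on a separate class, reached through an implicit conversion whose implicit parameter list asks for ClassTags of the key and value types (the real conversion also takes an optional Ordering, omitted here).

```scala
import scala.language.implicitConversions
import scala.reflect.ClassTag

// Hypothetical stand-in for Spark's RDD, used only to show the mechanism.
final case class FakeRDD[T](data: Seq[T])

object FakeRDD {
  // Imitates the shape of Spark's RDD.rddToPairRDDFunctions: the conversion
  // is only applicable when ClassTags for both K and V can be found implicitly.
  implicit def toPairFunctions[K, V](rdd: FakeRDD[(K, V)])(
      implicit kt: ClassTag[K], vt: ClassTag[V]): FakePairFunctions[K, V] =
    new FakePairFunctions(rdd)
}

// Hypothetical stand-in for PairRDDFunctions.
final class FakePairFunctions[K, V](self: FakeRDD[(K, V)]) {
  def reduceByKey(f: (V, V) => V): FakeRDD[(K, V)] =
    FakeRDD(self.data.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) }.toSeq)
}

object ClassTagDemo {
  // Concrete types: the compiler synthesizes ClassTag[String] and ClassTag[Int].
  val sums: Map[String, Int] =
    FakeRDD(Seq("a" -> 1, "b" -> 2, "a" -> 3)).reduceByKey(_ + _).data.toMap

  // With an abstract V and no ClassTag[V] in scope, the conversion cannot be
  // applied, so this would not compile:
  //   def broken[V](rdd: FakeRDD[(String, V)], f: (V, V) => V) = rdd.reduceByKey(f)

  // Demanding a ClassTag[V] from the caller makes the conversion applicable again.
  def fixed[V: ClassTag](rdd: FakeRDD[(String, V)], f: (V, V) => V): FakeRDD[(String, V)] =
    rdd.reduceByKey(f)
}
```

The commented-out broken method mirrors the situation in the question: the value type is abstract, no ClassTag for it is in scope, and the compiler therefore reports that reduceByKey is not a member.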
You might want to include the class tags as implicit parameters in your Aggregator:
import scala.reflect.ClassTag

sealed abstract class Aggregator[K: ClassTag, V: ClassTag](name: String) {
  implicit val keyClassTag: ClassTag[K] = implicitly
  implicit val valueClassTag: ClassTag[V] = implicitly
}
or maybe:
sealed abstract class Aggregator[K, V](name: String)(implicit kt: ClassTag[K], vt: ClassTag[V]) {
  implicit val keyClassTag: ClassTag[K] = kt
  implicit val valueClassTag: ClassTag[V] = vt
}
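A minimal sketch of the constructor-parameter variant, assuming two hypothetical subclasses: SumAggregator with a Double value and AvgAggregator with a (sum, count) tuple value, the two shapes the question mentions. String stands in for Row so the example runs without Spark, and the reducer member is an assumption added for illustration.

```scala
import scala.reflect.ClassTag

// ClassTags are captured as constructor implicits and re-exposed as implicit vals.
sealed abstract class Aggregator[K, V](val name: String)(implicit kt: ClassTag[K], vt: ClassTag[V]) {
  implicit val keyClassTag: ClassTag[K] = kt
  implicit val valueClassTag: ClassTag[V] = vt
  def reducer(a: V, b: V): V
}

// K and V are concrete here, so the compiler fills in kt and vt at the
// superclass constructor call.
final class SumAggregator extends Aggregator[String, Double]("SUM") {
  def reducer(a: Double, b: Double): Double = a + b
}

// A tuple value type works the same way: (sum, count), e.g. for an average.
final class AvgAggregator extends Aggregator[String, (Double, Long)]("AVG") {
  def reducer(a: (Double, Long), b: (Double, Long)): (Double, Long) =
    (a._1 + b._1, a._2 + b._2)
}
```

Because each subclass fixes K and V, the ClassTags are resolved once, where the subclass is defined, and any code holding the aggregator can reach them through keyClassTag and valueClassTag.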
or maybe even:
sealed abstract class Aggregator(name: String) {
  type K
  type V
  implicit def keyClassTag: ClassTag[K]
  implicit def valueClassTag: ClassTag[V]
}
The last variant would shift the responsibility for providing the ClassTags to the implementor of the abstract class.
Now, when using an aggregator agg of type Aggregator[K, V] in a reduceByKey, you have to make sure that those implicitly provided class tags are in the current implicit scope, for example by importing the aggregator's members:
val agg = Aggregator("SUM")
import agg._ // now the implicits should be visible
val res = rdd
  .map(agg.mapper)
  .reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
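An end-to-end sketch of the type-member variant combined with import agg._, again without Spark. LocalPairOps.reduceByKey is a hypothetical stand-in that, like Spark's pair functions, demands ClassTags for the key and value types. SumAggregator, the mapper signature, and String in place of Row are assumptions made for illustration; the type members use the question's names Key and Value.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for reduceByKey: it can only be called when
// ClassTags for K and V are found implicitly.
object LocalPairOps {
  def reduceByKey[K: ClassTag, V: ClassTag](data: Seq[(K, V)])(f: (V, V) => V): Seq[(K, V)] =
    data.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) }.toSeq
}

// Type-member variant: implementors must provide the ClassTags.
sealed abstract class Aggregator(val name: String) {
  type Key
  type Value
  implicit def keyClassTag: ClassTag[Key]
  implicit def valueClassTag: ClassTag[Value]
  def mapper(record: (String, Double)): (Key, Value)
  def reducer(a: Value, b: Value): Value
}

// Hypothetical implementor. The tags are written out explicitly, which avoids
// an implicit search that could pick up the very members being defined.
final class SumAggregator extends Aggregator("SUM") {
  type Key = String
  type Value = Double
  implicit val keyClassTag: ClassTag[String] = ClassTag[String](classOf[String])
  implicit val valueClassTag: ClassTag[Double] = ClassTag.Double
  def mapper(record: (String, Double)): (String, Double) = record
  def reducer(a: Double, b: Double): Double = a + b
}

object Aggregator {
  def apply(name: String): Aggregator = name match {
    case "SUM" => new SumAggregator
    case other => throw new IllegalArgumentException(s"unknown aggregator: $other")
  }
}

object Usage {
  val records: Seq[(String, Double)] = Seq("a" -> 1.0, "b" -> 2.0, "a" -> 3.0)

  val agg: Aggregator = Aggregator("SUM")
  import agg._ // puts keyClassTag and valueClassTag into the implicit scope

  // Without the import above, ClassTag[agg.Key] and ClassTag[agg.Value]
  // could not be found and this call would not compile.
  val res: Map[agg.Key, agg.Value] =
    LocalPairOps.reduceByKey(records.map(agg.mapper))(agg.reducer(_: agg.Value, _: agg.Value)).toMap
}
```

The static type of agg is just Aggregator, so agg.Key and agg.Value stay abstract at the call site; the imported implicit members are what let the compiler satisfy the ClassTag requirements anyway.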