Reputation: 2467
I have this class (in Java), which I want to use in Spark (1.6):
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

public class Aggregation {

    private Map<String, Integer> counts;

    public Aggregation() {
        counts = new HashMap<String, Integer>();
    }

    // Folds a single record into this aggregation.
    public Aggregation add(Aggregation ia) {
        String key = buildCountString(ia);
        addKey(key);
        return this;
    }

    private void addKey(String key, int cnt) {
        if (counts.containsKey(key)) {
            counts.put(key, counts.get(key) + cnt);
        } else {
            counts.put(key, cnt);
        }
    }

    private void addKey(String key) {
        addKey(key, 1);
    }

    // Combines two partial aggregations by summing their per-key counts.
    public Aggregation merge(Aggregation agg) {
        for (Entry<String, Integer> e : agg.counts.entrySet()) {
            this.addKey(e.getKey(), e.getValue());
        }
        return this;
    }

    private String buildCountString(Aggregation rec) {
        ...
    }
}
When starting Spark I enabled Kryo and registered this class (in Scala):
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[Aggregation]))
And I want to use it with Spark aggregate like this (Scala):
rdd.aggregate(new Aggregation())((agg, rec) => agg.add(rec), (a, b) => a.merge(b))
Somehow this raises a "Task not serializable" exception.
But when I use the class with map and reduce, everything works fine:
val rdd2 = interactionObjects.map(_ => new Aggregation())
rdd2.reduce((a, b) => a.merge(b))
println(rdd2.count())
Do you have an idea why the error occurs with aggregate but not with map/reduce?
Thanks and regards!
Upvotes: 1
Views: 579
Reputation: 681
Your Aggregation class should implement Serializable. When you call aggregate, the driver sends your zero value (the new Aggregation()) to every worker as part of the task closure, and Spark serializes closures with Java serialization rather than Kryo, so the class has to be Serializable; registering it with Kryo does not help here. With map and reduce the Aggregation instances are created on the executors and only the partial results travel back through the configured (Kryo) serializer, which is why that path works.
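A minimal sketch of the fix (only the class declaration changes; the rest of the class stays as in the question):

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Declaring the class Serializable lets the driver ship the zero value
// inside the Java-serialized task closure that aggregate creates.
public class Aggregation implements Serializable {

    private Map<String, Integer> counts = new HashMap<String, Integer>();

    // ... add, addKey, merge and buildCountString unchanged ...
}

With that change the original rdd.aggregate(new Aggregation())((agg, rec) => agg.add(rec), (a, b) => a.merge(b)) call should no longer throw "Task not serializable". Kryo registration is still useful for shipping the partial results back to the driver, but it does not remove the Serializable requirement for objects captured in closures.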
Upvotes: 1