Daniel

Reputation: 2467

Spark: Serialization not working with Aggregate

I have this class (in Java), which I want to use in Spark (1.6):

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

public class Aggregation {
  private Map<String, Integer> counts;

  public Aggregation() {
    counts = new HashMap<String, Integer>();
  }

  public Aggregation add(Aggregation ia) {
    String key = buildCountString(ia);
    addKey(key);
    return this;
  }

  private void addKey(String key, int cnt) {
    if(counts.containsKey(key)) {
        counts.put(key, counts.get(key) + cnt);
    }
    else {
        counts.put(key, cnt);
    }
  }

  private void addKey(String key) {
    addKey(key, 1);
  }

  public Aggregation merge(Aggregation agg) {
    for(Entry<String, Integer> e: agg.counts.entrySet()) {
        this.addKey(e.getKey(), e.getValue());
    }
    return this;
  }

  private String buildCountString(Aggregation rec) {
    ...
  }
}

When starting Spark, I enabled Kryo and registered this class (in Scala):

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[Aggregation]))

And I want to use it with Spark aggregate like this (Scala):

rdd.aggregate(new InteractionAggregation)((agg, rec) => agg.add(rec), (a, b) => a.merge(b) )

Somehow this raises a "Task not serializable" exception.

But when I use the class with map and reduce, everything works fine:

val rdd2= interactionObjects.map( _ => new InteractionAggregation())
rdd2.reduce((a,b) => a.merge(b))
println(rdd2.count())

Do you have an idea why the error occurs with aggregate but not with map/reduce?

Thanks and regards!

Upvotes: 1

Views: 579

Answers (1)

Ulysse Mizrahi

Reputation: 681

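Your Aggregation class should implement java.io.Serializable. When you call aggregate, the zero value (new Aggregation()) is captured in the task closure that the driver sends to all workers. As far as I know, Spark serializes task closures with Java serialization even when Kryo is set as spark.serializer, so registering the class with Kryo does not help here and you get the "Task not serializable" error. In your map/reduce version the objects are created inside the closure on the workers, so no Aggregation instance has to be shipped from the driver with the task.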
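A minimal sketch of that change, assuming the only field is the counts map from the question. The addKey(String) and count(String) methods and the SerializationCheck helper are hypothetical, added only for illustration: the original add(...) depends on buildCountString, whose body is elided in the question. The round trip uses plain Java serialization, which is the mechanism the "Task not serializable" check relies on, rather than Kryo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Sketch of the question's class, made Java-serializable.
class Aggregation implements Serializable {
  private static final long serialVersionUID = 1L;

  // HashMap, String and Integer are all Serializable, so this field needs no special handling.
  private final Map<String, Integer> counts = new HashMap<String, Integer>();

  // Hypothetical stand-in for add(...): the original derives the key via buildCountString.
  public Aggregation addKey(String key) {
    Integer old = counts.get(key);
    counts.put(key, old == null ? 1 : old + 1);
    return this;
  }

  // Adds the counts of another aggregation into this one, as in the question.
  public Aggregation merge(Aggregation other) {
    for (Map.Entry<String, Integer> e : other.counts.entrySet()) {
      Integer old = counts.get(e.getKey());
      counts.put(e.getKey(), old == null ? e.getValue() : old + e.getValue());
    }
    return this;
  }

  // Hypothetical accessor, used only to inspect the result.
  public int count(String key) {
    Integer c = counts.get(key);
    return c == null ? 0 : c;
  }
}

// Hypothetical helper: writes the object with Java serialization and reads it back.
class SerializationCheck {
  static Aggregation roundTrip(Aggregation agg) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(agg); // throws NotSerializableException without "implements Serializable"
      }
      try (ObjectInputStream in =
               new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (Aggregation) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Java serialization round trip failed", e);
    }
  }
}
```

If SerializationCheck.roundTrip(new Aggregation()) succeeds locally, the same object should be usable as the zero value of aggregate. Removing "implements Serializable" makes the round trip fail with a NotSerializableException, which is the underlying cause Spark reports as "Task not serializable".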

Upvotes: 1
