Erel

Reputation: 31

Is immutability a "must" or "should" for custom accumulators?

I would like to create custom accumulators, but I can't feel confident using them since I can only test them locally for now.

My question is:

Is immutability a "must" or "should" when creating accumulators?

Although I can't find the link/reference now, I have read that only immutable objects are allowed for accumulators. However, in Spark's API (1.6), the addInPlace methods of AccumulableParam and AccumulatorParam carry the following description: "Merge two accumulated values together. Is allowed to modify and return the first value for efficiency (to avoid allocating objects)."

Which one is correct? And if mutable objects are allowed, how do I use them to create accumulators safely?

Let's say I have a mutable class with one field, and let that field be an array of integers. How should I override the addInPlace method for such a mutable class?

Should I write (Option 1):

public MyClass addInPlace(MyClass c1, MyClass c2) {
    c1.update(c2); // the int array of c1 is updated (say, by adding the two arrays) and c1 itself is returned
    return c1;
}

Or should I write (Option 2):

public MyClass addInPlace(MyClass c1, MyClass c2) {
    return update2(c1, c2); // a new MyClass object is returned, with an array created by adding the arrays of c1 and c2
}

Option 2 seems safer but requires an additional allocation. However, the quote above from the API says that modification is allowed precisely to avoid that allocation.
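To make Option 1 concrete, here is a rough sketch of how the mutate-in-place style could look if the accumulated value were simply an int[] instead of my MyClass wrapper (just an illustration of what I mean, assuming all arrays have the same length; not tested on a cluster):

import java.io.Serializable;
import org.apache.spark.AccumulatorParam;

// Sketch of the Option 1 (in-place) style with a plain int[] value.
public class IntArrayParam implements AccumulatorParam<int[]>, Serializable {

    @Override
    public int[] addAccumulator(int[] acc, int[] value) {
        return addInPlace(acc, value);
    }

    @Override
    public int[] addInPlace(int[] a1, int[] a2) {
        // Mutate and return the first argument, as the API description allows.
        for (int i = 0; i < a1.length; i++) {
            a1[i] += a2[i];
        }
        return a1;
    }

    @Override
    public int[] zero(int[] initialValue) {
        // A fresh array of zeros, sized like the initial value, never shared.
        return new int[initialValue.length];
    }
}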

In addition, if I have an array of objects (let's say MyClass2) rather than an array of integers, should I clone the objects or use the objects themselves? Let's say I want to create an accumulator for a PriorityQueue of MyClass2 (maybe I should post this as a separate question?).

I would appreciate any answers and more advanced references/documents on accumulators and Spark, especially in Java.


Edit:

Thanks to zero323 for the answer.

I wish I could find the link that confused me, but things are clearer now. However, I have two additional questions.

1) I came across the following accumulator implementation that keeps track of how many times each browser type is seen in log files. You can see the details at https://brosinski.com/post/extending-spark-accumulators/.

Here is the implementation:

public class MapAccumulator implements AccumulatorParam<Map<String, Long>>, Serializable {

    @Override
    public Map<String, Long> addAccumulator(Map<String, Long> t1, Map<String, Long> t2) {
        return mergeMap(t1, t2);
    }

    @Override
    public Map<String, Long> addInPlace(Map<String, Long> r1, Map<String, Long> r2) {
        return mergeMap(r1, r2);
    }

    @Override
    public Map<String, Long> zero(final Map<String, Long> initialValue) {
        return new HashMap<>();
    }

    private Map<String, Long> mergeMap(Map<String, Long> map1, Map<String, Long> map2) {
        Map<String, Long> result = new HashMap<>(map1);
        map2.forEach((k, v) -> result.merge(k, v, (a, b) -> a + b));
        return result;
    }
}

My question is:

Why don't we have

map2.forEach((k, v) -> map1.merge(k, v, (a, b) -> a + b));
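In other words, the in-place variant I have in mind would look roughly like this (just a sketch of the idea, not code from the linked post):

@Override
public Map<String, Long> addInPlace(Map<String, Long> r1, Map<String, Long> r2) {
    // Merge r2 into r1 and return r1, so no new HashMap is allocated.
    r2.forEach((k, v) -> r1.merge(k, v, (a, b) -> a + b));
    return r1;
}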

Also, let's say I would like to have a

Map<Integer, ArrayList<MyClass>> or ArrayList<ArrayList<MyClass>>

Can I write something like this (Option 1):

public ArrayList<ArrayList<MyClass>> addInPlace(ArrayList<ArrayList<MyClass>> a1, ArrayList<ArrayList<MyClass>> a2) {
    // For now, assume that a1 and a2 have the same size
    for (int i = 0; i < a2.size(); i++) {
        a1.get(i).addAll(a2.get(i));
    }
    return a1;
}

Or should I write (Option 2):

public ArrayList<ArrayList<MyClass>> addInPlace(ArrayList<ArrayList<MyClass>> a1, ArrayList<ArrayList<MyClass>> a2) {
    // For now, assume that a1 and a2 have the same size
    ArrayList<ArrayList<MyClass>> result = new ArrayList<ArrayList<MyClass>>();
    for (int i = 0; i < a1.size(); i++) {
        result.add(new ArrayList<MyClass>());
        result.get(i).addAll(a1.get(i));
        result.get(i).addAll(a2.get(i));
    }
    return result;
}

So is there a difference between the two options in terms of accumulator safety?

2) When you say accumulators are not thread-safe, do you mean that an RDD element can update the accumulator multiple times? Or do you mean that the objects used during the process can be changed from somewhere else in the code by another thread?

Or is it a problem only when shipping accumulators to the driver, as written in the link zero323 shared (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulable.scala#L43):

"If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver via heartbeats. For internal [[Accumulable]]s, R must be thread safe so that they can be reported correctly."

I apologize for the long entry, but I hope it will be helpful for the community as well.

Upvotes: 3

Views: 400

Answers (1)

zero323

Reputation: 330203

Is immutability required when creating custom accumulators? No, it is not. You've already discovered that both AccumulableParam.addAccumulator and AccumulableParam.addInPlace explicitly allow modification of the first argument. If you dive deeper you'll see that this scenario is actually tested in the AccumulatorSuite, where the following param is used:

new AccumulableParam[mutable.Set[A], A] {
  def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
    t1 ++= t2
    t1
  }
  def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
    t1 += t2
    t1
  }
  def zero(t: mutable.Set[A]) : mutable.Set[A] = {
    new mutable.HashSet[A]()
  }
}
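For reference, a rough Java counterpart of that param (just a sketch against the 1.6 AccumulableParam interface, not code from the Spark test suite) could look like this:

import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
import org.apache.spark.AccumulableParam;

// Sketch only: a mutable-set param mirroring the Scala test above.
public class SetAccumulableParam<A> implements AccumulableParam<Set<A>, A>, Serializable {

    @Override
    public Set<A> addAccumulator(Set<A> acc, A element) {
        acc.add(element);   // mutate the accumulated set in place
        return acc;
    }

    @Override
    public Set<A> addInPlace(Set<A> s1, Set<A> s2) {
        s1.addAll(s2);      // merge by mutating the first argument
        return s1;
    }

    @Override
    public Set<A> zero(Set<A> initialValue) {
        return new HashSet<>();  // fresh, unshared zero value
    }
}

It could then be registered through the SparkContext's accumulable method with an initial set and an instance of this param.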

Intuitively, since each task has its own accumulator and processes its partition sequentially, there should be no case where mutability becomes an issue.

Nevertheless, as stated elsewhere, accumulables are not thread-safe. So you should probably forget about combining accumulators with parallel processing at the partition level.

Upvotes: 1
