ThatDataGuy

Reputation: 2109

Will Java streams sum values of a ConcurrentHashMap in a consistent manner?

I have a ConcurrentHashMap instance that some threads add entries to. The values are integers.

Simultaneously, other threads wish to retrieve the sum of all the values in the map. I would like these threads to see a consistent value, though it need not always be the latest one.

Is the following code thread safe?

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MyClass {
    private Map<Integer, Integer> values = new ConcurrentHashMap<>();

    public void addValue(Integer key, int value){
        values.put(key, value);
    }

    public long sumOfValues(){
        return values
                .values()
                .stream()
                .mapToInt(Integer::intValue)
                .sum();
    }
}

Will the sum operation be calculated on a consistent set of values?

When the sum operation is happening, will calls to put() be blocked?

Of course I could synchronize the access myself, and even split the read and write locks to allow for concurrent read access and synchronized write access, but I am curious whether it's necessary when using ConcurrentHashMap as the collection implementation.
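For reference, the lock-splitting alternative mentioned above might look roughly like the following sketch (hypothetical class name; writers take the write lock, summing threads take the read lock, so each sum sees a consistent snapshot at the cost of blocking writers during the sum):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockedSum {
    private final Map<Integer, Integer> values = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    public void addValue(Integer key, int value) {
        lock.writeLock().lock();
        try {
            values.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public long sumOfValues() {
        // Read lock: multiple summing threads may run concurrently,
        // but put() is blocked until the sum completes.
        lock.readLock().lock();
        try {
            long sum = 0;
            for (int v : values.values()) {
                sum += v;
            }
            return sum;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```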

Upvotes: 3

Views: 1440

Answers (3)

kaya3

Reputation: 51093

If you need to query the sum concurrently, one solution is to write a wrapper class which maintains both the map's state and the sum, using a LongAdder to atomically maintain the sum.

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class MapSum {
    private final ConcurrentMap<Integer, Integer> map = new ConcurrentHashMap<>();
    private final LongAdder sum = new LongAdder();

    public Integer get(Integer k) {
        return map.get(k);
    }

    public Integer put(Integer k, Integer v) {
        Integer[] out = new Integer[1];
        map.compute(k, (_k, old) -> {
            out[0] = old;
            // cast to long to avoid overflow
            sum.add((long) v - (old != null ? old : 0));
            return v;
        });
        return out[0];
    }

    public Integer remove(Integer k) {
        Integer[] out = new Integer[1];
        map.compute(k, (_k, old) -> {
            out[0] = old;
            // cast to long to avoid overflow; -Integer.MIN_VALUE == Integer.MIN_VALUE
            if(old != null) { sum.add(- (long) old); }
            return null;
        });
        return out[0];
    }

    public long sum() {
        return sum.sum();
    }
}

This has the added benefit of querying the sum in O(1) instead of O(n) time. You can add more Map methods if you like, and even implement Map<Integer, Integer> - just be careful to maintain the sum when you change the map's contents in any way.
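To illustrate the core delta-update idea in isolation (a standalone sketch with hypothetical names, not part of the original answer): each put adds the difference between the new and old value to the LongAdder, so overwrites keep the running sum correct.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

public class SumDemo {
    static final ConcurrentMap<Integer, Integer> map = new ConcurrentHashMap<>();
    static final LongAdder sum = new LongAdder();

    static void put(Integer k, Integer v) {
        // compute() runs the remapping function atomically for this key,
        // so the sum adjustment cannot race with another put of the same key.
        map.compute(k, (key, old) -> {
            sum.add((long) v - (old != null ? old : 0));
            return v;
        });
    }
}
```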

Upvotes: 0

Curiosa Globunznik

Reputation: 3205

The documentation says about ConcurrentHashMap's keySet() and entrySet(): "The view's iterators and spliterators are weakly consistent."

Weakly consistent is characterized as:

  • they may proceed concurrently with other operations
  • they will never throw ConcurrentModificationException
  • they are guaranteed to traverse elements as they existed upon construction exactly once, and may (but are not guaranteed to) reflect any modifications subsequent to construction.
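The no-ConcurrentModificationException guarantee can be demonstrated with a small sketch (hypothetical names; a plain HashMap would throw here):

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WeakIterationDemo {
    public static long sumWhileRemoving() {
        Map<Integer, Integer> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 10; i++) {
            map.put(i, 1);
        }
        long sum = 0;
        for (Iterator<Integer> it = map.values().iterator(); it.hasNext(); ) {
            sum += it.next();
            // Structural modification mid-iteration: no exception is thrown.
            // Whether the removal is reflected in this traversal is unspecified.
            map.remove(0);
        }
        return sum;
    }
}
```

Because the iterator is only weakly consistent, the sum may or may not include the removed entry, so the result is 9 or 10 rather than a fixed value.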

So...

Is the following code thread safe?

Yes, in the narrow sense that it will not throw ConcurrentModificationException or observe internal inconsistencies of the map.

Will the sum operation be calculated on a consistent set of values?

On a weakly consistent set of values.

When the sum operation is happening, will calls to put() be blocked?

No

Upvotes: 6

Tom Hawtin - tackline

Reputation: 147164

The point of ConcurrentHashMap is that the entries are as independent from one another as possible. There isn't a consistent view of the whole map. Indeed, even size() doesn't return a very useful value.

Upvotes: 1
