Reputation: 23800
I have the following code; it is a toy example, but it makes it possible to reproduce the problem:
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toList;

public class TestClass3 {

    public static void main(String[] args) throws InterruptedException {
        // Setup data that we will be playing with concurrently
        List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
        HashMap<String, List<Integer>> keyValueMap = new HashMap<>();
        for (String key : keys) {
            int[] randomInts = new Random().ints(10000, 0, 10000).toArray();
            keyValueMap.put(key, stream(randomInts).boxed().collect(toList()));
        }

        // Entering danger zone, concurrently transforming our data to another shape
        ExecutorService es = Executors.newFixedThreadPool(10);
        Map<Integer, Set<String>> valueKeyMap = new ConcurrentHashMap<>();
        for (String key : keys) {
            es.submit(() -> {
                for (Integer value : keyValueMap.get(key)) {
                    valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);
                }
            });
        }

        // Wait for all tasks in the ExecutorService to finish
        es.shutdown();
        es.awaitTermination(1, TimeUnit.MINUTES);
        // Danger zone ends..

        // We should be in a single-thread environment now and safe
        StringBuilder stringBuilder = new StringBuilder();
        for (Integer integer : valueKeyMap.keySet()) {
            String collect = valueKeyMap
                    .get(integer)
                    .stream()
                    .sorted() // This will blow up randomly
                    .collect(Collectors.joining());
            stringBuilder.append(collect); // just to print something..
        }
        System.out.println(stringBuilder.length());
    }
}
When I run this code over and over again, it will usually run without any exceptions and print some number. However, from time to time (roughly 1 out of 10 tries) I will get an exception akin to:
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 6
at java.util.stream.SortedOps$SizedRefSortingSink.accept(SortedOps.java:369)
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at biz.tugay.TestClass3.main(TestClass3.java:40)
I am pretty certain it has something to do with this line:

valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);
If I change this part as follows, I never get an exception:

synchronized (valueKeyMap) {
    valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);
}
I am thinking computeIfAbsent is still modifying the valueKeyMap even after all threads are finished. Could someone explain why this code fails randomly, and what the reason is? Or is there a totally different reason I am unable to see, and am I wrong in my assumption that computeIfAbsent is to blame?
Upvotes: 8
Views: 923
Reputation: 183456
The problem isn't in the computeIfAbsent call, but rather in the .add(key) at the end: you can have multiple threads trying to add elements to the same HashSet, with nothing to ensure safe concurrent access. Since HashSet isn't threadsafe, this doesn't work properly, and the HashSet sometimes ends up in a corrupt state. Later, when you try to iterate over the HashSet to build a string, it blows up due to this corrupt state. (Judging from your exception, the HashSet thinks its backing array is longer than it actually is, so it's trying to access out-of-bounds array elements.)
Even in the runs where you don't get an exception, you probably sometimes end up "dropping" elements that should have been added, because concurrent updates were lost.
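One way to act on this diagnosis (a hedged sketch, not part of the original answer): wrap each per-value set in Collections.synchronizedSet so the concurrent add(key) calls are safe. The invert helper name and the small two-key input below are illustrative assumptions, not the question's actual data.

```java
import java.util.*;
import java.util.concurrent.*;

public class SynchronizedSetDemo {

    // Invert key -> values into value -> keys, as in the question,
    // but wrap each new HashSet so concurrent add() calls are safe.
    static Map<Integer, Set<String>> invert(Map<String, List<Integer>> keyValueMap)
            throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(4);
        Map<Integer, Set<String>> valueKeyMap = new ConcurrentHashMap<>();
        for (String key : keyValueMap.keySet()) {
            es.submit(() -> {
                for (Integer value : keyValueMap.get(key)) {
                    valueKeyMap
                        .computeIfAbsent(value, v -> Collections.synchronizedSet(new HashSet<>()))
                        .add(key);
                }
            });
        }
        es.shutdown();
        es.awaitTermination(1, TimeUnit.MINUTES);
        return valueKeyMap;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, List<Integer>> in = new HashMap<>();
        in.put("a", Arrays.asList(1, 2, 3));
        in.put("b", Arrays.asList(2, 3, 4));
        Map<Integer, Set<String>> out = invert(in);
        System.out.println(out); // values 2 and 3 map to both "a" and "b"
    }
}
```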
Upvotes: 8
Reputation: 140484
ConcurrentHashMap.computeIfAbsent executes atomically; that is, only one thread at a time can compute and install the value for a given key. However, there is no such guarantee once the value is returned. The HashSet can then be accessed by multiple writing threads, and HashSet itself is not thread-safe.
Instead, you can do something like this:
valueKeyMap.compute(value, (k, v) -> {
    if (v == null) {
        v = new HashSet<>();
    }
    v.add(key);
    return v;
});
which works because compute is atomic too.
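For context, here is a runnable sketch of how that compute call slots into the question's loop; the small fixed input data is an illustrative assumption. Because compute runs the whole remapping function atomically for a given key, the add() happens while the mapping is held, so no two threads touch the same HashSet at once.

```java
import java.util.*;
import java.util.concurrent.*;

public class ComputeDemo {

    public static void main(String[] args) throws InterruptedException {
        Map<String, List<Integer>> keyValueMap = new HashMap<>();
        keyValueMap.put("a", Arrays.asList(1, 2, 3));
        keyValueMap.put("b", Arrays.asList(2, 3, 4));

        ExecutorService es = Executors.newFixedThreadPool(2);
        Map<Integer, Set<String>> valueKeyMap = new ConcurrentHashMap<>();
        for (String key : keyValueMap.keySet()) {
            es.submit(() -> {
                for (Integer value : keyValueMap.get(key)) {
                    // The whole remapping function runs atomically per key,
                    // so the add() is protected by the map's internal locking.
                    valueKeyMap.compute(value, (k, v) -> {
                        if (v == null) {
                            v = new HashSet<>();
                        }
                        v.add(key);
                        return v;
                    });
                }
            });
        }
        es.shutdown();
        es.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println(valueKeyMap); // values 2 and 3 map to both keys
    }
}
```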
Upvotes: 4
Reputation: 468
The fact that you do not get an exception when using synchronized should already shed some light on where the problem is. As already stated, the problem is indeed the HashSet, as it is not thread-safe. This is also stated in the documentation of the collection:

Note that this implementation is not synchronized. If multiple threads access a hash set concurrently, and at least one of the threads modifies the set, it must be synchronized externally. This is typically accomplished by synchronizing on some object that naturally encapsulates the set.

The solution is to either use the synchronized block or make use of a thread-safe collection view such as KeySetView, which you can get using ConcurrentHashMap.newKeySet().
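A minimal sketch of the newKeySet() variant, with a small fixed input chosen for illustration: ConcurrentHashMap.newKeySet() returns a Set backed by a ConcurrentHashMap, so concurrent add() calls on it need no external locking.

```java
import java.util.*;
import java.util.concurrent.*;

public class NewKeySetDemo {

    public static void main(String[] args) throws InterruptedException {
        Map<String, List<Integer>> keyValueMap = new HashMap<>();
        keyValueMap.put("a", Arrays.asList(1, 2, 3));
        keyValueMap.put("b", Arrays.asList(2, 3, 4));

        ExecutorService es = Executors.newFixedThreadPool(2);
        Map<Integer, Set<String>> valueKeyMap = new ConcurrentHashMap<>();
        for (String key : keyValueMap.keySet()) {
            es.submit(() -> {
                for (Integer value : keyValueMap.get(key)) {
                    // newKeySet() yields a thread-safe Set (a ConcurrentHashMap.KeySetView),
                    // so add() is safe even when several threads share the same set
                    valueKeyMap.computeIfAbsent(value, v -> ConcurrentHashMap.newKeySet()).add(key);
                }
            });
        }
        es.shutdown();
        es.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println(valueKeyMap.size()); // 4 distinct values: 1, 2, 3, 4
    }
}
```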
Upvotes: 1