Reputation: 11666
The code snippet below updates a map that is not thread safe (itemsById) from a parallel stream's forEach block.
// Update stuff in `itemsById` by iterating over all stuff in newItemsById:
newItemsById.entrySet()
        .parallelStream()
        .unordered()
        .filter(...)
        .forEach(entry -> {
            itemsById.put(entry.getKey(), entry.getValue()); // <-- look
        });
To me, this looks thread unsafe: the parallel stream will invoke the forEach block from many threads at the same time, and thus call itemsById.put(..) from many threads at the same time, and itemsById isn't thread safe. (With a ConcurrentMap, however, I think the code would be safe.)
I wrote to a colleague: "Please note that the map might allocate new memory when you insert new data. That's likely not thread safe, since the collection is not thread safe. Whether writing to different keys from many threads is thread safe is implementation dependent, I would think. It's nothing I would choose to rely on."
He, however, says that the above code is thread safe. Is it?
(Please note: I don't think this question is too localized. With Java 8, I think fairly many people will write something like parallelStream()...forEach(...), so it might be good for many people to know about the thread safety issues.)
Upvotes: 2
Views: 2640
Reputation: 4444
This code is not thread-safe.
Oracle docs state:
Operations like forEach and peek are designed for side effects; a lambda expression that returns void, such as one that invokes System.out.println, can do nothing but have side effects. Even so, you should use the forEach and peek operations with care; if you use one of these operations with a parallel stream, then the Java runtime may invoke the lambda expression that you specified as its parameter concurrently from multiple threads.
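The same documentation recommends reduction (for example collect) over side effects. As a rough sketch (the input data and the filter predicate here are made up for illustration), the side-effect-free alternative would look something like this:

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

Map<Integer, String> newItemsById = new HashMap<>();
for (int i = 0; i < 200; i++) newItemsById.put(i, "item" + i);

// collect() lets each worker thread fill its own intermediate container
// and merges the partial results afterwards, so no map is mutated from
// multiple threads at once.
Map<Integer, String> itemsById = newItemsById.entrySet()
        .parallelStream()
        .unordered()
        .filter(e -> e.getKey() % 2 == 0)   // stand-in for the question's filter(...)
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

System.out.println(itemsById.size()); // prints 100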
Upvotes: 0
Reputation: 100139
You're right: this code is not thread-safe, and depending on the Map implementation and on how the race plays out, it may produce any random effect: a correct result, silent loss of data, an exception, or an endless loop. You can easily check it like this:
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

int equal = 0;
for (int i = 0; i < 100; i++) {
    // create a test input map like {0 -> 0, 1 -> 1, 2 -> 2, ...}
    Map<Integer, Integer> input = IntStream.range(0, 200).boxed()
            .collect(Collectors.toMap(x -> x, x -> x));
    Map<Integer, Integer> result = new HashMap<>();
    // write it into another HashMap in parallel, without key collisions
    input.entrySet().parallelStream().unordered()
            .forEach(entry -> result.put(entry.getKey(), entry.getValue()));
    if (result.equals(input)) equal++;
}
System.out.println(equal);
On my machine this code usually prints something between 20 and 40 instead of 100. If I change HashMap to TreeMap, it usually fails with a NullPointerException or gets stuck in an infinite loop inside the TreeMap implementation.
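For comparison, a sketch of the same test with the result map swapped to a ConcurrentHashMap (assuming nothing else changes) should print 100 every time, because ConcurrentHashMap.put may safely be called from many threads:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

int equal = 0;
for (int i = 0; i < 100; i++) {
    Map<Integer, Integer> input = IntStream.range(0, 200).boxed()
            .collect(Collectors.toMap(x -> x, x -> x));
    // ConcurrentHashMap is safe to update from many threads at once
    Map<Integer, Integer> result = new ConcurrentHashMap<>();
    input.entrySet().parallelStream().unordered()
            .forEach(entry -> result.put(entry.getKey(), entry.getValue()));
    if (result.equals(input)) equal++;
}
System.out.println(equal); // should print 100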
Upvotes: 3
Reputation: 88707
I'm no expert on streams, but I assume there is no fancy synchronization employed here, and thus I wouldn't consider adding elements to itemsById in parallel thread safe.
One of the things that could happen is an endless loop: if two elements happen to end up in the same bucket, the underlying list might get messed up and elements could refer to each other in a cycle (A.next = B, B.next = A). A ConcurrentHashMap would prevent that by synchronizing write access on the bucket, i.e. unless the elements end up in the same bucket it would not block, but if they do, the add is sequential.
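Applied to the question's snippet, a minimal sketch of that fix (assuming itemsById can simply be declared differently; the key and value types here are made up):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Map<Integer, String> newItemsById = new HashMap<>(); // populated elsewhere

// put() on a ConcurrentHashMap is safe from many threads; writes only
// contend when two keys land in the same bucket.
Map<Integer, String> itemsById = new ConcurrentHashMap<>();

newItemsById.entrySet()
        .parallelStream()
        .unordered()
        .forEach(e -> itemsById.put(e.getKey(), e.getValue()));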
Upvotes: 1