Hugh

Reputation: 115

Java 8 parallelStream().forEach loses result data

There are two test cases which use parallelStream():

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 20000; i++) {
  src.add(i);
}
List<String> strings = new ArrayList<>();
       
src.parallelStream().filter(integer -> (integer % 2) == 0).forEach(integer -> strings.add(integer + ""));
    
System.out.println("=size=>" + strings.size());

Output: =size=>9332

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 20000; i++) {
  src.add(i);
}
List<String> strings = new ArrayList<>();

src.parallelStream().forEach(integer -> strings.add(integer + ""));

System.out.println("=size=>" + strings.size());

Output: =size=>17908

Why do I always lose data when using parallelStream()? What did I do wrong?

Upvotes: 11

Views: 8543

Answers (4)

NoisyBoy

Reputation: 364

parallelStream() with forEach is a deadly combo if not used carefully, because the lambda ends up mutating shared state from several threads. Pick one of the following to avoid bugs:

  1. If you have a pre-existing list that you want to add more objects to from a parallelStream() forEach, wrap that list with Collections.synchronizedList before running the stream and add through the wrapper.

  2. If you have to create a new list, you can create a Vector before running the stream and add to that, or

  3. If you have to create a new list, simply use parallelStream() and collect the output at the end.
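The three options above might look roughly like this. It is a minimal sketch: the class and method names are made up for illustration, and the source list mirrors the 20000 integers from the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical class name; the three methods correspond to points 1 to 3.
public class ParallelAddOptions {

    // Option 1: wrap a pre-existing list so that every add() is synchronized.
    static List<String> addToExisting(List<String> existing, List<Integer> src) {
        List<String> safe = Collections.synchronizedList(existing);
        src.parallelStream().forEach(i -> safe.add(String.valueOf(i)));
        return safe;
    }

    // Option 2: a new Vector, whose methods are synchronized.
    static List<String> addToVector(List<Integer> src) {
        List<String> result = new Vector<>();
        src.parallelStream().forEach(i -> result.add(String.valueOf(i)));
        return result;
    }

    // Option 3: no shared list at all; the stream builds the result itself.
    static List<String> collectToList(List<Integer> src) {
        return src.parallelStream()
                .map(String::valueOf)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> src = IntStream.range(0, 20000).boxed().collect(Collectors.toList());
        List<String> existing = new ArrayList<>();
        existing.add("already here");
        System.out.println(addToExisting(existing, src).size()); // 20001
        System.out.println(addToVector(src).size());             // 20000
        System.out.println(collectToList(src).size());           // 20000
    }
}
```

With options 1 and 2 no element is lost, but the order of the elements is not predictable. Option 3 keeps the encounter order of the source.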

Upvotes: 1

M. Deinum

Reputation: 124898

ArrayList isn't thread-safe. While one thread sees a list with 30 elements, another might still see 29 and overwrite the 30th position, losing one element.

Another issue can arise when the array backing the list needs to be resized. A new, larger array is created (roughly 1.5 times the old capacity in OpenJDK) and the elements of the original array are copied into it. Elements that other threads added in the meantime may not be visible to the thread doing the resize and are lost in the copy, or several threads resize at once and only one of their arrays wins.

When using multiple threads, you need to either synchronize access to the list or use a thread-safe list, for example by wrapping it with Collections.synchronizedList or by using a CopyOnWriteArrayList, to mention two possible solutions. Even better would be to use the collect method on the stream to put everything into a list.
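To make the lost update concrete, here is a single-threaded replay of one interleaving that two unsynchronized threads could produce. TinyList is a hypothetical, stripped-down stand-in for an array-backed list (no resizing, no bounds checks), not the real ArrayList source.

```java
public class LostUpdateSketch {

    // Hypothetical stand-in for an array-backed list: a backing array plus a size field.
    static final class TinyList {
        final String[] elements = new String[8];
        int size = 0;
    }

    // Replays, step by step on one thread, what threads A and B might do concurrently.
    static int replayRace() {
        TinyList list = new TinyList();

        // Step 1: A and B both read the current size before either one updates it.
        int indexSeenByA = list.size;
        int indexSeenByB = list.size;

        // Step 2: A stores its element and publishes the new size.
        list.elements[indexSeenByA] = "A";
        list.size = indexSeenByA + 1;

        // Step 3: B works from its stale read, overwrites A's slot and sets the same size.
        list.elements[indexSeenByB] = "B";
        list.size = indexSeenByB + 1;

        return list.size;
    }

    public static void main(String[] args) {
        // Two adds were attempted, but only one element survives.
        System.out.println(replayRace()); // 1
    }
}
```

With real threads the interleaving differs from run to run, which is why the reported size changes between executions.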

Upvotes: 1

markusk

Reputation: 6677

ArrayList isn't thread safe. You need to do

List<String> strings = Collections.synchronizedList(new ArrayList<>());

or

List<String> strings = new Vector<>();

to ensure all updates are synchronized, or switch to

List<String> strings = src.parallelStream()
    .filter(integer -> (integer % 2) == 0)
    .map(integer -> integer + "")
    .collect(Collectors.toList());

and leave the list building to the Streams framework. Note that Collectors.toList() makes no guarantee about the type or mutability of the list it returns, so if you need a modifiable list, use Collectors.toCollection(ArrayList::new) instead.

In terms of performance, Stream.collect is likely to be much faster than using Stream.forEach to add to a synchronized collection, since the Streams framework can handle collection of values in each thread separately without synchronization and combine the results at the end in a thread safe fashion.
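The per-thread collection and final merge described above can be spelled out with the three-argument form of Stream.collect. This is only a sketch of the idea, with a hypothetical class and method name; Collectors.toList() does its own equivalent of this internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class ThreeArgCollectSketch {

    static List<String> evenStrings(int upperBound) {
        return IntStream.range(0, upperBound)
                .parallel()
                .filter(i -> i % 2 == 0)
                .mapToObj(String::valueOf)
                .collect(
                        ArrayList::new,     // supplier: a fresh, unshared list per chunk of work
                        ArrayList::add,     // accumulator: adds to that private list, no locking
                        ArrayList::addAll); // combiner: merges two partial lists in order
    }

    public static void main(String[] args) {
        List<String> result = evenStrings(20000);
        result.add("extra"); // fine here: the supplier makes the result an ArrayList
        System.out.println(result.size()); // 10001
    }
}
```

Because each partial list is only ever touched by one thread at a time, no synchronized wrapper is needed.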

Upvotes: 8

asdofindia

Reputation: 149

You lose the benefits of using streams (and parallel streams) when you mutate shared state from inside them. As a general rule, avoid mutation when using streams. Venkat Subramaniam explains why. Instead, use collectors. Also try to get as much as possible done within the stream chain. For example:

System.out.println(
    IntStream.range(0, 200000)
        .filter(i -> i % 2 == 0)
        .mapToObj(String::valueOf)
        .collect(Collectors.toList())
        .size()
);

You can run that as a parallel stream by adding .parallel() to the chain.
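As a rough illustration of where .parallel() goes, the sketch below runs the same chain both ways and compares the results. The class name and the boolean flag are made up for the example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelRangeSketch {

    static List<String> evens(int upperBound, boolean parallel) {
        IntStream range = IntStream.range(0, upperBound);
        if (parallel) {
            range = range.parallel(); // switches the whole pipeline to parallel execution
        }
        return range.filter(i -> i % 2 == 0)
                .mapToObj(String::valueOf)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> sequential = evens(200000, false);
        List<String> parallel = evens(200000, true);
        System.out.println(parallel.size());             // 100000
        System.out.println(sequential.equals(parallel)); // true
    }
}
```

Since nothing in the chain mutates shared state, the parallel run yields the same list as the sequential one, in the same order.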

Upvotes: 0
