Reputation: 8146
I am new to parallel streams and am trying to write a sample program that calculates value * 100 (for the values 1 to 100) and stores the results in a map. When I execute the code I get a different count on each run. I may be going wrong somewhere, so please guide me if anyone knows the proper way to do this.
Code:
import java.util.*;
import java.lang.*;
import java.io.*;
import java.util.stream.Collectors;
public class Main {
    static int l = 0;
    public static void main(String[] args) throws java.lang.Exception {
        letsGoParallel();
    }
    public static int makeSomeMagic(int data) {
        l++;
        return data * 100;
    }
    public static void letsGoParallel() {
        List<Integer> dataList = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            dataList.add(i);
        }
        Map<Integer, Integer> resultMap = new HashMap<>();
        dataList.parallelStream().map(f -> {
            Integer xx = 0;
            {
                xx = makeSomeMagic(f);
            }
            resultMap.put(f, xx);
            return 0;
        }).collect(Collectors.toList());
        System.out.println("Input Size: " + dataList.size());
        System.out.println("Size: " + resultMap.size());
        System.out.println("Function Called: " + l);
    }
}
Output of the last run:
Input Size: 100
Size: 100
Function Called: 98
The output differs on each run. I want to use parallel streams in my own application, but because of this confusion I can't. In my application I have 100-200 unique numbers on which the same operation needs to be performed; in short, there is a function that processes each of them.
Upvotes: 2
Views: 1333
Reputation: 1076
A Stream handles the looping for you. Do not use non-thread-safe variables in multithreaded code (including a parallelStream) the way you did; collect the results instead, like this:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class ParallelStreamClient {
    // static int l = 0;  // removed: there is no need to count calls
    public static void main(String[] args) throws java.lang.Exception {
        letsGoParallel();
    }
    public static int makeSomeMagic(int data) {
        // l++;  // removed: this is not thread-safe
        return data * 100;
    }
    public static void letsGoParallel() {
        List<Integer> dataList = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            dataList.add(i);
        }
        // Let the stream build the map instead of mutating a shared HashMap from the lambda
        Map<Integer, Integer> resultMap =
                dataList.parallelStream().collect(Collectors.toMap(i -> i, ParallelStreamClient::makeSomeMagic));
        System.out.println("Input Size: " + dataList.size());
        System.out.println("Size: " + resultMap.size());
        // System.out.println("Function Called: " + l);
    }
}
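To see why the commented-out l++ matters, here is a minimal sketch (the class CounterRaceDemo and the method countInParallel are hypothetical names, not from the question) that increments a plain int and an AtomicInteger from the same parallel stream. The plain counter can lose updates because ++ is a separate read, add and write; the atomic counter cannot.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class CounterRaceDemo {

    /** Increments two counters n times from a parallel stream; returns {plain, atomic}. */
    static int[] countInParallel(int n) {
        int[] plain = {0};                          // ordinary int: ++ is read-modify-write, not atomic
        AtomicInteger atomic = new AtomicInteger(); // each increment is a single atomic operation
        IntStream.rangeClosed(1, n).parallel().forEach(i -> {
            plain[0]++;               // concurrent increments can overwrite each other
            atomic.incrementAndGet(); // never loses an update
        });
        return new int[] {plain[0], atomic.get()};
    }

    public static void main(String[] args) {
        int[] counts = countInParallel(100_000);
        System.out.println("plain:  " + counts[0]); // may be below 100000 and vary between runs
        System.out.println("atomic: " + counts[1]); // 100000 on every run
    }
}
```

On a multi-core machine the plain count can come out below 100000 and change from run to run, which is the same effect as the "Function Called: 98" output in the question; the exact number is not predictable.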
Upvotes: 0
Reputation: 5173
By putting values into resultMap, you're relying on a side effect:
dataList.parallelStream().map(f -> {
    Integer xx = 0;
    {
        xx = makeSomeMagic(f);
    }
    resultMap.put(f, xx);
    return 0;
})
The API states:
Stateless operations, such as filter and map, retain no state from previously seen element when processing a new element -- each element can be processed independently of operations on other elements.
It goes on:
Stream pipeline results may be nondeterministic or incorrect if the behavioral parameters to the stream operations are stateful. A stateful lambda (or other object implementing the appropriate functional interface) is one whose result depends on any state which might change during the execution of the stream pipeline.
This is followed by an example similar to yours, which concludes:
... if the mapping operation is performed in parallel, the results for the same input could vary from run to run, due to thread scheduling differences, whereas, with a stateless lambda expression the results would always be the same.
That explains your observation that the output differs on each run.
The right approach is shown in @Eran's answer.
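A small sketch of the stateful vs. stateless distinction quoted above, assuming a hypothetical OrderDemo class: the first method records elements into a shared synchronized list as a side effect, so the recorded order depends on thread scheduling; the second lets collect build the list, which preserves encounter order even for a parallel stream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OrderDemo {

    // Stateful: appends to a shared list as a side effect; the append order depends on thread scheduling.
    static List<Integer> sideEffectOrder(int n) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        IntStream.rangeClosed(1, n).parallel().forEach(i -> seen.add(i));
        return seen;
    }

    // Stateless: the stream builds the result itself, keeping encounter order even in parallel.
    static List<Integer> collected(int n) {
        return IntStream.rangeClosed(1, n).parallel().boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("side effect: " + sideEffectOrder(20)); // order can change between runs
        System.out.println("collected:   " + collected(20));       // always [1, 2, ..., 20]
    }
}
```

Both lists contain the same 20 numbers; only the side-effect version's order can differ from run to run.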
Upvotes: 2
Reputation: 3232
This should work if you make the makeSomeMagic function synchronized, use a thread-safe data structure such as ConcurrentHashMap, and write a simple statement:
dataList.parallelStream().forEach(f -> resultMap.put(f, makeSomeMagic(f)));
Here is the whole code:
import java.util.*;
import java.lang.*;
import java.io.*;
import java.util.concurrent.ConcurrentHashMap; // not covered by java.util.*
import java.util.stream.Collectors;
public class Main {
    static int l = 0;
    public static void main(String[] args) throws java.lang.Exception {
        letsGoParallel();
    }
    public synchronized static int makeSomeMagic(int data) { // make it synchronized
        l++;
        return data * 100;
    }
    public static void letsGoParallel() {
        List<Integer> dataList = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            dataList.add(i);
        }
        Map<Integer, Integer> resultMap = new ConcurrentHashMap<>(); // use ConcurrentHashMap
        dataList.parallelStream().forEach(f -> resultMap.put(f, makeSomeMagic(f)));
        System.out.println("Input Size: " + dataList.size());
        System.out.println("Size: " + resultMap.size());
        System.out.println("Function Called: " + l);
    }
}
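Declaring a static method synchronized makes every thread wait for the same lock (the class object), so the calls to makeSomeMagic run one at a time. A minimal sketch of an alternative, assuming the call counter is the only shared state: keep makeSomeMagic unsynchronized and count with an AtomicInteger. The class name ConcurrentMapDemo and the letsGoParallel(List) signature are illustrative, not from the answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentMapDemo {
    // Thread-safe call counter; replaces the static int guarded by synchronized.
    static final AtomicInteger calls = new AtomicInteger();

    // Not synchronized: threads may run it concurrently; only the counter update is atomic.
    static int makeSomeMagic(int data) {
        calls.incrementAndGet();
        return data * 100;
    }

    static Map<Integer, Integer> letsGoParallel(List<Integer> dataList) {
        Map<Integer, Integer> resultMap = new ConcurrentHashMap<>(); // safe for concurrent put
        dataList.parallelStream().forEach(f -> resultMap.put(f, makeSomeMagic(f)));
        return resultMap;
    }

    public static void main(String[] args) {
        List<Integer> dataList = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            dataList.add(i);
        }
        Map<Integer, Integer> resultMap = letsGoParallel(dataList);
        System.out.println("Input Size: " + dataList.size());
        System.out.println("Size: " + resultMap.size());
        System.out.println("Function Called: " + calls.get());
    }
}
```

For a function as cheap as data * 100 the difference is negligible, but if the real processing is expensive, avoiding the class-wide lock lets the parallel stream actually run it in parallel.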
Upvotes: 0
Reputation: 393831
Neither your access to the HashMap nor your updates to the l variable are thread-safe, which is why the output is different on each run.
The correct way to do what you are trying to do is to collect the Stream elements into a Map:
Map<Integer, Integer> resultMap =
dataList.parallelStream()
.collect(Collectors.toMap (Function.identity (), Main::makeSomeMagic));
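For reference, a self-contained sketch of the same approach. Function.identity() needs import java.util.function.Function. The class name ToMapDemo and the letsGoParallel(List) signature are illustrative choices, not from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ToMapDemo {
    // Pure function: it touches no shared state, so concurrent calls are safe.
    public static int makeSomeMagic(int data) {
        return data * 100;
    }

    // Each parallel subtask fills its own partial map; collect then merges the partial maps.
    public static Map<Integer, Integer> letsGoParallel(List<Integer> dataList) {
        return dataList.parallelStream()
                .collect(Collectors.toMap(Function.identity(), ToMapDemo::makeSomeMagic));
    }

    public static void main(String[] args) {
        List<Integer> dataList = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            dataList.add(i);
        }
        Map<Integer, Integer> resultMap = letsGoParallel(dataList);
        System.out.println("Input Size: " + dataList.size()); // 100
        System.out.println("Size: " + resultMap.size());      // 100 on every run
    }
}
```

Collectors.toMap throws IllegalStateException if two elements map to the same key; that is not a problem here because the question says the input numbers are unique.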
EDIT: The l variable is still updated in a non-thread-safe way with this code, so you'll have to add your own thread safety if the final value of that variable is important to you.
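One way to add that thread safety while keeping the toMap approach, sketched with java.util.concurrent.atomic.LongAdder (a thread-safe counter intended for frequent concurrent increments); an AtomicInteger would also work. The class name CountedToMapDemo and the build method are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CountedToMapDemo {
    // Thread-safe counter; concurrent increments are never lost.
    static final LongAdder calls = new LongAdder();

    static int makeSomeMagic(int data) {
        calls.increment(); // replaces the unsafe l++
        return data * 100;
    }

    // Builds {1 -> 100, 2 -> 200, ..., n -> n * 100} with a parallel stream.
    static Map<Integer, Integer> build(int n) {
        return IntStream.rangeClosed(1, n)
                .boxed()
                .parallel()
                .collect(Collectors.toMap(Function.identity(), CountedToMapDemo::makeSomeMagic));
    }

    public static void main(String[] args) {
        Map<Integer, Integer> resultMap = build(100);
        System.out.println("Size: " + resultMap.size());       // 100
        System.out.println("Function Called: " + calls.sum()); // 100
    }
}
```

toMap invokes the value mapper once per element, so after the collect finishes the counter equals the number of input elements.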
Upvotes: 5