Reputation: 15472
I have this code a:
ComparisonResults comparisonResults = requestsList
        .stream()
        .map(item -> getResponse(item))
        .map(item -> compareToBl(item))
        .reduce(new ComparisonResults(), (result1, result2) ->
        {
            result1.addSingleResult(result2);
            // return result1;
            return new ComparisonResults(result1);
        });
and this code b:
ComparisonResults comparisonResults = requestsList
        .parallelStream()
        .map(item -> getResponse(item))
        .map(item -> compareToBl(item))
        .reduce(new ComparisonResults(), (result1, result2) ->
        {
            result1.addSingleResult(result2);
            // return result1;
            return new ComparisonResults(result1);
        });
All I do is create response objects, transform them into ComparisonResults objects, and reduce them to a single ComparisonResults.
Code a shows an int class member comparisonResults.num_of_sub_responses == 5, which is correct.
Code b shows an int class member comparisonResults.num_of_sub_responses == 10, which is double the correct result.
Java 8 reduce should be thread-safe, right? Am I missing anything?
getResponse and compareToBl are thread-safe.
Upvotes: 3
Views: 362
Reputation: 34628
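The reduce operation reduce(identity, operator) relies on two important assumptions about the arguments you pass it.
First, the operator is associative: (a op b) op c gives the same result as a op (b op c).
Second, the identity is a true identity for the operator: if you apply the operator to any item and the identity, you get the original item.
These two assumptions are very important when working with a parallel stream, because in general, what it does is: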
Each thread starts with the identity, and accumulates a result using the elements in the stream:
result = identity op element1 op element2 op element3 ...
Then the results from different threads are combined together using the operator:
grand result = result1 op result2 op result3
So, suppose you were summing numbers: it breaks an operation like 5 + 4 + 3 + 20 into ( 0 + 5 + 4 ) + ( 0 + 3 + 20 ). This works if, and only if, the above assumptions are true (0 is the identity for addition, and addition is associative).
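As a small illustration of the summing case, here is a sketch using the same four numbers. The class and method names (SumDemo, parallelSum) are made up for this example. Because 0 is a true identity and addition is associative, the result does not depend on how the stream is split across threads.

```java
import java.util.stream.IntStream;

public class SumDemo {
    // Sums 5, 4, 3, 20 on a parallel stream.
    // 0 is the identity for addition and Integer::sum is associative,
    // so any split such as (0 + 5 + 4) + (0 + 3 + 20) gives the same total.
    static int parallelSum() {
        return IntStream.of(5, 4, 3, 20)
                .parallel()
                .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(parallelSum()); // prints 32
    }
}
```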
But by mutating the first operand inside your operator, you are actually mutating the identity object, so it can no longer be considered an identity. That is, op(identity, result) does not give you the same value as result itself.
If the operator is not associative, problems will turn up in the "grand result" stage.
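The broken identity can be shown without any threads at all. The sketch below uses a hypothetical Counter class as a stand-in for ComparisonResults (the real class is not shown in the question) and an operator shaped like the one in the question: it mutates its first operand and then returns a copy. Applying it twice to the same identity object shows that the second call no longer returns its right-hand operand unchanged.

```java
import java.util.function.BinaryOperator;

public class IdentityDemo {
    // Hypothetical stand-in for ComparisonResults: it only holds a count.
    static final class Counter {
        int count;
        Counter(int count) { this.count = count; }
    }

    // Shaped like the operator in the question: mutates the left operand,
    // then returns a copy of it.
    static final BinaryOperator<Counter> MUTATING = (a, b) -> {
        a.count += b.count;
        return new Counter(a.count);
    };

    // Non-mutating alternative: leaves both operands untouched.
    static final BinaryOperator<Counter> PURE =
            (a, b) -> new Counter(a.count + b.count);

    // Applies op(identity, 2) and then op(identity, 3) with the SAME identity
    // object, as two chunks of a parallel stream might, and returns the
    // count of the second result. A true identity would give 3.
    static int secondResult(BinaryOperator<Counter> op) {
        Counter identity = new Counter(0);
        op.apply(identity, new Counter(2));
        return op.apply(identity, new Counter(3)).count;
    }

    public static void main(String[] args) {
        System.out.println(secondResult(MUTATING)); // prints 5: identity was polluted
        System.out.println(secondResult(PURE));     // prints 3: identity still behaves as one
    }
}
```

In a parallel stream the same identity instance may be handed to several chunks, so leftovers from one chunk can leak into another, which is consistent with the inflated count reported in the question.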
Upvotes: 2
Reputation: 298233
You are mutating an incoming object in reduce. This is wrong. It doesn’t help that you create a new object after modifying the incoming object.
What you want to do, is
.collect(ComparisonResults::new, ComparisonResults::addSingleResult,
(a,b)->/* code to merge two ComparisonResults instances*/);
If the result of .map(item -> compareToBl(item)) is ComparisonResults or, in other words, addSingleResult merges two ComparisonResults instances, you can use ComparisonResults::addSingleResult as the merge function, though its name is a bit misleading then.
You should carefully read the “Reduction” chapter of the documentation and its follow-up, “Mutable reduction”.
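A minimal, self-contained sketch of the collect approach follows. The ComparisonResults class here is a hypothetical reduction of the real one (only a counter and addSingleResult), and the requests are plain integers standing in for the results of getResponse and compareToBl, so treat it as an illustration under those assumptions and not as the actual class.

```java
import java.util.Arrays;
import java.util.List;

public class CollectDemo {
    // Hypothetical, simplified version of the question's class.
    static final class ComparisonResults {
        int numOfSubResponses;

        ComparisonResults() { }
        ComparisonResults(int n) { this.numOfSubResponses = n; }

        // Merges another instance into this one.
        void addSingleResult(ComparisonResults other) {
            this.numOfSubResponses += other.numOfSubResponses;
        }
    }

    // Mutable reduction: each thread gets its own fresh container from the
    // supplier, so mutating it is safe; partial containers are merged afterwards.
    static int total(List<Integer> subResponseCounts) {
        return subResponseCounts.parallelStream()
                .map(n -> new ComparisonResults(n))          // stands in for getResponse + compareToBl
                .collect(ComparisonResults::new,             // supplier: new empty container
                         ComparisonResults::addSingleResult, // accumulator: add one element
                         ComparisonResults::addSingleResult) // combiner: merge two containers
                .numOfSubResponses;
    }

    public static void main(String[] args) {
        System.out.println(total(Arrays.asList(1, 1, 1, 1, 1))); // prints 5
    }
}
```

Unlike reduce, collect never shares one identity object between chunks, which is why in-place mutation is acceptable here.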
Upvotes: 7