Elad Benda2

Reputation: 15472

Why does Java stream map reduce count my result twice?

I have this code (a):

        ComparisonResults comparisonResults = requestsList
                .stream()
                .map(item -> getResponse(item))
                .map(item -> compareToBl(item))
                .reduce(new ComparisonResults(), (result1, result2) ->
                {
                     result1.addSingleResult(result2);
                 //   return result1;
                    return new ComparisonResults(result1);
                });

and this code (b):

        ComparisonResults comparisonResults = requestsList
                .parallelStream()
                .map(item -> getResponse(item))
                .map(item -> compareToBl(item))
                .reduce(new ComparisonResults(), (result1, result2) ->
                {
                     result1.addSingleResult(result2);
                 //   return result1;
                    return new ComparisonResults(result1);
                });

All I do is create response objects, transform them into ComparisonResults objects, and reduce them to a single ComparisonResults.

Code (a) gives an int class member comparisonResults.num_of_sub_responses == 5, which is correct.

Code (b) gives comparisonResults.num_of_sub_responses == 10, which is double the correct result.

Java 8's reduce should be thread-safe, right?

Am I missing anything?

getResponse and compareToBl are thread-safe.

Upvotes: 3

Views: 362

Answers (2)

RealSkeptic

Reputation: 34628

The reduce operation reduce(identity, operator) relies on two important assumptions about the arguments you pass it.

  1. The first parameter is an identity. That is, when you use the reducing operator between any item and the given identity, you get the original item.
  2. The operator is associative.

These two assumptions are very important when working with a parallel stream, because in general, what it does is:

  • Give each thread a slice of the stream
  • Each thread starts with the identity and accumulates a result using the elements in its slice:

    result = identity op element1 op element2 op element3 ...
    
  • Then the results from different threads are combined together using the operator:

    grand result = result1 op result2 op result3
    

So, if you were summing numbers, it would break an operation like 5 + 4 + 3 + 20 into ( 0 + 5 + 4 ) + ( 0 + 3 + 20 ). That works if, and only if, the above assumptions hold: 0 is the identity for addition, and addition is associative.
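For instance, a plain sum satisfies both assumptions, so sequential and parallel reduction agree. A minimal sketch (assuming nothing beyond the JDK):

    import java.util.Arrays;
    import java.util.List;

    public class ReduceDemo {
        public static void main(String[] args) {
            List<Integer> numbers = Arrays.asList(5, 4, 3, 20);

            // 0 is the identity for +, and + is associative, so the result is
            // the same no matter how the stream is sliced across threads.
            int sequential = numbers.stream().reduce(0, Integer::sum);
            int parallel   = numbers.parallelStream().reduce(0, Integer::sum);

            System.out.println(sequential); // 32
            System.out.println(parallel);   // 32
        }
    }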

But by mutating the first operand inside your operator, you are actually mutating the identity object, so it can no longer be considered an identity. That is, op(identity, result) does not give you the same value as result itself.

If the operator is not associative, problems will turn up in the "grand result" stage.
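To make the failure mode concrete, here is a minimal, self-contained sketch. It is not your real ComparisonResults, just a hypothetical Counter stand-in, reduced with the same mutate-then-copy operator as in the question:

    import java.util.Arrays;
    import java.util.List;

    public class BrokenIdentityDemo {
        // Hypothetical stand-in for a mutable result container.
        static class Counter {
            int count;
            Counter() {}
            Counter(Counter other) { this.count = other.count; }
            void add(Counter other) { this.count += other.count; }
        }

        public static void main(String[] args) {
            List<Integer> items = Arrays.asList(1, 1, 1, 1, 1);

            Counter result = items.parallelStream()
                    .map(i -> { Counter c = new Counter(); c.count = i; return c; })
                    .reduce(new Counter(), (a, b) -> {
                        a.add(b);            // mutates the shared identity object
                        return new Counter(a);
                    });

            // Sequentially this prints 5; in parallel it typically prints a
            // larger, run-to-run varying value, because every thread starts
            // from (and mutates) the single shared identity instance.
            System.out.println(result.count);
        }
    }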

Upvotes: 2

Holger

Reputation: 298233

You are mutating an incoming object in reduce. This is wrong. It doesn’t help that you are creating a new object after modifying the incoming object.

What you want to do is

.collect(ComparisonResults::new, ComparisonResults::addSingleResult,
         (a,b)->/* code to merge two ComparisonResults instances*/);

If the result of .map(item -> compareToBl(item)) is ComparisonResults or, in other words, addSingleResult merges two ComparisonResults instances, you can use ComparisonResults::addSingleResult as the merge function, though its name is a bit misleading then.
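For illustration, here is a minimal sketch of that mutable reduction, assuming a hypothetical ComparisonResults with an addSingleResult accumulator and a separate merge method (the names and fields are invented for the example, not taken from the question):

    import java.util.Arrays;
    import java.util.List;

    public class CollectDemo {
        // Hypothetical stand-in for the question's ComparisonResults.
        static class ComparisonResults {
            int numOfSubResponses;

            // Accumulator: fold one single result into this container.
            void addSingleResult(SingleResult r) {
                numOfSubResponses++;
            }

            // Combiner: merge another partially filled container into this one.
            void merge(ComparisonResults other) {
                numOfSubResponses += other.numOfSubResponses;
            }
        }

        static class SingleResult {}

        public static void main(String[] args) {
            List<SingleResult> results = Arrays.asList(
                    new SingleResult(), new SingleResult(), new SingleResult(),
                    new SingleResult(), new SingleResult());

            // Mutable reduction: each thread gets its own fresh container from
            // the supplier, accumulates into it, and the containers are merged.
            ComparisonResults combined = results.parallelStream()
                    .collect(ComparisonResults::new,
                             ComparisonResults::addSingleResult,
                             ComparisonResults::merge);

            System.out.println(combined.numOfSubResponses); // 5, sequential or parallel
        }
    }

Because the supplier hands every thread its own container, no shared object is ever mutated from two threads, which is exactly the guarantee the reduce-with-identity version was missing.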

You should carefully read the “Reduction” chapter of the documentation and its follow-up, “Mutable reduction”.

Upvotes: 7
