Robin Sun

Reputation: 39

Stream API reduce used on ArrayList not synchronized

I am using the Stream API's reduce to test a list of Strings.

for (int i = 0; i < 100; i++)
{
    Stream<String> s1 = Stream.of("aa", "ab", "c", "ad");
    Predicate<String> predicate = t -> t.contains("a");

    List<String> strings2 = new ArrayList<>();
    s1.parallel().reduce(new ArrayList<String>(),
            new BiFunction<ArrayList<String>, String, ArrayList<String>>()
            {
                @Override
                public ArrayList<String> apply(ArrayList<String> strings, String s)
                {
                    if (predicate.test(s))
                    {
                        strings.add(s);
                    }

                    return strings;
                }
            }, new BinaryOperator<ArrayList<String>>()
            {
                @Override
                public ArrayList<String> apply(ArrayList<String> strings,
                        ArrayList<String> strings2)
                {
                    return strings;
                }
            }).stream().forEach( //
                    e -> {
                        strings2.add(e);
                    });

    if (strings2.contains(null))
    {
        System.out.println(strings2);
    }
}

I went through a couple of blog posts which say that reduce can be used in this case and that synchronization is guaranteed, but the case above suggests this is not true. In a couple of test runs, the following check evaluates to true:

strings2.contains(null)

So my question is: am I using reduce incorrectly, or is there something extra that should be done to ensure synchronization?

Upvotes: 1

Views: 95

Answers (1)

Malte Hartwig

Reputation: 4555

It looks like filter is better suited to tackle this problem. If you want to use reduce, though, and especially when using it in parallel, you must not modify the accumulator objects (the lists in your case).
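For the filter route, a minimal sketch could look like the following. The class name `FilterExample` and the method name `containingA` are made up for illustration; the input values are the ones from your question.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FilterExample {

    // Hypothetical helper: keeps only the strings that contain "a".
    // filter() is stateless and collect() manages its own intermediate
    // containers, so no list is shared between threads here.
    static List<String> containingA(Stream<String> input) {
        return input.parallel()
                    .filter(s -> s.contains("a"))
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(containingA(Stream.of("aa", "ab", "c", "ad")));
    }
}
```

Because the source stream is ordered, the collected list keeps the encounter order even when the pipeline runs in parallel.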

From the Oracle tutorial on reduction:

the accumulator function also returns a new value every time it processes an element

When I ran your code, I got two printouts of a list containing null, followed by an ArrayIndexOutOfBoundsException. The likely reason for this is that two threads tried to add elements to the same list at the same time: in a parallel reduce, the same identity object (your single new ArrayList) is handed to every thread. The exception occurred after the list was made longer, but before the element was added, hence the null (i.e. empty) slot.

ArrayList<String> strings2 = 
    s1.parallel()
      .reduce(new ArrayList<String>(), 
              (list, el) -> {
                if (el.contains("a")) {
                    ArrayList<String> added = new ArrayList<>(list);
                    added.add(el);
                    return added;
                }
                return list;
              }, 
              (list1, list2) -> {
                ArrayList<String> merged = new ArrayList<>(list1);
                merged.addAll(list2);
                return merged;
              });

Instead of adding to the list, you have to make a copy of it, add to that copy and return the copy. This way, each thread can work on different parts of the input without interfering with others.

In addition, you cannot just throw out part of the result in the combiner, otherwise you will end up with incomplete results. You have to merge the lists instead of simply returning one of them.
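If copying the list on every step is a concern, a different technique from the one above is mutable reduction with the three-argument `collect`. It is designed for mutable containers: each thread gets its own list from the supplier, and the combiner merges the partial lists. This is only a sketch; the class name `CollectExample` and the method name `containingA` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.stream.Stream;

public class CollectExample {

    // Hypothetical helper showing mutable reduction via collect().
    static ArrayList<String> containingA(Stream<String> input) {
        return input.parallel().collect(
                ArrayList::new,              // supplier: a fresh list for each partial result
                (list, el) -> {              // accumulator: may mutate, the list is not shared
                    if (el.contains("a")) {
                        list.add(el);
                    }
                },
                ArrayList::addAll);          // combiner: merges the right partial list into the left
    }

    public static void main(String[] args) {
        System.out.println(containingA(Stream.of("aa", "ab", "c", "ad")));
    }
}
```

Unlike reduce, where the single identity object is reused, the supplier here is invoked once per partial result, so mutating the list inside the accumulator does not cause a race.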

Upvotes: 1
