Adam Shem-Ur

Reputation: 300

How to use streams with interfering methods and constructors, and why not use .peek()?

I'll show a simplified example of my problem to help demonstrate it.

Let's say I'm trying to convert old Java code to Java 8 for style and parallelization purposes (which I am).

This is the code:

    public static boolean deleteTester(List<Integer> keys) {
        DHeap dHeap = new DHeap(d, keys.size());
        DHeap_Item[] DHeap_ItemArray = keysListToDHeap_ItemArray(keys);
        dHeap.arrayToHeap(DHeap_ItemArray);

        for (int i = 0; i < keys.size(); i++) {
            keys.set(i, null);
            dHeap.delete(DHeap_ItemArray[i]);
            if (!someTest(keys, dHeap.getList()))
                return false;
        }
        return true;
    }

There are several issues in the conversion:

  1. I need to check on every iteration, so I can't reduce/collect first and check afterwards.
  2. I am accessing an array and a data structure object (the one the tester is for), so the operation is neither non-interfering nor stateless.

Here is my attempt at replacing the for loop:

    return IntStream.range(0, keys.size())
            //.parallel()
            .peek(idx -> keys.set(idx, null))
            .peek(idx -> dHeap.delete(DHeap_ItemArray[idx]))
            .allMatch(e -> someTest(keys, dHeap.getList()));

This is much shorter and more readable; however, it "breaks the rules" and as a result cannot run in parallel.

So my questions are:

  1. Why am I unable to use .parallel()? (I get a NullPointerException.)
  2. Why shouldn't we use peek()? In this code I see no other way around it.
  3. What should I do when accessing outside elements?
  4. How should I "transfer" the code to Java 8 streams?
  5. What am I missing?

I guess that the answers are intertwined. I just started learning Java 8, so thanks a lot for all the help, because I'm really stuck.

Upvotes: 4

Views: 173

Answers (2)

Misha

Reputation: 28133

You absolutely should not use peek like this. The Streams API documentation does not specify that elements will pass through peek only until the test in allMatch fails. All bets are off if the stream is parallel, as allMatch is not required to be evaluated in order, and one thread is allowed to keep executing peek even after another thread has encountered an element that fails the allMatch test.

Even if the stream is sequential, peek can execute an unexpected number of times in certain cases. Consider the following code:

    List<List<Integer>> data = Arrays.asList(
        Arrays.asList(1, 2),
        Arrays.asList(3, 4, 5),
        Arrays.asList(6, 7));

    data.stream()
        .flatMap(List::stream)
        .peek(System.out::println)
        .allMatch(x -> x < 4);

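You might think that it will print numbers up to 3, but in fact it will print numbers up to 5, at least on Java 8, where flatMap pushes a whole inner list downstream before the short-circuit is checked.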
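
To check this on a particular JDK, here is a minimal sketch that records what peek sees instead of printing it. The class and method names (`PeekFlatMapDemo`, `elementsSeenByPeek`) are made up for illustration. The exact list depends on the JDK in use, so no output is claimed here; the only safe expectation is that 4, the first failing element, passes through peek before allMatch can reject it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PeekFlatMapDemo {

    // Returns the elements that passed through peek() before allMatch() finished.
    static List<Integer> elementsSeenByPeek() {
        List<List<Integer>> data = Arrays.asList(
                Arrays.asList(1, 2),
                Arrays.asList(3, 4, 5),
                Arrays.asList(6, 7));

        // The stream is sequential, so a plain ArrayList is safe to append to here.
        List<Integer> seen = new ArrayList<>();
        data.stream()
                .flatMap(List::stream)
                .peek(seen::add)
                .allMatch(x -> x < 4);   // false: 4 fails the test
        return seen;
    }

    public static void main(String[] args) {
        // How far past 4 the list goes is an implementation detail of flatMap.
        System.out.println(elementsSeenByPeek());
    }
}
```

If the list contains 5, the side effect in peek ran for an element that the loop version would never have touched.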

Upvotes: 1

holi-java

Reputation: 30686

Q: Why am I unable to use .parallel()?

Q: What should I do when accessing outside elements?

You can use it in a parallel stream; there is no problem as long as the action doesn't modify shared state, for example:

    stream.peek(System.out::println).allMatch(...);

If the action modifies shared state, it is responsible for providing the required synchronization, for example:

    stream.peek(idx -> { synchronized (lock) { keys.set(idx, null); } }).allMatch(...);
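
As a rough, self-contained sketch of that synchronization (the class name `SynchronizedPeekDemo`, the method `collectWithPeek`, and the shared list are assumptions for illustration, not part of the question's code): the lock keeps the shared `ArrayList` consistent, but it does not restore the order of the original loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class SynchronizedPeekDemo {

    // Runs a parallel stream whose peek() action appends to a shared list under a lock.
    static List<Integer> collectWithPeek(int n) {
        Object lock = new Object();
        List<Integer> seen = new ArrayList<>();

        IntStream.range(0, n)
                .parallel()
                .peek(idx -> {
                    synchronized (lock) {   // guards the non-thread-safe ArrayList
                        seen.add(idx);
                    }
                })
                .allMatch(idx -> true);     // never fails, so every index is visited

        return seen;
    }

    public static void main(String[] args) {
        // All indices are present, but usually not in ascending order.
        System.out.println(collectWithPeek(20));
    }
}
```

In the question each deletion depends on the deletions before it, so locking alone would make the code thread-safe without making it equivalent to the loop.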

Q: Why shouldn't we use peek()?

You can use the peek() method, but you should avoid it when you don't control the stream, for example:

    // fine: this method controls the whole pipeline, including the terminal operation
    boolean foo() { return stream.peek(...).allMatch(...); }

    // violates the rule
    Stream<?> foo() {
        /* avoid peek() here, since the caller decides how the returned stream is consumed */
    }
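
A small sketch of why the second form is risky, using a hypothetical helper `numbers` that returns a stream with a peek() action attached: how often the action runs is decided entirely by whatever the caller does with the stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class ReturnedStreamPeekDemo {

    // Hypothetical helper: returns a stream whose peek() action records into 'log'.
    static Stream<Integer> numbers(List<Integer> log) {
        return Stream.of(1, 2, 3).peek(log::add);
    }

    // The caller short-circuits, so peek() runs only for the element it consumes.
    static List<Integer> loggedAfterFindFirst() {
        List<Integer> log = new ArrayList<>();
        numbers(log).findFirst();
        return log;
    }

    // The caller never invokes a terminal operation, so peek() never runs at all.
    static List<Integer> loggedWithoutTerminalOp() {
        List<Integer> log = new ArrayList<>();
        numbers(log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(loggedAfterFindFirst());
        System.out.println(loggedWithoutTerminalOp());
    }
}
```

The method that builds the pipeline cannot know which of these cases it is in, so it should not depend on the side effect having happened.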

Q: How should I "transfer" the code to java 8 streams?

There is a detailed description of how to operate on a stream in the package summary of java.util.stream.
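
One possible way to apply the non-interference and statelessness requirements from that summary to the question's loop is to make every step independent: rebuild the structure for each index instead of mutating one shared instance. The sketch below is only an illustration under stated assumptions. `DHeap` and `someTest` are not available here, so `java.util.PriorityQueue` stands in for the heap and `stepIsValid` is a hypothetical replacement for the check; it also does quadratic work, which may or may not be acceptable for a tester.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.IntStream;

class IndependentStepsDemo {

    // Hypothetical stand-in for the question's check: after deleting the first
    // (step + 1) keys, the heap must hold exactly the remaining keys.
    static boolean stepIsValid(List<Integer> keys, int step) {
        // A fresh heap per step: nothing is shared between stream elements.
        PriorityQueue<Integer> heap = new PriorityQueue<>(keys);
        for (int i = 0; i <= step; i++) {
            heap.remove(keys.get(i));   // removes a single occurrence of the key
        }
        List<Integer> expected = new ArrayList<>(keys.subList(step + 1, keys.size()));
        List<Integer> actual = new ArrayList<>(heap);
        Collections.sort(expected);
        Collections.sort(actual);
        return expected.equals(actual);
    }

    // 'keys' is only read, never modified, so the predicate is stateless
    // and the stream can safely run in parallel.
    static boolean deleteTester(List<Integer> keys) {
        return IntStream.range(0, keys.size())
                .parallel()
                .allMatch(step -> stepIsValid(keys, step));
    }

    public static void main(String[] args) {
        List<Integer> keys = new ArrayList<>();
        Collections.addAll(keys, 5, 3, 8, 1, 3);
        System.out.println(deleteTester(keys));
    }
}
```

If rebuilding per step is too expensive, the computation is inherently sequential, and keeping the plain for loop is a reasonable choice.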

Upvotes: 2
