Aleksandr Dubinsky

Reputation: 23505

Can a Stream be sequentially processed for part of the pipeline, and then as parallel?

I have the following code which does not work as I intended (a random line, instead of the first, is skipped):

Files.lines(path)
     .skip(1)
     .parallel()
     .forEach( System.out::println );

I have a feeling I misunderstood the behavior of Streams. The question is: Can I first treat a stream as sequential (and use "stateful intermediate operations") and then feed it into a parallel forEach?

Upvotes: 7

Views: 796

Answers (4)

aepurniet

Reputation: 1727

No, you cannot do that. However, your code should probably work as intended. From the Stream.skip javadocs:

While skip() is generally a cheap operation on sequential stream pipelines, it can be quite expensive on ordered parallel pipelines, especially for large values of n, since skip(n) is constrained to skip not just any n elements, but the first n elements in the encounter order. Using an unordered stream source (such as generate(Supplier)) or removing the ordering constraint with BaseStream.unordered() may result in significant speedups of skip() in parallel pipelines, if the semantics of your situation permit. If consistency with encounter order is required, and you are experiencing poor performance or memory utilization with skip() in parallel pipelines, switching to sequential execution with BaseStream.sequential() may improve performance.

Whether your code works depends on whether the stream returned by Files.lines(..) is ordered. That characteristic is set by the Spliterator the stream is built on. If the stream is ordered, skip(1) will always skip the first element. If the stream is unordered, it will skip one element, but not necessarily the first.

http://download.java.net/jdk8/docs/api/java/util/Spliterator.html
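As a rough way to check this for a given stream, the sketch below asks the stream's Spliterator whether it reports the ORDERED characteristic. The class name OrderedCheck and the helper isOrdered are made up for illustration; only spliterator(), hasCharacteristics() and Spliterator.ORDERED come from the JDK.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.stream.Stream;

public class OrderedCheck {

    // Hypothetical helper: reports whether the stream advertises an encounter order.
    // spliterator() is a terminal operation, so the stream cannot be reused afterwards.
    public static boolean isOrdered(Stream<?> stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("a", "b", "c");
        Set<String> set = new HashSet<>(list);

        // A List has a defined encounter order; a HashSet does not.
        System.out.println("List stream ordered:    " + isOrdered(list.stream()));
        System.out.println("HashSet stream ordered: " + isOrdered(set.stream()));
    }
}
```

The same check could be applied to the stream returned by Files.lines(path), keeping in mind that a fresh stream has to be opened afterwards for the real work.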

Upvotes: 1

The Coordinator

Reputation: 13137

It appears that skip(n) does not skip the first n elements on a parallel stream.

Solution: consume the first n lines with the BufferedReader readLine() method.

Then obtain the Stream from the same reader, which will carry on where you left off:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.stream.IntStream;

public class TestStreams {

    public static void main(String[] args) throws Exception{
         unordered();
    }

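    public static void unordered() throws IOException, InterruptedException {

        // Build a 1000-line test input: "0\n1\n...999\n".
        StringBuilder sb = new StringBuilder();
        IntStream.range(0, 1000).forEach(n -> sb.append(n).append("\n"));

        try (BufferedReader br = new BufferedReader(new StringReader(sb.toString()))) {
            // Consume the first line sequentially, before any stream exists.
            if (br.readLine() != null) {
                // lines() continues from the reader's current position,
                // so the parallel stream never sees the first line.
                br.lines()
                        .parallel()
                        .forEach(it -> System.out.println(Thread.currentThread() + " : " + it));
            }
        }
    }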
}
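
A variation on the same idea, in case the source is not a BufferedReader: run the stateful part sequentially, collect the result, and start a new parallel stream from the collection. This is only a sketch; the class name SequentialThenParallel and the helper dropHeader are invented for the example, and it assumes the remaining data fits in memory.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SequentialThenParallel {

    // Hypothetical helper: performs skip(1) in sequential mode and materializes the rest.
    // The whole remainder is held in memory, which may not suit very large files.
    public static List<String> dropHeader(Stream<String> lines) {
        return lines.sequential()
                    .skip(1)
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<String> lines = Stream.of("header", "row 1", "row 2", "row 3");
        List<String> body = dropHeader(lines);

        // A second, independent pipeline processes the remaining lines in parallel.
        body.parallelStream()
            .forEach(line -> System.out.println(Thread.currentThread().getName() + " : " + line));
    }
}
```

These are two separate pipelines rather than one pipeline that switches modes, so the sequential stage finishes completely before the parallel stage starts.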

Upvotes: 0

x22

Reputation: 541

The entire pipeline is either parallel or sequential.

Try using forEachOrdered instead of forEach. In my test it skips the first line if forEachOrdered is used (with forEach it skips the last line).

forEach ignores encounter order, and it seems that it can also make other operations ignore it.
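
A minimal sketch of the forEachOrdered variant, assuming an ordered source (a List here instead of a file). The class name SkipOrdered and the helper skipFirstInOrder are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SkipOrdered {

    // Hypothetical helper: skips the first element of an ordered source in a
    // parallel pipeline and records the rest in encounter order.
    public static List<String> skipFirstInOrder(List<String> lines) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        lines.stream()
             .skip(1)
             .parallel()
             .forEachOrdered(seen::add); // respects encounter order even in parallel mode
        return seen;
    }

    public static void main(String[] args) {
        List<String> lines = IntStream.range(0, 1000)
                                      .mapToObj(i -> "line " + i)
                                      .collect(Collectors.toList());
        List<String> result = skipFirstInOrder(lines);
        System.out.println(result.size() + " lines, first is: " + result.get(0));
    }
}
```

The trade-off is that forEachOrdered has to hand elements to the action one at a time in order, so the terminal step itself gains little from parallelism.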

Upvotes: 2

JB Nizet

Reputation: 691755

It's not a bug, but a feature. Calling parallel() makes the whole stream parallel, unless a subsequent call to sequential() is made, which sets the whole stream back to sequential mode.

The javadoc says:

Returns an equivalent stream that is parallel.
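
A small sketch of this "last call wins" behaviour, using isParallel() to inspect the mode of the finished pipeline. The class name ParallelFlag and its two methods are invented for illustration.

```java
import java.util.stream.Stream;

public class ParallelFlag {

    // parallel() followed later by sequential(): the whole pipeline ends up sequential.
    public static boolean parallelThenSequential() {
        return Stream.of(1, 2, 3)
                     .parallel()
                     .map(x -> x * 2)
                     .sequential()
                     .isParallel();
    }

    // sequential() followed later by parallel(): the whole pipeline ends up parallel.
    public static boolean sequentialThenParallel() {
        return Stream.of(1, 2, 3)
                     .sequential()
                     .map(x -> x * 2)
                     .parallel()
                     .isParallel();
    }

    public static void main(String[] args) {
        System.out.println("parallel() then sequential(): isParallel = " + parallelThenSequential());
        System.out.println("sequential() then parallel(): isParallel = " + sequentialThenParallel());
    }
}
```

In both cases the map() step runs in whatever mode was requested last, regardless of where in the chain it appears.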

Upvotes: 1
