Reputation: 23505
I have the following code which does not work as I intended (a random line, instead of the first, is skipped):
Files.lines(path)
    .skip(1)
    .parallel()
    .forEach(System.out::println);
I have a feeling I misunderstood the behavior of Streams. The question is: can I first treat a stream as sequential (and use "stateful intermediate operations") and then feed it into a parallel forEach?
Upvotes: 7
Views: 796
Reputation: 1727
No, you cannot do that. However, your code should probably work as intended. From the Stream.skip javadocs:
While skip() is generally a cheap operation on sequential stream pipelines, it can be quite expensive on ordered parallel pipelines, especially for large values of n, since skip(n) is constrained to skip not just any n elements, but the first n elements in the encounter order. Using an unordered stream source (such as generate(Supplier)) or removing the ordering constraint with BaseStream.unordered() may result in significant speedups of skip() in parallel pipelines, if the semantics of your situation permit. If consistency with encounter order is required, and you are experiencing poor performance or memory utilization with skip() in parallel pipelines, switching to sequential execution with BaseStream.sequential() may improve performance.
Whether your code works or not depends on the nature of the stream returned by Files.lines(..), specifically on whether that stream is ordered. This characteristic is set by the Spliterator that is used. If the stream is ordered, skip(1) will always skip the first element. If the stream is unordered, it will skip some one element, not necessarily the first.
http://download.java.net/jdk8/docs/api/java/util/Spliterator.html
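One way to check this for yourself is to ask the stream's spliterator whether it reports the ORDERED characteristic. The following is only a minimal sketch: the class name OrderedCheck and its helper methods are made up for illustration, and the temporary file stands in for whatever path you are actually reading. Note that spliterator() is a terminal operation, so the stream you inspect cannot be reused afterwards.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;

// Hypothetical helper class, not part of the JDK.
final class OrderedCheck {

    // True if the stream's spliterator reports ORDERED.
    // This consumes the stream, because spliterator() is a terminal operation.
    static boolean isOrdered(Stream<?> stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    // Writes a small temporary file and checks the stream from Files.lines on it.
    static boolean filesLinesIsOrdered() {
        try {
            Path tmp = Files.createTempFile("ordered-check", ".txt");
            try {
                Files.write(tmp, Arrays.asList("header", "a", "b"));
                try (Stream<String> lines = Files.lines(tmp)) {
                    return isOrdered(lines);
                }
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("header", "a", "b");
        System.out.println("List stream ordered:    " + isOrdered(data.stream()));
        System.out.println("HashSet stream ordered: " + isOrdered(new HashSet<>(data).stream()));
        System.out.println("Files.lines ordered:    " + filesLinesIsOrdered());
    }
}
```

A List-backed stream reports ORDERED while a HashSet-backed one does not, so the same check applied to Files.lines tells you which case of the skip() documentation applies to your pipeline.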
Upvotes: 1
Reputation: 13137
It appears that skip(n) does not skip the first n elements on a parallel stream.
Solution: chomp the first n lines off using the BufferedReader readLine() method, then obtain the Stream, which will carry on where you left off with the reader:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.stream.IntStream;

public class TestStreams {

    public static void main(String[] args) throws Exception {
        unordered();
    }

    public static void unordered() throws IOException, InterruptedException {
        // Build 1000 lines of test input: "0\n1\n...999\n"
        StringBuilder sb = new StringBuilder();
        IntStream.range(0, 1000).forEach(n -> sb.append(n).append("\n"));

        try (BufferedReader br = new BufferedReader(new StringReader(sb.toString()))) {
            // Consume the first line sequentially, before any stream is created
            if (br.readLine() != null) {
                // The stream starts at the second line; print the rest in parallel
                br.lines()
                        .parallel()
                        .forEach(it -> System.out.println(Thread.currentThread() + " : " + it));
            }
        }
    }
}
Upvotes: 0
Reputation: 541
The entire pipeline is either parallel or sequential.
Try using forEachOrdered instead of forEach. In my test it skips the first line if forEachOrdered is used (with forEach it skips the last line).
forEach ignores encounter order, and it seems that it can also cause other operations to ignore it.
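As a rough illustration of the forEachOrdered variant, here is a small sketch that uses an in-memory range instead of a file so that the result is easy to inspect. The class and method names are invented for this example. It only demonstrates the forEachOrdered case and makes no claim about what plain forEach does on any particular JDK version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical demo class, not taken from the question.
final class SkipForEachOrdered {

    // Skips the first element of an ordered source, then runs the pipeline in
    // parallel while the terminal operation still respects encounter order.
    static List<Integer> skipFirstInOrder(int n) {
        // A plain ArrayList is safe here: forEachOrdered processes one element
        // at a time, with a happens-before edge between consecutive actions.
        List<Integer> seen = new ArrayList<>();
        IntStream.range(0, n)
                .boxed()
                .skip(1)
                .parallel()
                .forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(skipFirstInOrder(10)); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

The trade-off is that forEachOrdered gives up some of the parallel speedup in the terminal step, since the actions themselves run one after another.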
Upvotes: 2
Reputation: 691755
It's not a bug, but a feature. Calling parallel() makes the whole stream parallel, unless a subsequent call to sequential() is made, which sets the whole stream back to sequential mode.
The javadoc says:
Returns an equivalent stream that is parallel.
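A quick way to see that the mode applies to the whole pipeline is to query isParallel() after mixing the two calls. This is just a sketch with made-up class and method names. It relies only on the documented BaseStream methods parallel(), sequential() and isParallel().

```java
import java.util.stream.IntStream;

// Hypothetical demo class for illustration only.
final class PipelineMode {

    // parallel() followed later by sequential(): the last call wins.
    static boolean parallelThenSequential() {
        return IntStream.range(0, 10)
                .parallel()
                .map(i -> i * 2)
                .sequential()
                .isParallel();
    }

    // skip(1) written "before" parallel(): the pipeline is still parallel as a
    // whole, so skip(1) does not run in a separate sequential phase.
    static boolean skipThenParallel() {
        return IntStream.range(0, 10)
                .skip(1)
                .parallel()
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(parallelThenSequential()); // prints false
        System.out.println(skipThenParallel());       // prints true
    }
}
```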
Upvotes: 1