Reputation: 11496
I'm wondering if stream operations in a stream pipeline of a parallel stream wait for the previous stream operation to have completed processing of all stream elements.
If I have the following stream pipeline:
List<String> parsedNumbers = IntStream.range(1, 6)
.parallel()
.mapToObj(String::valueOf)
.map(integerAsString -> {
System.out.println("First print statement: " + integerAsString);
return integerAsString;
})
.map(integerAsString -> {
System.out.println("Second print statement: " + integerAsString);
return integerAsString;
})
.collect(Collectors.toList());
Could it happen that System.out.println("First print statement: " + integerAsString) is already being called for element X, while the String::valueOf operation is still being carried out for another element Y in the stream?
Could the output of this code be as follows:
First print statement: 1
First print statement: 2
First print statement: 3
Second print statement: 1
Second print statement: 2
First print statement: 4
Second print statement: 3
Second print statement: 4
First print statement: 5
Second print statement: 5
Or will it always be like this:
First print statement: 1
First print statement: 2
First print statement: 3
First print statement: 4
First print statement: 5
Second print statement: 1
Second print statement: 2
Second print statement: 3
Second print statement: 4
Second print statement: 5
Upvotes: 3
Views: 934
Reputation: 298559
The processing order is not guaranteed, not even for sequential streams. Only the end result will be consistent with the encounter order, if the data had one.
When you run the following sequential code
List<String> parsedNumbers = IntStream.range(1, 6)
.mapToObj(String::valueOf)
.map(integerAsString -> {
System.out.println("First print statement: " + integerAsString);
return integerAsString;
})
.map(integerAsString -> {
System.out.println("Second print statement: " + integerAsString);
return integerAsString;
})
.collect(Collectors.toList());
it will print
First print statement: 1
Second print statement: 1
First print statement: 2
Second print statement: 2
First print statement: 3
Second print statement: 3
First print statement: 4
Second print statement: 4
First print statement: 5
Second print statement: 5
showing that streams do not work like you expect. The reference implementation has a clear preference towards passing each element through the entire stream before processing the next one. When you enable parallel processing, the same processing logic will be performed on each CPU core.
So when I use
List<String> parsedNumbers = IntStream.range(1, 6)
.parallel()
.mapToObj(String::valueOf)
.map(integerAsString -> {
System.out.println("First print statement: " + integerAsString);
return integerAsString;
})
.map(integerAsString -> {
System.out.println("Second print statement: " + integerAsString);
return integerAsString;
})
.collect(Collectors.toList());
I get something like this on my machine:
First print statement: 5
First print statement: 2
First print statement: 1
First print statement: 4
First print statement: 3
Second print statement: 5
Second print statement: 2
Second print statement: 1
Second print statement: 4
Second print statement: 3
which may look as if the first print statement had been processed as a stage before the second, but that's just a coincidence of having more CPU cores than stream elements and lucky timing. E.g., when I change range(1, 6) to range(1, 18), I get something like
First print statement: 6
First print statement: 10
First print statement: 9
First print statement: 3
First print statement: 15
First print statement: 5
Second print statement: 9
First print statement: 11
First print statement: 8
Second print statement: 3
Second print statement: 11
Second print statement: 5
Second print statement: 10
Second print statement: 6
First print statement: 7
First print statement: 12
Second print statement: 8
Second print statement: 15
Second print statement: 12
Second print statement: 7
First print statement: 2
First print statement: 17
First print statement: 14
First print statement: 4
Second print statement: 14
Second print statement: 17
Second print statement: 2
First print statement: 1
First print statement: 16
First print statement: 13
Second print statement: 16
Second print statement: 1
Second print statement: 4
Second print statement: 13
Not only are there no guarantees about the order of the processing, there are also no guarantees about which elements will be processed, e.g.
IntStream.range(1, 30)
.filter(i -> i%13 == 1)
.peek(i -> System.out.println("processing "+i))
.parallel()
.findFirst()
.ifPresent(i -> System.out.println("result is "+i));
produces in my setup
processing 14
processing 1
processing 27
result is 1
So while the result is guaranteed to be 1, the first matching element in encounter order, there is no guarantee that other elements, following it in encounter order, are not processed.
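To make the distinction between processing order and result order concrete, here is a minimal sketch. The class name EncounterOrderDemo and the PROCESSED queue are hypothetical names introduced only for this illustration. It records the order in which elements pass through the pipeline and lets you compare it with the collected list. The recorded order can differ from run to run, while the collected list should always follow the encounter order of the range.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original question or answer.
class EncounterOrderDemo {
    // Thread-safe queue recording the order in which elements were actually processed.
    static final Queue<String> PROCESSED = new ConcurrentLinkedQueue<>();

    static List<String> run() {
        PROCESSED.clear();
        return IntStream.range(1, 18)
            .parallel()
            .mapToObj(String::valueOf)
            // Side effect used purely for observation; the order of these calls is unspecified.
            .peek(PROCESSED::add)
            // The resulting list follows the encounter order of the source.
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> result = run();
        System.out.println("processing order: " + PROCESSED);
        System.out.println("result:           " + result);
    }
}
```

The peek call is there only to observe the pipeline; it is exactly the kind of side effect you should not rely on for ordering.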
Upvotes: 4
Reputation: 121048
Yes, it could. Intermediate stages can be executed in any order; terminal operations have a defined order if the source of the stream has an order (unlike a Set, for example) and the stream itself does not alter that order (by calling unordered, though at the moment this does not do much).
That is, you don't really know which element will flow through which stage at any given point in time; there is no defined order in which elements are processed in a parallel stream.
The bigger question is why do you care? Intermediate operations are supposed to be side-effect free and relying on any order is a bad idea.
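If ordered output really is needed, one option is to keep the intermediate operations pure and move the side effect into an ordered terminal operation. The following is only a sketch under that assumption; the class name OrderedOutputDemo and the method collectInEncounterOrder are hypothetical. It relies on forEachOrdered, which performs its action in encounter order even for a parallel stream, at the cost of some parallelism in that final step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical demo class illustrating an ordered terminal operation.
class OrderedOutputDemo {
    static List<String> collectInEncounterOrder() {
        List<String> out = new ArrayList<>();
        IntStream.range(1, 18)
            .parallel()
            // Pure mapping function: no side effects, so processing order does not matter.
            .mapToObj(String::valueOf)
            // The action runs for one element at a time, in encounter order.
            .forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        // Prints the numbers 1 to 17, one per line, in ascending order.
        collectInEncounterOrder().forEach(System.out::println);
    }
}
```

With plain forEach instead of forEachOrdered, the order of the actions would again be unspecified, and adding to a non-thread-safe ArrayList from several threads would not be safe.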
Upvotes: 4