Reputation: 120848
Suppose I have this custom collector:
```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class CustomToListCollector<T> implements Collector<T, List<T>, List<T>> {

    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return List::add;
    }

    @Override
    public BinaryOperator<List<T>> combiner() {
        return (l1, l2) -> {
            l1.addAll(l2);
            return l1;
        };
    }

    @Override
    public Function<List<T>, List<T>> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
    }
}
```
This is exactly the Collectors#toList implementation with one minor difference: the UNORDERED characteristic is added as well.
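For comparison, roughly the same collector can be built without a dedicated class through the Collector.of factory. Its four-argument overload adds IDENTITY_FINISH on its own, so only UNORDERED has to be passed. This is a minimal sketch; the class name UnorderedToList and the method toListUnordered are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

class UnorderedToList {
    // Same supplier, accumulator and combiner as CustomToListCollector.
    // Collector.of adds IDENTITY_FINISH itself; UNORDERED is passed explicitly.
    static <T> Collector<T, ?, List<T>> toListUnordered() {
        return Collector.<T, List<T>>of(
                ArrayList::new,
                List::add,
                (l1, l2) -> { l1.addAll(l2); return l1; },
                Collector.Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        // Reports both UNORDERED and IDENTITY_FINISH
        System.out.println(toListUnordered().characteristics());
        System.out.println(list.parallelStream().collect(toListUnordered()));
    }
}
```

Reporting UNORDERED only permits the stream to ignore encounter order; it does not force a shuffled result, which is exactly what the experiment below runs into.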
I would assume that running this code:
```java
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
for (int i = 0; i < 100_000; i++) {
    List<Integer> result = list.parallelStream().collect(new CustomToListCollector<>());
    if (!result.equals(list)) {
        System.out.println(result);
        break;
    }
}
```
should print, at least once, a result whose order differs from list. But it never does.
I've looked under the hood a bit. ReferencePipeline#collect first checks whether the stream is parallel, whether the collector is CONCURRENT, and whether the stream or the collector is unordered. CONCURRENT is missing here, so it delegates to evaluate, passing a TerminalOp created from this collector. Under the hood this is a ReduceOp with a ReducingSink, and it actually does care whether the collector is unordered or not:
```java
return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
    @Override
    public ReducingSink makeSink() {
        return new ReducingSink();
    }

    @Override
    public int getOpFlags() {
        return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
               ? StreamOpFlag.NOT_ORDERED
               : 0;
    }
};
```
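The branch in ReferencePipeline#collect described above can be restated as a small predicate. This is a model of the condition as described in the question, not the JDK source; the class CollectPathModel and the method usesConcurrentPath are hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collector.Characteristics;

class CollectPathModel {
    // Hypothetical model: the concurrent path needs a parallel stream, a
    // CONCURRENT collector, and either an unordered stream or an UNORDERED collector.
    static boolean usesConcurrentPath(boolean parallel, boolean streamOrdered,
                                      Set<Characteristics> chars) {
        return parallel
                && chars.contains(Characteristics.CONCURRENT)
                && (!streamOrdered || chars.contains(Characteristics.UNORDERED));
    }

    public static void main(String[] args) {
        // Characteristics reported by CustomToListCollector
        Set<Characteristics> custom =
                EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
        // CONCURRENT is missing, so this is false and collect falls back to evaluate(...)
        System.out.println(usesConcurrentPath(true, true, custom));
    }
}
```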
I have not debugged further, since it gets pretty complicated fast.
So maybe there is a shortcut here and someone could explain what I am missing. This is a parallel stream that collects elements into a non-concurrent, unordered collector. Shouldn't there be no particular order in which the threads combine their results? If not, how (and by whom) is the order imposed here?
Upvotes: 6
Views: 659
Reputation: 298123
Note that the result is the same when using list.parallelStream().unordered().collect(Collectors.toList()); in either case, the unordered property is not used within the current implementation.
But let’s change the setup a little bit:
```java
List<Integer> list = Collections.nCopies(10, null).stream()
        .flatMap(ig -> IntStream.range(0, 100).boxed())
        .collect(Collectors.toList());
List<Integer> reference = new ArrayList<>(new LinkedHashSet<>(list));
for (int i = 0; i < 100_000; i++) {
    List<Integer> result = list.parallelStream()
            .distinct()
            .collect(characteristics(Collectors.toList(), Collector.Characteristics.UNORDERED));
    if (!result.equals(reference)) {
        System.out.println(result);
        break;
    }
}
```
using the characteristics collector factory of this answer.
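The linked factory itself is not reproduced here. A hypothetical stand-in with the same call shape, characteristics(collector, extraCharacteristics...), could wrap an existing collector via the five-argument Collector.of and report the union of its own and the extra characteristics. This is an assumption about what such a helper does, not the code from the linked answer.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class CollectorCharacteristics {
    // Hypothetical helper: reuses the functions of an existing collector and
    // reports its characteristics plus the extra ones given.
    static <T, A, R> Collector<T, A, R> characteristics(
            Collector<T, A, R> c, Collector.Characteristics... extra) {
        Set<Collector.Characteristics> all = EnumSet.noneOf(Collector.Characteristics.class);
        all.addAll(c.characteristics());
        all.addAll(Arrays.asList(extra));
        return Collector.of(c.supplier(), c.accumulator(), c.combiner(), c.finisher(),
                all.toArray(new Collector.Characteristics[0]));
    }

    public static void main(String[] args) {
        Collector<Integer, ?, List<Integer>> c =
                characteristics(Collectors.toList(), Collector.Characteristics.UNORDERED);
        // Collectors.toList() already has IDENTITY_FINISH, so both are reported
        System.out.println(c.characteristics());
        System.out.println(Arrays.asList(3, 1, 2).stream().collect(c));
    }
}
```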
Interestingly, in Java 8 versions prior to 1.8.0_60, the distinct() loop above has a different outcome. If we use objects with distinct identities instead of the canonical Integer instances, we can detect that in these earlier versions not only does the order of the list differ, but the objects in the result list are not the first encountered instances.
So the unordered characteristic of a terminal operation was propagated to the stream, affecting the behavior of distinct(), similar to that of skip and limit, as discussed here and here.
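The identity check mentioned above can be sketched like this. For an ordered stream, the Stream.distinct() documentation states that the selection is stable, i.e. the first occurrence in encounter order is kept, even in parallel. Runtime-created String instances stand in for "objects with distinct identities"; the class DistinctStability and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class DistinctStability {
    // 1000 Strings with only 10 distinct values; every element is a fresh
    // instance, so equal values never share an identity.
    static List<String> source() {
        List<String> list = new ArrayList<>();
        for (int round = 0; round < 100; round++) {
            for (int k = 0; k < 10; k++) {
                list.add(new String("v" + k));
            }
        }
        return list;
    }

    // True if every element kept by distinct() is the first encountered
    // instance of its value (compared by identity, not equals).
    static boolean keepsFirstInstances(List<String> src, List<String> result) {
        Map<String, String> first = new HashMap<>();
        for (String s : src) {
            first.putIfAbsent(s, s);
        }
        for (String s : result) {
            if (first.get(s) != s) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> src = source();
        List<String> result = src.parallelStream().distinct().collect(Collectors.toList());
        System.out.println(result + ", first instances kept: " + keepsFirstInstances(src, result));
    }
}
```

With the ordered Collectors.toList() the guarantee applies; according to the answer, swapping in an UNORDERED collector on a pre-1.8.0_60 JDK is what made this check fail.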
As discussed in the second linked thread, the back-propagation has been removed completely, which is reasonable on second thought. For distinct, skip and limit, the order of the source is relevant, and ignoring it just because the order will be ignored in subsequent stages is not right. So the only remaining stateful intermediate operation that could benefit from back-propagation would be sorted, which would be rendered obsolete when the order is ignored afterwards. But combining sorted with an unordered sink is more like a programming error anyway…
For stateless intermediate operations, the order is irrelevant anyway. The stream processing works by splitting the source into chunks, applying all stateless intermediate operations to their elements independently and collecting them into a local container, before merging into the result container. So the merging step is the only place where respecting or ignoring the order (of the chunks) will have an impact on the result and perhaps on the performance.
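The chunk-and-merge evaluation described above can be imitated sequentially with the Spliterator API. For an ORDERED spliterator, trySplit is specified to return a spliterator covering a strict prefix, so "respecting the order" just means merging the prefix chunk before the suffix chunk. A minimal sketch assuming a single split; the class ChunkedMap and the method mapInChunks are hypothetical, and the real pipeline splits recursively and processes chunks on different threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Function;

class ChunkedMap {
    // Split once, apply a stateless operation to each chunk into its own
    // local container, then merge the containers in chunk order.
    static <T, R> List<R> mapInChunks(List<T> source, Function<T, R> op) {
        Spliterator<T> suffix = source.spliterator();
        Spliterator<T> prefix = suffix.trySplit(); // ORDERED: covers a strict prefix
        List<R> left = new ArrayList<>();
        List<R> right = new ArrayList<>();
        if (prefix != null) {
            prefix.forEachRemaining(t -> left.add(op.apply(t)));
        }
        suffix.forEachRemaining(t -> right.add(op.apply(t)));
        // Ordered merge: left (prefix) container first, then right (suffix)
        left.addAll(right);
        return left;
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        System.out.println(mapInChunks(list, x -> x * 10)); // [10, 20, 30, 40, 50, 60, 70, 80]
    }
}
```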
But the impact of respecting the order isn’t very big. When you implement such an operation, e.g. via ForkJoinTasks, you simply split a task into two, wait for their completion and merge them. Alternatively, a task may split off a chunk into a sub-task, process its remaining chunk in place, wait for the sub-task and merge. In either case, merging the results in order comes naturally, because the initiating task holds references to the adjacent tasks. To merge with different chunks instead, the associated sub-tasks would first have to be found somehow.
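The second variant (fork one half, process the other half in place, join, merge) can be sketched with RecursiveTask. This illustrates the pattern the answer describes, not the stream framework's internal task classes; OrderedMergeTask and THRESHOLD are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.Function;

class OrderedMergeTask<T, R> extends RecursiveTask<List<R>> {
    private static final int THRESHOLD = 4; // hypothetical leaf size
    private final List<T> source;
    private final int from, to;
    private final Function<T, R> op;

    OrderedMergeTask(List<T> source, int from, int to, Function<T, R> op) {
        this.source = source;
        this.from = from;
        this.to = to;
        this.op = op;
    }

    @Override
    protected List<R> compute() {
        if (to - from <= THRESHOLD) {
            // Leaf: process the chunk into a local container
            List<R> local = new ArrayList<>();
            for (int i = from; i < to; i++) {
                local.add(op.apply(source.get(i)));
            }
            return local;
        }
        int mid = (from + to) >>> 1;
        // Split off the left half as a sub-task, process the right half in place
        OrderedMergeTask<T, R> left = new OrderedMergeTask<>(source, from, mid, op);
        left.fork();
        List<R> right = new OrderedMergeTask<>(source, mid, to, op).compute();
        // While joining, a worker thread may run other pending tasks instead of blocking
        List<R> result = left.join();
        // Both adjacent halves are at hand, so merging in order is trivial
        result.addAll(right);
        return result;
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            list.add(i);
        }
        List<Integer> out = ForkJoinPool.commonPool()
                .invoke(new OrderedMergeTask<Integer, Integer>(list, 0, list.size(), x -> x * 10));
        System.out.println(out); // encounter order is preserved
    }
}
```

An unordered variant would have to track which sub-task finished first and locate its neighbor, which is the extra bookkeeping the answer argues is rarely worth it.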
The only benefit of merging with a different task would be that you can merge with the first completed task, if the tasks take different amounts of time to complete. But when waiting for a sub-task in the Fork/Join framework, the thread won’t be idle; the framework will use it for working on other pending tasks in the meantime. So as long as the main task has been split into enough sub-tasks, there will be full CPU utilization. Also, the spliterators attempt to split into even chunks to reduce the differences between the computing times. It’s very likely that the benefit of an alternative unordered merging implementation doesn’t justify the code duplication, at least with the current implementation.
Still, reporting an unordered characteristic allows the implementation to utilize it when beneficial, and implementations can change.
Upvotes: 7
Reputation: 120848
This is not an actual answer per se, but it needs more code than fits into comments.
Here is another interesting thing; it actually made me realize I was wrong in the comments.
A spliterator's flags need to be merged with the flags of all intermediate operations and of the terminal operation.
Our spliterator's flags (as encoded by StreamOpFlag) are 95; this can be observed by debugging AbstractPipeline#sourceSpliterator(int terminalFlags).
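That is why the line below prints true (StreamOpFlag is package-private, so these expressions only work from inside java.util.stream, e.g. when evaluated in a debugger):
```java
System.out.println(StreamOpFlag.ORDERED.isKnown(95)); // true
```
At the same time, our terminal collector's flags are 32 (that is, StreamOpFlag.NOT_ORDERED):
```java
System.out.println(StreamOpFlag.ORDERED.isKnown(32)); // false
```
The result:
```java
int result = StreamOpFlag.combineOpFlags(32, 95); // 111
System.out.println(StreamOpFlag.ORDERED.isKnown(result)); // false
```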
If you think about it, this makes complete sense: the List has an order, my custom collector does not, so the combined flags no longer report ORDERED.
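Since StreamOpFlag cannot be called from user code, the arithmetic above can be reproduced with a small stand-alone model. It assumes the two-bits-per-property layout StreamOpFlag uses (a "set" bit and a "clear" bit per property, with DISTINCT, SORTED, ORDERED and SIZED in bits 0 to 7) and reimplements only that part; FlagModel and its members are hypothetical names, not the JDK's.

```java
class FlagModel {
    // Assumed layout: for each property, the lower bit means "known set",
    // the upper bit means "known cleared". Bits 0-7: DISTINCT, SORTED, ORDERED, SIZED.
    static final int MASK_IS = 0b0101_0101;   // all "set" bits
    static final int MASK_NOT = 0b1010_1010;  // all "clear" bits
    static final int FLAG_MASK = MASK_IS | MASK_NOT;

    static final int ORDERED_SET = 1 << 4;                         // 16
    static final int NOT_ORDERED = 1 << 5;                         // 32
    static final int ORDERED_PRESERVE = ORDERED_SET | NOT_ORDERED; // 48

    // Mask that removes from the previous flags every property the new flags mention
    static int getMask(int flags) {
        return (flags == 0)
                ? FLAG_MASK
                : ~(flags | ((MASK_IS & flags) << 1) | ((MASK_NOT & flags) >> 1));
    }

    // New flags override the previous ones for the properties they mention
    static int combineOpFlags(int newFlags, int prevCombinedFlags) {
        return (prevCombinedFlags & getMask(newFlags)) | newFlags;
    }

    // ORDERED is "known" only if its set bit is on and its clear bit is off
    static boolean orderedIsKnown(int flags) {
        return (flags & ORDERED_PRESERVE) == ORDERED_SET;
    }

    public static void main(String[] args) {
        System.out.println(orderedIsKnown(95));     // true
        System.out.println(orderedIsKnown(32));     // false
        int result = combineOpFlags(NOT_ORDERED, 95);
        System.out.println(result);                 // 111
        System.out.println(orderedIsKnown(result)); // false
    }
}
```

The terminal flag 32 clears the ORDERED "set" bit (16) from 95 and sets the "clear" bit, giving 95 - 16 + 32 = 111.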
Bottom line: the UNORDERED flag is preserved in the resulting stream's flags, but internally nothing is done with it. They probably could use it, but they chose not to.
Upvotes: 2