Reputation: 11040
In my program, I repeatedly1 collect Java 8 streams to reduce a collection of objects to a single one. The size of this collection can vary a lot throughout the execution: from 3 objects to hundreds.
public void findInterestingFoo(Stream<Foo> foos) {
internalState.update(foos.collect(customCollector()));
}
In the process of optimizing my code and searching for bottlenecks, I made the stream parallel at some point. This worked at that point in time, as the collections were all fairly big. Later, after changing other parts and parameters of the program, the collections became smaller. I realized that not making the stream parallel was more efficient. This makes sense: the overhead of distributing work over multiple threads for 4 objects is simply not worth it. It is worth it for hundreds of objects though.
It would be really convenient if I could make only big streams parallel:
public void findInterestingFoo(Stream<Foo> foos) {
if (isSmall(foos)) {
internalState.update(foos.collect(customCollector()));
} else {
internalState.update(foos.parallel().collect(customCollector()));
}
}
Of course, this is possible to do manually when the stream is created from an array, a collection, or manually. That is, we know what elements go in the stream, so this can be tracked. I am however interested in solving this in a generic way, so that no matter what kind of stream is passed to findInterestingFoo
, it is handled appropriately and as efficiently as possible.
Something like count()
might help, except it terminates the stream before I can collect it.
I am well aware that streams are designed to not have a set size, in particular:
- Possibly unbounded. While collections have a finite size, streams need not. Short-circuiting operations such as
limit(n)
orfindFirst()
can allow computations on infinite streams to complete in finite time. —java.util.stream
package description
Still, I wonder if there is any way to determine how many elements are in a stream before performing any operations on it. Does a stream really not know that it is created from a finite collection?
__________
1 Thousands of times. Optimizing this led to a speedup from about 1.5 to 0.5 seconds total running time in my case.
Upvotes: 13
Views: 1498
Reputation: 298143
In theory, you could do something like this:
public void findInterestingFoo(Stream<Foo> foos) {
Spliterator<Foo> sp = foos.spliterator();
long size = sp.getExactSizeIfKnown();// returns -1 if not known
// or sp.estimateSize(); // Long.MAX_VALUE means "unknown"
internalState.update(
StreamSupport.stream(sp, size > PARALLEL_THRESHOLD)
.collect(customCollector()));
}
spliterator()
is a terminal operation that consumes the input stream, but you can pass the Spliterator
to StreamSupport.stream
to construct a stream with exactly the same properties. The second parameter already tells whether the stream should be parallel.
In theory.
In practice, the current stream implementation will return different Spliterator
implementations depending on whether the stream is parallel or not. This implies that recreating the stream as a parallel stream may end up with a stream that is incapable of doing parallel processing when the original stream wasn’t already parallel before calling spliterator()
.
It does work well, however, if there are no intermediate operations, e.g. when you directly pass in the Stream
created from a collection or array.
Calling parallel()
before spliterator()
to get a parallel capable stream that may still run sequentially if you decide to do so, works in a lot of cases. However, if there are stateful intermediate operations like sorted()
in the input stream, they may get fixed to run in parallel then, even if you do the collect
sequentially (or vice versa).
Another problem is of a fundamental nature. The number of elements doesn’t actually say whether there will be a benefit in parallel processing or not. This does depend on the per-element workload, which does not only depend on your terminal collect
operation but also on the operations already chained to the stream before entering your method. Even if you conclude that your collector’s workload is already high enough to deserve parallel processing, it might be that the incoming stream has operations like skip
, limit
or distinct
(on an ordered stream), which often run worse in parallel and would require an entirely different threshold.
A simpler solution is to let the caller decide, as the caller knows something about the size and nature of the stream. You don’t even need to add an option to your method’s signature as the caller can already make the decision by calling parallel()
or sequential()
on the stream before passing it to your method, and you can respect that by simply not changing the mode.
Upvotes: 17