Reputation: 315
I recently discovered a bug in which

StreamSupport.intStream(/* a Spliterator.ofInt */, true)
    .limit(20)

was invoking Spliterator.ofInt.tryAdvance more than 20 times. When I changed it to

StreamSupport.intStream(/* a Spliterator.ofInt */, true)
    .sequential()
    .limit(20)

the problem went away. Why does this happen? Is there any way to achieve a strict limit on a parallel stream when tryAdvance has side effects, other than by building one into the Spliterator? (This is for testing some methods that return unlimited streams, where the tests need to reach an eventual end without the complication of a "loop for X milliseconds" construction.)
Upvotes: 4
Views: 1370
Reputation: 315
For my use case, the solution was to use:
LongStream.range(0, streamSize).unordered().parallel().mapToInt(ignored -> nextInt())
NB: This was for a stream of random numbers from a PRNG that might be continuously reseeded. Since LongStream.range(0, streamSize) is a sized source, no limit() step is needed, and the mapping function cannot be invoked more than streamSize times.
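A minimal self-contained sketch of this approach; the counter stands in for the reseeding PRNG's side effect, and ThreadLocalRandom substitutes for the nextInt() of the answer:

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;

public class SizedRandomStream {
    public static void main(String[] args) {
        int streamSize = 20;
        AtomicInteger calls = new AtomicInteger(); // counts side-effecting invocations

        int[] values = LongStream.range(0, streamSize)
                .unordered()
                .parallel()
                .mapToInt(ignored -> {
                    calls.incrementAndGet();                      // the side effect
                    return ThreadLocalRandom.current().nextInt(); // stand-in for nextInt()
                })
                .toArray();

        // the sized source guarantees exactly streamSize mapping calls
        System.out.println(values.length + " values, " + calls.get() + " calls");
    }
}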
Upvotes: 0
Reputation: 298599
There seems to be a fundamental misunderstanding about how limit and trySplit should interact. The assumption that there should be no more trySplit invocations than the specified limit is completely wrong.
The purpose of trySplit is to divide the source data into two parts, ideally into two halves, as trySplit is supposed to attempt a balanced split. So if you have a source data set of one million elements, a successful split yields two source data sets of half a million elements each. This is entirely unrelated to a limit(20) you might have applied to the stream, except that we know beforehand that we can drop the second data set if the spliterator has the SIZED|SUBSIZED characteristics, as the requested first twenty elements can only be found within the first half million.
It’s easy to calculate that in the best case, i.e. with balanced splits, we already need fifteen split operations, dropping the upper half each time, before we ever get a split within the first twenty elements, which would allow us to process those first twenty elements in parallel.
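A quick back-of-the-envelope check of that number, assuming perfectly balanced splits that always drop the upper half:

int size = 1_000_000, splits = 0;
while (size / 2 > 20) { // the midpoint still lies beyond the first twenty elements
    size /= 2;          // keep only the lower half
    splits++;
}
System.out.println(splits + " splits, remaining range " + size); // 15 splits, remaining range 30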
The splitting can easily be demonstrated with a debug spliterator:
class DebugSpliterator extends Spliterators.AbstractIntSpliterator {
int current, fence;
DebugSpliterator() {
this(0, 1_000_000);
}
DebugSpliterator(int start, int end) {
super(end-start, ORDERED|SIZED|SUBSIZED);
current = start;
fence = end;
}
@Override public boolean tryAdvance(IntConsumer action) {
if(current<fence) {
action.accept(current++);
return true;
}
return false;
}
@Override public OfInt trySplit() {
    int mid = (current+fence)>>>1;
    System.out.println("trySplit() ["+current+", "+mid+", "+fence+"]");
    // hand out the prefix [current, mid) and keep [mid, fence);
    // the assignment current=mid is evaluated after the first constructor argument
    return mid>current? new DebugSpliterator(current, current=mid): null;
}
}
StreamSupport.stream(new DebugSpliterator(), true)
.limit(20)
.forEach(x -> {});
On my machine, it prints:
trySplit() [0, 500000, 1000000]
trySplit() [0, 250000, 500000]
trySplit() [0, 125000, 250000]
trySplit() [0, 62500, 125000]
trySplit() [0, 31250, 62500]
trySplit() [0, 15625, 31250]
trySplit() [0, 7812, 15625]
trySplit() [0, 3906, 7812]
trySplit() [0, 1953, 3906]
trySplit() [0, 976, 1953]
trySplit() [0, 488, 976]
trySplit() [0, 244, 488]
trySplit() [0, 122, 244]
trySplit() [0, 61, 122]
trySplit() [0, 30, 61]
trySplit() [0, 15, 30]
trySplit() [15, 22, 30]
trySplit() [15, 18, 22]
trySplit() [15, 16, 18]
trySplit() [16, 17, 18]
trySplit() [0, 7, 15]
trySplit() [18, 20, 22]
trySplit() [18, 19, 20]
trySplit() [7, 11, 15]
trySplit() [0, 3, 7]
trySplit() [3, 5, 7]
trySplit() [3, 4, 5]
trySplit() [7, 9, 11]
trySplit() [4, 4, 5]
trySplit() [9, 10, 11]
trySplit() [11, 13, 15]
trySplit() [0, 1, 3]
trySplit() [13, 14, 15]
trySplit() [7, 8, 9]
trySplit() [1, 2, 3]
trySplit() [8, 8, 9]
trySplit() [5, 6, 7]
trySplit() [14, 14, 15]
trySplit() [17, 17, 18]
trySplit() [11, 12, 13]
trySplit() [12, 12, 13]
trySplit() [2, 2, 3]
trySplit() [10, 10, 11]
trySplit() [6, 6, 7]
This is, of course, far more than twenty split attempts, but it is entirely reasonable, as the data set has to be split down until we have sub-ranges within the desired target range, to be able to process those elements in parallel.
We can enforce a different behavior by dropping the meta information that leads to this execution strategy:
StreamSupport.stream(new DebugSpliterator(), true)
.filter(x -> true)
.limit(20)
.forEach(x -> {});
Since the Stream API has no knowledge about the predicate’s behavior, the pipeline loses its SIZED characteristic, leading to
trySplit() [0, 500000, 1000000]
trySplit() [500000, 750000, 1000000]
trySplit() [500000, 625000, 750000]
trySplit() [625000, 687500, 750000]
trySplit() [625000, 656250, 687500]
trySplit() [656250, 671875, 687500]
trySplit() [0, 250000, 500000]
trySplit() [750000, 875000, 1000000]
trySplit() [250000, 375000, 500000]
trySplit() [0, 125000, 250000]
trySplit() [250000, 312500, 375000]
trySplit() [312500, 343750, 375000]
trySplit() [125000, 187500, 250000]
trySplit() [875000, 937500, 1000000]
trySplit() [375000, 437500, 500000]
trySplit() [125000, 156250, 187500]
trySplit() [250000, 281250, 312500]
trySplit() [750000, 812500, 875000]
trySplit() [281250, 296875, 312500]
trySplit() [156250, 171875, 187500]
trySplit() [437500, 468750, 500000]
trySplit() [0, 62500, 125000]
trySplit() [875000, 906250, 937500]
trySplit() [62500, 93750, 125000]
trySplit() [812500, 843750, 875000]
trySplit() [906250, 921875, 937500]
trySplit() [0, 31250, 62500]
trySplit() [31250, 46875, 62500]
trySplit() [46875, 54687, 62500]
trySplit() [54687, 58593, 62500]
trySplit() [58593, 60546, 62500]
trySplit() [60546, 61523, 62500]
trySplit() [61523, 62011, 62500]
trySplit() [62011, 62255, 62500]
This shows fewer trySplit invocations, but it is no improvement; looking at the numbers reveals that ranges outside the resulting element range (using our knowledge that all elements will pass the filter) are now processed. Even worse, the range of resulting elements is entirely covered by a single spliterator, resulting in no parallel processing of the result elements at all; all other threads were busy processing elements that got dropped afterwards.
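As a side note, the loss of the SIZED characteristic through an opaque predicate can be verified directly; a small sketch:

import java.util.Spliterator;
import java.util.stream.IntStream;

public class SizedCheck {
    public static void main(String[] args) {
        Spliterator.OfInt plain = IntStream.range(0, 1_000_000).spliterator();
        System.out.println(plain.hasCharacteristics(Spliterator.SIZED));    // true

        Spliterator.OfInt filtered = IntStream.range(0, 1_000_000)
                                              .filter(x -> true)
                                              .spliterator();
        System.out.println(filtered.hasCharacteristics(Spliterator.SIZED)); // false
    }
}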
Of course, we could easily enforce an optimal splitting for our task by changing
int mid = (current+fence)>>>1;
to
int mid = fence>20? 20: (current+fence)>>>1;
so
StreamSupport.stream(new DebugSpliterator(), true)
.limit(20)
.forEach(x -> {});
results in
trySplit() [0, 20, 1000000]
trySplit() [0, 10, 20]
trySplit() [10, 15, 20]
trySplit() [10, 12, 15]
trySplit() [12, 13, 15]
trySplit() [0, 5, 10]
trySplit() [15, 17, 20]
trySplit() [5, 7, 10]
trySplit() [0, 2, 5]
trySplit() [17, 18, 20]
trySplit() [2, 3, 5]
trySplit() [5, 6, 7]
trySplit() [15, 16, 17]
trySplit() [6, 6, 7]
trySplit() [16, 16, 17]
trySplit() [0, 1, 2]
trySplit() [7, 8, 10]
trySplit() [8, 9, 10]
trySplit() [1, 1, 2]
trySplit() [3, 4, 5]
trySplit() [9, 9, 10]
trySplit() [18, 19, 20]
trySplit() [10, 11, 12]
trySplit() [13, 14, 15]
trySplit() [11, 11, 12]
trySplit() [4, 4, 5]
trySplit() [14, 14, 15]
But that wouldn’t be a general purpose spliterator; rather, it is one that performs poorly whenever the limit is not twenty.
If we can incorporate the limit into the spliterator or, more generally, into the stream source, we don’t have that problem. So instead of list.stream().limit(x), you may call list.subList(0, Math.min(x, list.size())).stream(); instead of random.ints().limit(x), use random.ints(x); and instead of Stream.generate(generator).limit(x), you may use LongStream.range(0, x).mapToObj(index -> generator.get()) or the factory method of this answer.
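A small sketch contrasting the last two variants; the counter is just a stand-in for a side-effecting generator:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class SizedSourceDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Supplier<String> generator = () -> "value#" + calls.incrementAndGet();

        // unsized source: a parallel limit() may invoke the generator more than 100 times
        Stream.generate(generator).parallel().limit(100).forEach(s -> {});
        System.out.println("generate().limit(100): " + calls.getAndSet(0) + " calls");

        // sized source: exactly 100 invocations, and the range splits evenly
        LongStream.range(0, 100).parallel()
                  .mapToObj(index -> generator.get())
                  .forEach(s -> {});
        System.out.println("range(0, 100): " + calls.get() + " calls");
    }
}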
For an arbitrary stream source or spliterator, applying limit can be quite expensive for parallel streams, which is even documented. And having side effects in tryAdvance is a bad idea in the first place.
Upvotes: 5
Reputation: 121088
I don't think that is a bug in any way, but it is still a very interesting idea that tryAdvance can have side-effects. As far as I understand, that is entirely possible when your trySplit does not split down to single-element batches.
For example, suppose you have an array and you want to split it (via trySplit) into sub-arrays of no fewer than 4 elements each. Once you can't split anymore (you've reached the minimum of 4 elements in the current Spliterator, for example) and processing starts, forEachRemaining will be called; in turn, it defaults to calling tryAdvance for each element in the current Spliterator, as seen in the default implementation:
default void forEachRemaining(Consumer<? super T> action) {
do { } while (tryAdvance(action));
}
Obviously, since you are doing work in parallel, once a thread has started its work (read: executing its forEachRemaining), it can't be stopped anymore, so many more elements than the limit can hit tryAdvance.
As such, I really don't think there is a way to do this other than integrating the limit into the Spliterator itself; I think this should work:
// requires java.util.Objects, java.util.Spliterator,
// java.util.function.Consumer and java.util.function.Supplier
static class LimitingSpliterator<T> implements Spliterator<T> {

    private int limit;
    private final Supplier<T> generator;

    private LimitingSpliterator(Supplier<T> generator, int limit) {
        this.limit = limit;
        this.generator = generator;
    }

    static <T> LimitingSpliterator<T> of(Supplier<T> supplier, int limit) {
        return new LimitingSpliterator<>(supplier, limit);
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> consumer) {
        Objects.requireNonNull(consumer);
        if (limit > 0) {
            --limit;
            consumer.accept(generator.get()); // invoke the generator exactly once per element
            return true;
        }
        return false;
    }

    @Override
    public void forEachRemaining(final Consumer<? super T> consumer) {
        while (limit > 0) {
            consumer.accept(generator.get());
            --limit;
        }
    }

    @Override
    public LimitingSpliterator<T> trySplit() {
        if (limit <= 1) {
            return null; // too few elements left to be worth splitting
        }
        int half = limit >> 1; // hand half of the remaining budget to the new spliterator
        limit -= half;
        return new LimitingSpliterator<>(generator, half);
    }

    @Override
    public long estimateSize() {
        return limit; // the remaining element count is known exactly
    }

    @Override
    public int characteristics() {
        return SIZED | SUBSIZED; // sizes are exact, both before and after splitting
    }
}
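A minimal usage sketch, with a side-effect counter standing in for the generator:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.StreamSupport;

public class LimitingSpliteratorDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        StreamSupport.stream(LimitingSpliterator.of(calls::incrementAndGet, 20), true)
                     .forEach(x -> {});
        System.out.println(calls.get()); // exactly 20, even on a parallel stream
    }
}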
Upvotes: 2