1) How can I use a Supplier (supplier) to create a sized stream of N values in parallel, while ensuring that no more than N calls are made to the supplier? I need this because my supplier has a costly supplier.get() operation.
2) The 'obvious' answer to my question, Stream.generate(supplier).limit(N), does not work and often results in more than N calls being made to the supplier. Why is this?
As 'proof' that Stream.generate(supplier).limit(N) results in more than N calls to supplier.get(), consider the following code:
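import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import java.util.stream.IntStream;

public class MWE {
    static final int N_ELEMENTS = 100000;

    // Every call to mySupplier.get() returns a fresh counter: 0, 1, 2, ...
    static Supplier<IntSupplier> mySupplier = () -> new IntSupplier() {
        AtomicInteger ai = new AtomicInteger(-1);

        @Override
        public int getAsInt() {
            return ai.incrementAndGet();
        }
    };

    public static void main(String[] args) {
        int[] a = IntStream.generate(mySupplier.get()).limit(N_ELEMENTS).toArray();
        int[] b = IntStream.generate(mySupplier.get()).parallel().limit(N_ELEMENTS).toArray();
        // Any value >= N_ELEMENTS means the counter was called more than N_ELEMENTS times.
        System.out.println("max(a) = " + Arrays.stream(a).max().getAsInt());
        System.out.println("max(b) = " + Arrays.stream(b).max().getAsInt());
    }
}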
a is equal to [0, 1, ..., N_ELEMENTS-1] as expected, but, contrary to what you might expect, b does not contain the same elements as a. Instead, b often contains elements that are greater than or equal to N_ELEMENTS, which indicates that more than N_ELEMENTS calls were made to the supplier.
Another illustration: Stream.generate(new Random(0)::nextDouble).parallel().limit(5) does not always generate the same set of numbers.
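A minimal sketch that makes the extra calls visible, assuming Java 8 or later: it counts generator invocations with an AtomicInteger. The class and method names (CountCalls, callsWithGenerate) are hypothetical, and the exact count varies with the JDK version and the number of cores, so the only safe expectation is that it is at least n.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class CountCalls {
    // Hypothetical helper: how many times does the generator run while
    // IntStream.generate(...).parallel().limit(n) produces n elements?
    static int callsWithGenerate(int n) {
        AtomicInteger calls = new AtomicInteger();
        int[] result = IntStream.generate(calls::getAndIncrement)
                .parallel()
                .limit(n)
                .toArray();
        // limit() still delivers exactly n elements...
        if (result.length != n) {
            throw new IllegalStateException("expected " + n + " elements");
        }
        // ...but surplus values may have been generated and then discarded.
        return calls.get();
    }

    public static void main(String[] args) {
        int n = 100_000;
        System.out.println("requested " + n + ", generator called "
                + callsWithGenerate(n) + " times");
    }
}
```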
Calling .limit() is not guaranteed to result in a stream of the first N elements generated by the supplier, because Stream.generate() creates an unordered stream, which leaves limit() free to decide which 'part' of the stream to keep. In fact, it is not even semantically sound to refer to "the first N elements" or "(the first) part of the stream", because the stream is unordered. This behavior is clearly laid out in the API documentation; many thanks to everyone who pointed this out to me! As for the extra calls: in a parallel pipeline several threads pull elements from the supplier concurrently, and limit() can only discard the surplus after it has been produced, so the supplier may be called more than N times.
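The ordered/unordered distinction can be checked directly on the source spliterators. This small sketch (the class name OrderCheck is hypothetical) asks each source whether it reports Spliterator.ORDERED; calling spliterator() on a freshly created stream returns its source spliterator.

```java
import java.util.Spliterator;
import java.util.stream.IntStream;

public class OrderCheck {
    // Does the source behind IntStream.generate() report an encounter order?
    static boolean generateIsOrdered() {
        return IntStream.generate(() -> 42).spliterator()
                .hasCharacteristics(Spliterator.ORDERED);
    }

    // Does the source behind IntStream.range() report an encounter order?
    static boolean rangeIsOrdered() {
        return IntStream.range(0, 10).spliterator()
                .hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        System.out.println("generate() ORDERED: " + generateIsOrdered()); // false
        System.out.println("range()    ORDERED: " + rangeIsOrdered());    // true
    }
}
```

Because the generate() source carries no ORDERED flag, a parallel limit() is allowed to keep whichever N elements arrive first.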
Since asking this question, I have come up with two solutions to my own question. My thanks go to Tagir, who set me off in the right direction.
Solution 1: IntStream.range()
A simple and fairly efficient way of creating a sized, parallel stream backed by a supplier, one that makes no more calls to the supplier than absolutely necessary, is to (mis)use IntStream.range() like this:
IntStream.range(0, N_ELEMENTS).parallel().mapToObj(i -> supplier.get())
Basically, we are using IntStream.range() only to create a sized stream that can be processed in parallel.
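Solution 1 can be wrapped in a reusable helper. The sketch below uses a hypothetical sizedParallel method and counts supplier calls to confirm there is exactly one call per element. It collects the results instead of calling count(), because since Java 9 count() may skip executing the pipeline (and therefore the supplier) when the size is known from a SIZED source.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class RangeBacked {
    // Hypothetical helper: a sized, parallel stream of n values from supplier.
    // The ints produced by range() are ignored; they only fix the size.
    static <T> Stream<T> sizedParallel(Supplier<T> supplier, int n) {
        return IntStream.range(0, n).parallel().mapToObj(i -> supplier.get());
    }

    // Returns the number of supplier calls needed to collect n elements.
    static int callsFor(int n) {
        AtomicInteger calls = new AtomicInteger();
        // AtomicInteger keeps the supplier thread-safe under parallel use.
        Supplier<Integer> supplier = calls::incrementAndGet;
        List<Integer> values = sizedParallel(supplier, n).collect(Collectors.toList());
        if (values.size() != n) {
            throw new IllegalStateException("expected " + n + " elements");
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsFor(100_000)); // 100000
    }
}
```

Note that the supplier is called from several worker threads at once, so it must be thread-safe.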
Solution 2: a custom Spliterator
Because we never actually use the integers inside the stream created by IntStream.range(), it seems like we can do slightly better by creating a custom Spliterator:
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class SizedSuppliedSpliterator<T> implements Spliterator<T> {
    private int remaining;
    // Shared by all split-off halves, so the supplier must be thread-safe.
    private final Supplier<T> supplier;

    private SizedSuppliedSpliterator(Supplier<T> supplier, int remaining) {
        this.remaining = remaining;
        this.supplier = supplier;
    }

    static <T> SizedSuppliedSpliterator<T> of(Supplier<T> supplier, int limit) {
        return new SizedSuppliedSpliterator<>(supplier, limit);
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> consumer) {
        Objects.requireNonNull(consumer);
        if (remaining > 0) {
            remaining--;
            consumer.accept(supplier.get());
            return true;
        }
        return false;
    }

    @Override
    public void forEachRemaining(final Consumer<? super T> consumer) {
        Objects.requireNonNull(consumer);
        while (remaining > 0) {
            remaining--;
            consumer.accept(supplier.get());
        }
    }

    @Override
    public SizedSuppliedSpliterator<T> trySplit() {
        // Refuse to split tiny ranges instead of handing out empty halves.
        if (remaining < 2) {
            return null;
        }
        int split = remaining / 2;
        remaining -= split;
        return new SizedSuppliedSpliterator<>(supplier, split);
    }

    @Override
    public long estimateSize() {
        return remaining;
    }

    @Override
    public int characteristics() {
        // No ORDERED flag: the supplied values have no meaningful encounter order.
        return SIZED | SUBSIZED | IMMUTABLE;
    }
}
We can use this spliterator to create the stream as follows:
StreamSupport.stream(SizedSuppliedSpliterator.of(supplier, N_ELEMENTS), true)
Of course, computing a couple of integers is hardly expensive, and I have not been able to notice or even measure any improvement in performance over solution 1.
The Stream API does not guarantee that IntStream.generate() will call the generator the specified number of times. Also, this call does not respect ordering.
If you actually need a parallel stream of increasing numbers, it's much better to use IntStream.range(0, N_ELEMENTS).parallel(). This not only ensures that you will actually have all the numbers from 0 to N_ELEMENTS-1, but also greatly reduces contention and guarantees order. If you need to generate something more complex, consider using a custom source by defining your own Spliterator class.
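A quick sketch of the range-based alternative (the class name OrderedRange is hypothetical): because IntStream.range() is a SIZED, ORDERED source, toArray() returns the numbers in encounter order even though the work is split across threads, and no shared counter is involved.

```java
import java.util.stream.IntStream;

public class OrderedRange {
    // Each worker handles a disjoint sub-range; toArray() reassembles the
    // pieces in encounter order, so the result is always 0, 1, ..., n-1.
    static int[] firstN(int n) {
        return IntStream.range(0, n).parallel().toArray();
    }

    public static void main(String[] args) {
        int[] a = firstN(100_000);
        System.out.println(a[0] + " .. " + a[a.length - 1]); // 0 .. 99999
    }
}
```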
Note that the proposed IntStream.iterate solution may not parallelize well, as it is a sequential-by-nature source.
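For comparison, the IntStream.iterate approach mentioned in the note above does yield the first n numbers even in parallel, because iterate() produces an ordered stream. The sketch below (hypothetical class name IterateLimit) checks only correctness; it says nothing about speed, which is where the sequential nature of the source hurts.

```java
import java.util.stream.IntStream;

public class IterateLimit {
    // iterate() is ORDERED, so limit(n) keeps exactly the first n values even
    // in parallel. Each value depends on its predecessor, though, so the
    // source is still produced one element at a time.
    static int[] firstN(int n) {
        return IntStream.iterate(0, i -> i + 1).parallel().limit(n).toArray();
    }

    public static void main(String[] args) {
        int[] a = firstN(100_000);
        System.out.println(a[0] + " .. " + a[a.length - 1]); // 0 .. 99999
    }
}
```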