Semafoor

Reputation: 2032

Limiting infinite parallel stream

1) How can I use a Supplier (supplier) to create a sized stream of N values in parallel, while ensuring that no more than N calls are made to the supplier? I need this because I have a supplier with a costly supplier.get() operation.

2) The 'obvious' answer to my question, Stream.generate(supplier).limit(N), does not work for a parallel stream and often results in more than N calls being made to the supplier. Why is this?

As 'proof' that Stream.generate(supplier).limit(N) can result in more than N calls to supplier.get(), consider the following code:

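import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import java.util.stream.IntStream;

public class MWE {
    static final int N_ELEMENTS=100000;
    // Each call to mySupplier.get() returns a fresh counter that yields 0, 1, 2, ...
    static Supplier<IntSupplier> mySupplier = () -> new IntSupplier() {
        AtomicInteger ai = new AtomicInteger(-1);
        @Override
        public int getAsInt() {
            return ai.incrementAndGet();
        }
    };
    public static void main(String[] args) {
        // Sequential: a == [0, 1, ..., N_ELEMENTS-1]
        int[] a = IntStream.generate(mySupplier.get()).limit(N_ELEMENTS).toArray();
        // Parallel: b may contain values >= N_ELEMENTS
        int[] b = IntStream.generate(mySupplier.get()).parallel().limit(N_ELEMENTS).toArray();
        System.out.println("max of a: " + IntStream.of(a).max().getAsInt());
        System.out.println("max of b: " + IntStream.of(b).max().getAsInt());
    }
}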

a is equal to [0, 1, ..., N_ELEMENTS-1], as expected, but, contrary to what you might expect, b does not contain the same elements as a. Instead, b often contains elements greater than or equal to N_ELEMENTS, which indicates that the supplier was called more than N_ELEMENTS times.

Another illustration is that Stream.generate(new Random(0)::nextDouble).parallel().limit(5) does not always produce the same set of numbers, even though the Random is seeded.
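To make the number of calls visible directly, the counter can be read back after the terminal operation. The following is a minimal sketch (CountSupplierCalls and countCalls are names made up for illustration) that counts generator invocations for a sequential and a parallel generate(...).limit(n) pipeline. The parallel count varies from run to run and may differ between JDK versions.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class CountSupplierCalls {

    // Returns how many times the generator was invoked to produce n elements.
    static int countCalls(int n, boolean parallel) {
        AtomicInteger calls = new AtomicInteger();
        IntStream s = IntStream.generate(calls::getAndIncrement);
        if (parallel) {
            s = s.parallel();
        }
        int[] result = s.limit(n).toArray();
        // limit() always yields exactly n elements; only the call count differs.
        if (result.length != n) {
            throw new IllegalStateException("expected " + n + " elements, got " + result.length);
        }
        return calls.get();
    }

    public static void main(String[] args) {
        int n = 100_000;
        System.out.println("sequential calls: " + countCalls(n, false));
        // In parallel, worker threads may call the generator more than n times;
        // the surplus values are discarded by limit().
        System.out.println("parallel calls:   " + countCalls(n, true));
    }
}
```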

Upvotes: 1

Views: 1250

Answers (2)

Semafoor

Reputation: 2032

Calling .limit() is not guaranteed to result in a stream of the first N elements generated by the supplier, because Stream.generate() creates an unordered stream, which leaves limit() free to decide which 'part' of the stream to keep. In fact, it is not even semantically sound to refer to "the first N elements" or "(the first) part of the stream", because the stream is unordered. In a parallel pipeline, several threads pull elements from the infinite source concurrently, so by the time limit() has collected N elements, other threads may already have made additional calls to the supplier, whose results are then discarded. This behavior is clearly laid out in the API documentation; many thanks to everyone who pointed this out to me!

Since asking this question, I have come up with two solutions to my own question. My thanks go to Tagir who set me off in the right direction.

Solution 1: Misusing IntStream.range()

A simple and fairly efficient way of creating a sized, parallel stream backed by a supplier, one that calls the supplier exactly once per element and never more, is to (mis)use IntStream.range() like this:

IntStream.range(0, N_ELEMENTS).parallel().mapToObj(i -> supplier.get())

Basically, we are using IntStream.range() only to create a sized stream that can be processed in parallel.
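A minimal runnable sketch of Solution 1, with a counter standing in for the costly supplier (RangeBackedGenerate and generateSized are hypothetical names, and the supplier is assumed to be thread-safe because it is called from several threads at once):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RangeBackedGenerate {

    // IntStream.range only provides a sized, splittable source;
    // the index itself is ignored and each element costs one supplier call.
    static <T> List<T> generateSized(Supplier<T> supplier, int n) {
        return IntStream.range(0, n)
                .parallel()
                .mapToObj(i -> supplier.get())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Stand-in for an expensive, thread-safe supplier (hypothetical).
        Supplier<Integer> costly = calls::incrementAndGet;
        List<Integer> values = generateSized(costly, 100_000);
        System.out.println(values.size() + " values, " + calls.get() + " supplier calls");
    }
}
```

Because the range is SIZED and there is no short-circuiting operation, the number of supplier calls equals the number of elements.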

Solution 2: Custom spliterator

Because we never actually use the integers in the stream created by IntStream.range(), it seems we can do slightly better by writing a custom Spliterator:

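import java.util.Objects;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Spliterator that yields exactly `remaining` values from `supplier`.
// All splits share the same supplier, so it must be thread-safe when the
// resulting stream is processed in parallel.
final class SizedSuppliedSpliterator<T> implements Spliterator<T> {
    private int remaining;

    private final Supplier<T> supplier;

    private SizedSuppliedSpliterator(Supplier<T> supplier, int remaining) {
        this.remaining = remaining;
        this.supplier = supplier;
    }

    static <T> SizedSuppliedSpliterator<T> of(Supplier<T> supplier, int limit) {
        return new SizedSuppliedSpliterator<>(supplier, limit);
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> consumer) {
        Objects.requireNonNull(consumer);
        if (remaining > 0) {
            remaining--;
            final T supplied = supplier.get();
            consumer.accept(supplied);
            return true;
        }
        return false;
    }

    @Override
    public void forEachRemaining(final Consumer<? super T> consumer) {
        Objects.requireNonNull(consumer);
        while (remaining > 0) {
            remaining--;
            consumer.accept(supplier.get());
        }
    }

    @Override
    public SizedSuppliedSpliterator<T> trySplit() {
        // Returning null tells the framework this part cannot be split further
        if (remaining < 2) {
            return null;
        }
        // Hand half of the remaining calls over to the new spliterator
        int split = remaining / 2;
        remaining -= split;
        return new SizedSuppliedSpliterator<>(supplier, split);
    }

    @Override
    public long estimateSize() {
        return remaining;
    }

    @Override
    public int characteristics() {
        // Not ORDERED: the stream built on this spliterator is unordered
        return SIZED | SUBSIZED | IMMUTABLE;
    }
}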

We can use this spliterator to create the stream as follows:

StreamSupport.stream(SizedSuppliedSpliterator.of(supplier, N_ELEMENTS), true)

Of course, computing a couple of integers is hardly expensive, and I have not been able to notice or even measure any improvement in performance over solution 1.
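Since the MWE in the question works with an IntSupplier, the same idea can be specialized to primitives to avoid boxing. The sketch below is a hypothetical int variant (SizedIntSuppliedSpliterator and generateSized are names made up for this example, not JDK API); it relies on the real Spliterator.OfInt and StreamSupport.intStream, and assumes the supplier is thread-safe, like the AtomicInteger counter used here.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class SizedIntSupplierDemo {

    // Hypothetical int-specialized counterpart of SizedSuppliedSpliterator.
    static final class SizedIntSuppliedSpliterator implements Spliterator.OfInt {
        private final IntSupplier supplier; // shared by all splits: must be thread-safe
        private long remaining;

        SizedIntSuppliedSpliterator(IntSupplier supplier, long remaining) {
            this.supplier = supplier;
            this.remaining = remaining;
        }

        @Override
        public boolean tryAdvance(IntConsumer action) {
            if (remaining <= 0) {
                return false;
            }
            remaining--;
            action.accept(supplier.getAsInt());
            return true;
        }

        @Override
        public void forEachRemaining(IntConsumer action) {
            while (remaining > 0) {
                remaining--;
                action.accept(supplier.getAsInt());
            }
        }

        @Override
        public Spliterator.OfInt trySplit() {
            if (remaining < 2) {
                return null; // too small to split
            }
            long half = remaining / 2;
            remaining -= half;
            return new SizedIntSuppliedSpliterator(supplier, half);
        }

        @Override
        public long estimateSize() {
            return remaining;
        }

        @Override
        public int characteristics() {
            return SIZED | SUBSIZED | IMMUTABLE;
        }
    }

    // Parallel IntStream of exactly n values, one supplier call per value.
    static IntStream generateSized(IntSupplier supplier, long n) {
        return StreamSupport.intStream(new SizedIntSuppliedSpliterator(supplier, n), true);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        int[] values = generateSized(calls::getAndIncrement, 100_000).toArray();
        System.out.println(values.length + " values, " + calls.get() + " supplier calls");
    }
}
```

Because the spliterator does not report ORDERED, the positions of the values in the array need not match the order in which the supplier produced them; only the count is guaranteed.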

Upvotes: 1

Tagir Valeev

Reputation: 100169

The Stream API does not guarantee that IntStream.generate() will call the generator the specified number of times. Also, this call does not respect ordering.

If you actually need a parallel stream of increasing numbers, it's much better to use IntStream.range(0, N_ELEMENTS).parallel(). This not only ensures that you will actually get all the numbers from 0 to N_ELEMENTS-1, but also greatly reduces contention and guarantees order. If you need to generate something more complex, consider using a custom source by defining your own Spliterator class.
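A minimal sketch of that suggestion (ParallelRangeDemo and parallelRange are illustrative names): the resulting array holds 0 .. n-1 in order, because range() is SIZED and ORDERED and toArray() respects encounter order even when the pipeline runs in parallel.

```java
import java.util.stream.IntStream;

public class ParallelRangeDemo {

    // Parallel stream of the numbers 0 .. n-1, collected in encounter order.
    static int[] parallelRange(int n) {
        return IntStream.range(0, n).parallel().toArray();
    }

    public static void main(String[] args) {
        int[] b = parallelRange(100_000);
        // Check that every element sits at its own index.
        boolean inOrder = IntStream.range(0, b.length).allMatch(i -> b[i] == i);
        System.out.println(b.length + " elements, in order: " + inOrder);
    }
}
```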

Note that the proposed IntStream.iterate solution may not parallelize well, as it is a sequential-by-nature source.
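The IntStream.iterate proposal mentioned here is not shown on this page; assuming it looked roughly like the sketch below (IterateLimitDemo and firstN are made-up names), it does return exactly 0 .. n-1, because iterate() produces an ORDERED stream and limit() on an ordered stream keeps the first n elements in encounter order. The drawback noted above still applies: each element depends on the previous one, so the source can only produce values one after another.

```java
import java.util.stream.IntStream;

public class IterateLimitDemo {

    // iterate() is ORDERED, so limit(n) keeps the first n elements
    // in encounter order, even for a parallel pipeline.
    static int[] firstN(int n) {
        return IntStream.iterate(0, i -> i + 1).parallel().limit(n).toArray();
    }

    public static void main(String[] args) {
        int[] b = firstN(100_000);
        boolean inOrder = IntStream.range(0, b.length).allMatch(i -> b[i] == i);
        System.out.println(b.length + " elements, in order: " + inOrder);
    }
}
```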

Upvotes: 4
