Azbesciak

Reputation: 580

Generate infinite parallel stream

Problem

Hi, I have a function that returns an infinite stream of results generated in parallel (yes, parallel is much faster in this case). So obviously (or not) I used

Stream<Something> stream = Stream.generate(this::myGenerator).parallel()

It works, however... it doesn't when I want to limit the result (everything is fine when the stream is sequential). I mean, it keeps creating results when I do something like

stream.peek(System.out::println).limit(2).collect(Collectors.toList())

but even when the peek output produces more than 10 elements, collect still hasn't finished (generating is slow, so those 10 can take up to a minute)... and that is the easy example. Actually, limiting those results is a future feature; the main expectation is to keep getting results better than the most recent one until the user kills the process (the other case is to return the first result I can produce, throwing an exception if nothing else helps [findFirst didn't help, even when I had more elements on the console and no more results for about 30 seconds]).

So, the question is...

how to cope with that? My idea was also to use RxJava, and there is another question: how to achieve a similar result with that tool (or another one).

Code sample

public Stream<Solution> generateSolutions() {
    final Solution initialSolution = initialSolutionMaker.findSolution();
    return Stream.concat(
        Stream.of(initialSolution),
        Stream.generate(continuousSolutionMaker::findSolution)
    ).parallel();
}

new Solver(instance).generateSolutions()
    .map(Solution::getPurpose)
    .peek(System.out::println)
    .limit(5).collect(Collectors.toList());

The implementation of findSolution is not important. It has some side effects, like adding to the solutions repo (a synchronized singleton, etc.), but nothing more.

Upvotes: 2

Views: 1662

Answers (2)

Holger

Reputation: 298123

As explained in the already linked answer, the key point to an efficient parallel stream is to use a stream source that already has an intrinsic size, instead of using an unsized or even infinite stream and applying a limit to it. Injecting a size doesn't work with the current implementation at all, while ensuring that a known size doesn't get lost is much easier. Even if the exact size can't be retained, like when applying a filter, the size will still be carried along as an estimated size.

So instead of

Stream.generate(this::myGenerator).parallel()
      .peek(System.out::println)
      .limit(2)
      .collect(Collectors.toList())

just use

IntStream.range(0, /* limit */ 2).unordered().parallel()
         .mapToObj(unused -> this.myGenerator())
         .peek(System.out::println)
         .collect(Collectors.toList())

Or, closer to your sample code:

public Stream<Solution> generateSolutions(int limit) {
    final Solution initialSolution = initialSolutionMaker.findSolution();
    return Stream.concat(
        Stream.of(initialSolution),
        IntStream.range(1, limit).unordered().parallel()
                 .mapToObj(unused -> continuousSolutionMaker.findSolution())
    );
}

new Solver(instance).generateSolutions(5)
    .map(Solution::getPurpose)
    .peek(System.out::println)
    .collect(Collectors.toList());
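A quick way to see why the range-based source behaves so differently is to inspect the spliterator characteristics of the two sources (a minimal check, not part of the original answer):

```java
import java.util.Spliterator;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class SizedCheck {
    public static void main(String[] args) {
        // A range knows its size up front, so parallel tasks can be split
        // into exact chunks and no limit-related synchronization is needed.
        boolean rangeSized = IntStream.range(0, 2).spliterator()
                .hasCharacteristics(Spliterator.SIZED);
        // A generated stream has no intrinsic size at all.
        boolean generateSized = Stream.generate(() -> 1).spliterator()
                .hasCharacteristics(Spliterator.SIZED);
        System.out.println(rangeSized);    // true
        System.out.println(generateSized); // false
    }
}
```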

Upvotes: 6

Eugene

Reputation: 120848

Unfortunately this is expected behavior. As I remember, I've seen at least two topics on this matter; here is one of them.

The idea is that Stream.generate creates an unordered infinite stream, and limit will not introduce the SIZED flag. Because of this, when you spawn parallel execution on that stream, individual tasks have to sync their execution to see if they have reached that limit; by the time that sync happens, multiple elements may already have been processed. For example, this:

Stream.iterate(0, x -> x + 1)
      .peek(System.out::println)
      .parallel()
      .limit(2)
      .collect(Collectors.toList());

and this:

IntStream.of(1, 2, 3, 4)
         .peek(System.out::println)
         .parallel()
         .limit(2)
         .boxed()
         .collect(Collectors.toList());

will always generate two elements in the List (Collectors.toList) and will always output two elements also (via peek).

On the other hand this:

Stream<Integer> stream = Stream.generate(new Random()::nextInt).parallel();

List<Integer> list = stream
            .peek(x -> {
                System.out.println("Before " + x);
            })
            .map(x -> {
                System.out.println("Mapping x " + x);
                return x;
            })
            .peek(x -> {
                System.out.println("After " + x);
            })
            .limit(2)
            .collect(Collectors.toList());

will generate two elements in the List, but it may process many more elements that will later be discarded by the limit. This is what you are actually seeing in your example.
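That over-processing can be made visible by counting invocations of the generator (a small illustration, not from the original answer; the exact count varies from run to run):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OverGeneration {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        List<Integer> list = Stream.generate(calls::incrementAndGet)
                .parallel()
                .limit(2)
                .collect(Collectors.toList());
        // The collected list always honors the limit...
        System.out.println("collected: " + list.size()); // 2
        // ...but the generator was typically invoked more often than that.
        System.out.println("generated: " + calls.get());
    }
}
```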

The only sane way of doing that (as far as I can tell) would be to create a custom Spliterator. I have not written many of them, but here is my attempt:

// requires: java.util.Spliterator, java.util.Objects,
//           java.util.function.Consumer, java.util.function.Supplier
static class LimitingSpliterator<T> implements Spliterator<T> {

    private int limit;

    private final Supplier<T> generator;

    private LimitingSpliterator(Supplier<T> generator, int limit) {
        // plain check instead of Guava's Preconditions.checkArgument
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
        this.generator = Objects.requireNonNull(generator);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> consumer) {
        if (limit == 0) {
            return false;
        }
        T nextElement = generator.get();
        --limit;
        consumer.accept(nextElement);
        return true;
    }

    @Override
    public LimitingSpliterator<T> trySplit() {
        if (limit <= 1) {
            return null;
        }
        int half = limit >> 1;
        limit = limit - half;
        return new LimitingSpliterator<>(generator, half);
    }

    @Override
    public long estimateSize() {
        // SIZED promises an exact count, so report the remaining limit.
        return limit;
    }

    @Override
    public int characteristics() {
        return SIZED;
    }
}

And the usage would be:

StreamSupport.stream(new LimitingSpliterator<>(new Random()::nextInt, 7), true)
             .peek(System.out::println)
             .collect(Collectors.toList());
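For completeness, here is a self-contained, runnable sketch of the same spliterator (Guava's Preconditions swapped for a plain check) together with a quick check that exactly the requested number of elements is produced:

```java
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class LimitingSpliteratorDemo {

    static class LimitingSpliterator<T> implements Spliterator<T> {
        private int limit;
        private final Supplier<T> generator;

        LimitingSpliterator(Supplier<T> generator, int limit) {
            if (limit <= 0) {
                throw new IllegalArgumentException("limit must be positive");
            }
            this.limit = limit;
            this.generator = Objects.requireNonNull(generator);
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> consumer) {
            if (limit == 0) {
                return false;
            }
            T nextElement = generator.get();
            --limit;
            consumer.accept(nextElement);
            return true;
        }

        @Override
        public LimitingSpliterator<T> trySplit() {
            if (limit <= 1) {
                return null;
            }
            // Hand half of the remaining budget to the new spliterator;
            // the two budgets always sum to the original limit.
            int half = limit >> 1;
            limit = limit - half;
            return new LimitingSpliterator<>(generator, half);
        }

        @Override
        public long estimateSize() {
            // SIZED promises an exact count, so report the remaining limit.
            return limit;
        }

        @Override
        public int characteristics() {
            return SIZED; // splits are exactly sized, so SUBSIZED would also be valid
        }
    }

    public static void main(String[] args) {
        List<Integer> list = StreamSupport
                .stream(new LimitingSpliterator<>(new Random()::nextInt, 7), true)
                .collect(Collectors.toList());
        System.out.println(list.size()); // 7
    }
}
```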

Upvotes: 3
