Fan

Reputation: 315

Collect results from parallel stream

I have a piece of code like this:

List<Egg> eggs = hens.parallelStream().map(hen -> {
    // Named henEggs so it does not clash with the outer eggs variable
    ArrayList<Egg> henEggs = new ArrayList<>();
    while (hen.hasEgg()) {
        henEggs.add(hen.getEgg());
    }
    return henEggs;
}).flatMap(Collection::stream).collect(Collectors.toList());

But this way I have to create an ArrayList for every hen, and eggs are not collected until a hen is 100% processed. I would like something like this:

List<Egg> eggs = hens.parallelStream().map(hen -> {
    while (hen.hasEgg()) {
        yield return hen.getEgg();
    }
}).collect(Collectors.toList());

But Java does not have yield return. Is there a way to implement it?

Upvotes: 4

Views: 6792

Answers (3)

Holger

Reputation: 298123

The iteration logic using hasEgg() and getEgg() is stateful, as these methods’ results depend on the previous invocations. Therefore, processing a single Hen can’t be parallelized unless you manage to change the interface completely.

That said, your worry about the ArrayList is unnecessary. When the stream implementation executes the collect operation in parallel, it has to buffer the values for each thread anyway and combine these buffers afterwards. It might even be the case that the operation doesn’t benefit from parallel execution at all.
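
The per-thread buffering described above can be made visible with a small sketch. It is not taken from the stream implementation; it only passes a counting supplier (the `containers` counter is a name made up for this illustration) to the three-argument collect and reports how many intermediate lists were requested. The count depends on the machine and on how the stream is split, so no particular number should be expected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class CollectBuffers {
    // Illustrative counter: how many intermediate containers the stream asked for.
    static final AtomicInteger containers = new AtomicInteger();

    static List<Integer> collectInParallel(int n) {
        return IntStream.range(0, n).boxed().parallel().collect(
                // supplier: called once per partial result
                () -> { containers.incrementAndGet(); return new ArrayList<Integer>(); },
                // accumulator: adds one element to a partial list
                ArrayList::add,
                // combiner: merges two partial lists
                ArrayList::addAll);
    }

    public static void main(String[] args) {
        List<Integer> result = collectInParallel(1_000);
        System.out.println(result.size() + " elements collected via "
                + containers.get() + " intermediate lists");
    }
}
```

In a parallel run the supplier is typically called more than once, which is why one extra ArrayList per hen is unlikely to be the dominant cost.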

What you can do is replace the ArrayList with a Stream.Builder, as it’s optimized for the use case of only adding elements until the Stream is constructed:

List<Egg> eggs = hens.parallelStream().flatMap(hen -> {
    Stream.Builder<Egg> eggStream = Stream.builder();
    while(hen.hasEgg()) {
        eggStream.add(hen.getEgg());
    }
    return eggStream.build();
}).collect(Collectors.toList());
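
On Java 16 or later, Stream.mapMulti may come closer to the `yield return` shape from the question, because each egg is pushed to a consumer instead of being gathered in a user-created container first. The following is a minimal sketch under assumptions: the `Egg` and `Hen` classes are hypothetical stand-ins that only mimic the hasEgg()/getEgg() interface, and whether this performs better than the builder version is not something the sketch demonstrates.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.stream.Collectors;

public class MapMultiEggs {
    // Hypothetical stand-in for the question's Egg type.
    static final class Egg {
        final String label;
        Egg(String label) { this.label = label; }
    }

    // Hypothetical stand-in for the question's stateful Hen.
    static final class Hen {
        private final Deque<Egg> remaining;
        Hen(Egg... eggs) { this.remaining = new ArrayDeque<>(Arrays.asList(eggs)); }
        boolean hasEgg() { return !remaining.isEmpty(); }
        Egg getEgg() { return remaining.poll(); }
    }

    static List<Egg> collectEggs(List<Hen> hens) {
        return hens.parallelStream()
                // Each hen is still drained sequentially; sink.accept plays the role of "yield return".
                .<Egg>mapMulti((hen, sink) -> {
                    while (hen.hasEgg()) {
                        sink.accept(hen.getEgg());
                    }
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Hen> hens = Arrays.asList(
                new Hen(new Egg("a1"), new Egg("a2")),
                new Hen(),
                new Hen(new Egg("c1")));
        for (Egg egg : collectEggs(hens)) {
            System.out.println(egg.label);
        }
    }
}
```

The explicit `<Egg>` type witness is needed because the compiler cannot infer the result type from the lambda alone.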

Upvotes: 2

pens-fan-69

Reputation: 1039

Assuming the existence of a getEggs() method, you can use the following to collect all of the eggs.

List<Egg> eggs = hens.parallelStream()
    .filter(Hen::hasEggs)
    .map(Hen::getEggs)
    .collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);

The code assumes that getEggs() returns a Collection. You could eliminate the filter(Hen::hasEggs) if getEggs() returns an empty Collection when the Hen has no Eggs.
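
To try the pipeline above in isolation, here is a self-contained sketch. The `Egg` and `Hen` classes are hypothetical stand-ins written for this example; in particular, hasEggs() and getEggs() are the assumed methods from this answer, not part of the Hen interface shown in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class GetEggsSketch {
    // Hypothetical stand-in for the question's Egg type.
    static final class Egg {
        final String label;
        Egg(String label) { this.label = label; }
    }

    // Hypothetical Hen that exposes all of its eggs at once.
    static final class Hen {
        private final List<Egg> eggs;
        Hen(Egg... eggs) { this.eggs = Arrays.asList(eggs); }
        boolean hasEggs() { return !eggs.isEmpty(); }
        Collection<Egg> getEggs() { return eggs; }
    }

    static List<Egg> collectEggs(List<Hen> hens) {
        return hens.parallelStream()
                .filter(Hen::hasEggs)
                .map(Hen::getEggs)
                // supplier, accumulator (adds one hen's eggs), combiner (merges partial lists)
                .collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);
    }

    public static void main(String[] args) {
        List<Hen> hens = Arrays.asList(
                new Hen(new Egg("a1"), new Egg("a2")),
                new Hen(),
                new Hen(new Egg("c1")));
        for (Egg egg : collectEggs(hens)) {
            System.out.println(egg.label);
        }
    }
}
```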

Upvotes: 1

Tagir Valeev

Reputation: 100169

Your Hen class is poorly adapted to the Stream API. Provided that you cannot change it and it has no other useful methods (like Collection<Egg> getAllEggs() or Iterator<Egg> eggIterator()), you can create an egg stream like this:

public static Stream<Egg> eggs(Hen hen) {
    Iterator<Egg> it = new Iterator<Egg>() {
        @Override
        public boolean hasNext() {
            return hen.hasEgg();
        }

        @Override
        public Egg next() {
            return hen.getEgg();
        }
    };
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false);
}

Now you can use it in the following manner:

List<Egg> eggs = hens.parallelStream()
                     .flatMap(hen -> eggs(hen))
                     .collect(Collectors.toList());

Of course, a better Stream implementation might be possible if you can change the Hen class.
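
As one illustration of what such a change could look like, the sketch below assumes a redesigned Hen that keeps its eggs in a list and exposes them through a hypothetical `eggs()` method. The classes are invented for this example and say nothing about how the real Hen stores its eggs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamFriendlyHen {
    // Hypothetical stand-in for the question's Egg type.
    static final class Egg {
        final String label;
        Egg(String label) { this.label = label; }
    }

    // Hypothetical redesign: the hen hands out its eggs as a stream directly.
    static final class Hen {
        private final List<Egg> eggs;
        Hen(Egg... eggs) { this.eggs = new ArrayList<>(Arrays.asList(eggs)); }
        Stream<Egg> eggs() { return eggs.stream(); }
    }

    static List<Egg> collectEggs(List<Hen> hens) {
        return hens.parallelStream()
                .flatMap(Hen::eggs)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Hen> hens = Arrays.asList(
                new Hen(new Egg("a1"), new Egg("a2")),
                new Hen(),
                new Hen(new Egg("c1")));
        for (Egg egg : collectEggs(hens)) {
            System.out.println(egg.label);
        }
    }
}
```

With this shape no hand-written Iterator is needed, and the stream comes from a collection that knows its own size rather than from spliteratorUnknownSize.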

Upvotes: 8
