Reputation: 5642
I have a question about how Java Streams and chained CompletableFutures perform together.
If I run the following code, calling execute() with 10 items in the list takes ~11 seconds to complete (the number of items in the list plus 1). This is because I have two threads working in parallel: the first executes the digItUp operation, and once that's complete, the second executes the fillItBackIn operation while the first starts processing digItUp on the next item in the list.
If I comment out the .collect(Collectors.toList()) line, the execute() method takes ~20 seconds to complete. The threads do not operate in parallel; for each item in the list, the digItUp operation completes, and then the fillItBackIn operation completes, before the next item in the list is processed.
It's unclear to me why excluding .collect(Collectors.toList()) should change this behavior. Can someone explain?
The complete class:
package com.test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class SimpleExample {

    // One single-threaded pool per stage, so digging and filling can overlap.
    private final ExecutorService diggingThreadPool = Executors.newFixedThreadPool(1);
    private final ExecutorService fillingThreadPool = Executors.newFixedThreadPool(1);

    public SimpleExample() {
    }

    public static void main(String[] args) {
        List<Double> holesToDig = new ArrayList<>();
        Random random = new Random();
        for (int c = 0; c < 10; c++) {
            holesToDig.add(random.nextDouble(1000));
        }
        new SimpleExample().execute(holesToDig);
    }

    public void execute(List<Double> holeVolumes) {
        long start = System.currentTimeMillis();

        holeVolumes.stream()
                .map(volume -> {
                    // Stage 1 runs on the digging pool, stage 2 on the filling pool.
                    CompletableFuture<Double> digItUpCF = CompletableFuture.supplyAsync(() -> digItUp(volume), diggingThreadPool);
                    return digItUpCF.thenApplyAsync(volumeDugUp -> fillItBackIn(volumeDugUp), fillingThreadPool);
                })
                .collect(Collectors.toList()) // commenting this line out changes the timing
                .forEach(cf -> {
                    Double volume = cf.join();
                    System.out.println("Dug a hole and filled it back in. Net volume: " + volume);
                });

        System.out.println("Dug up and filled back in " + holeVolumes.size() + " holes in " + (System.currentTimeMillis() - start) + " ms");

        // Shut the pools down so their non-daemon threads do not keep the JVM alive.
        diggingThreadPool.shutdown();
        fillingThreadPool.shutdown();
    }

    public Double digItUp(Double volume) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        System.out.println("Dug hole with volume " + volume);
        return volume;
    }

    public Double fillItBackIn(Double volumeDugUp) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        System.out.println("Filled back in hole of volume " + volumeDugUp);
        return 0.0;
    }
}
Upvotes: 2
Views: 927
Reputation: 323
Love this question, and M A's answer is awesome! I had a similar use case where I was using RxJava. It worked very well, but my colleagues challenged me to implement it without that library. T.T
I tested your example and found a workaround that gets the same performance without collect. The trick is to run cf.join() on another thread:
.forEach(cf -> CompletableFuture.supplyAsync(cf::join, anotherThreadpool)
        // Another thread pool for the join; omit the argument to use the default ForkJoinPool.commonPool().
        .thenAccept(v -> System.out.println("Dug a hole and filled it back in. Net volume: " + v))
);
But I have to say, this can cause problems because there is no backpressure. If the upstream is infinite and fast but the consumer is slow, the CompletableFuture instances created in the map operator pile up in the queue of the first pool (diggingThreadPool), eventually causing an OutOfMemoryError, or a RejectedExecutionException if the pool has a bounded queue.
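One possible way to add a crude form of backpressure is to make the producer acquire a Semaphore permit before creating each future and release it when the future completes. This is only a sketch of that idea and is not part of the workaround above; the class name BoundedSubmitDemo, the parameter maxInFlight, and the 10 ms stage duration are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedSubmitDemo {

    // Hypothetical stand-in for digItUp / fillItBackIn, shortened to 10 ms.
    static int work(int value) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    /**
     * Submits count two-stage tasks while keeping at most maxInFlight
     * of them unfinished, and returns the peak number observed in flight.
     */
    static int run(int count, int maxInFlight) {
        ExecutorService stageX = Executors.newFixedThreadPool(1);
        ExecutorService stageY = Executors.newFixedThreadPool(1);
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < count; i++) {
                final int item = i;
                // Blocks the producer while maxInFlight tasks are still pending.
                permits.acquireUninterruptibly();
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                CompletableFuture.supplyAsync(() -> work(item), stageX)
                        .thenApplyAsync(BoundedSubmitDemo::work, stageY)
                        .whenComplete((result, error) -> {
                            inFlight.decrementAndGet();
                            permits.release();
                        });
            }
            // Taking every permit means every submitted task has completed.
            permits.acquireUninterruptibly(maxInFlight);
            permits.release(maxInFlight);
        } finally {
            stageX.shutdown();
            stageY.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak tasks in flight: " + run(20, 3));
    }
}
```

The trade-off is that the producing thread blocks whenever the limit is reached, which is exactly what bounds the queue. A reactive library handles this more gracefully.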
Upvotes: 1
Reputation: 72884
The reason is that collect(Collectors.toList()) is a terminal operation, so it triggers the stream pipeline (remember that streams are evaluated lazily). When you call collect, all of the CompletableFuture instances are constructed and placed in the list before forEach joins any of them. The result is a list of CompletableFutures, each of which is itself a chain of two stages; let's call them X and Y.
Every time the first executor's thread finishes an X stage, it is free to process the X stage of the next CompletableFuture while the other executor's thread processes stage Y of the previous one. This is the result that we intuitively expect.
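The overlap of X and Y stages can be observed at a smaller scale with a sketch like the following. It is not the asker's class: the name PipelineTimingDemo, the 100 ms stage duration, and the collectFirst flag are assumptions made for illustration. With 5 items one would expect the collected variant to take roughly (5 + 1) stage durations and the uncollected variant roughly 2 × 5, though exact timings depend on the machine.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipelineTimingDemo {

    // Hypothetical stage duration, scaled down from the question's 1000 ms.
    static final long STAGE_MILLIS = 100;

    static int slowStage(int value) {
        try {
            Thread.sleep(STAGE_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    /** Runs the two-stage pipeline over items and returns the elapsed milliseconds. */
    static long run(List<Integer> items, boolean collectFirst) {
        ExecutorService stageX = Executors.newFixedThreadPool(1);
        ExecutorService stageY = Executors.newFixedThreadPool(1);
        long start = System.nanoTime();
        try {
            // Nothing is submitted yet: map is lazy until a terminal operation runs.
            Stream<CompletableFuture<Integer>> futures = items.stream()
                    .map(i -> CompletableFuture.supplyAsync(() -> slowStage(i), stageX)
                            .thenApplyAsync(PipelineTimingDemo::slowStage, stageY));
            if (collectFirst) {
                // All futures are created (and queued) before the first join.
                futures.collect(Collectors.toList()).forEach(CompletableFuture::join);
            } else {
                // Each future is created and joined before the next one is created.
                futures.forEach(CompletableFuture::join);
            }
        } finally {
            stageX.shutdown();
            stageY.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5);
        System.out.println("collect first: " + run(items, true) + " ms");
        System.out.println("no collect:    " + run(items, false) + " ms");
    }
}
```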
On the other hand, when you don't call collect, forEach is the terminal operation. In this case every element in the stream is processed sequentially, one at a time through the whole pipeline (to confirm, try switching to parallelStream()), so stages X and Y are both executed for the first CompletableFuture, because forEach blocks in join(). Only when stage Y for the previous stream element has finished will forEach move on to the second element, and only then will a new CompletableFuture be mapped from the original Double value.
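The element-by-element ordering can be made visible without any threads at all. The sketch below records the order in which map and forEach run for a sequential stream, with and without the intermediate collect; the class and method names are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class LazyOrderDemo {

    /** forEach is the terminal operation: each element goes through map, then forEach. */
    static List<String> withoutCollect() {
        List<String> events = new ArrayList<>();
        List.of(1, 2, 3).stream()
                .map(i -> {
                    events.add("map " + i);
                    return i;
                })
                .forEach(i -> events.add("forEach " + i));
        return events;
    }

    /** collect is the terminal operation: every element is mapped before List.forEach starts. */
    static List<String> withCollect() {
        List<String> events = new ArrayList<>();
        List.of(1, 2, 3).stream()
                .map(i -> {
                    events.add("map " + i);
                    return i;
                })
                .collect(Collectors.toList())
                .forEach(i -> events.add("forEach " + i));
        return events;
    }

    public static void main(String[] args) {
        // prints [map 1, forEach 1, map 2, forEach 2, map 3, forEach 3]
        System.out.println(withoutCollect());
        // prints [map 1, map 2, map 3, forEach 1, forEach 2, forEach 3]
        System.out.println(withCollect());
    }
}
```

In the question's code, "map" corresponds to submitting the two-stage future and "forEach" to the blocking join(), which is why the interleaved order serializes the work.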
Upvotes: 4