Reputation: 10604
I have the following code:
return CompletableFuture.supplyAsync(() -> {
return foo; // some custom object
})
.thenAccept(foo -> {
// ??? need to spawn N async parallel jobs that works on 'foo'
});
In english: the first task creates the foo
object asynchronously; and then I need to run N parallel processes on it.
Is there a better way to do this then:
...
CompletableFuture[] parallel = new CompletableFuture[N];
for (int i = 0; i < N; i++) {
parallel[i] = CompletableFuture.runAsync(() -> {
work(foo);
});
}
CompletableFuture.allOf(parallel).join();
...
I don't like this as one thread gets locked while waiting N jobs to finish.
Upvotes: 3
Views: 6815
Reputation: 298153
You can chain as many independent jobs as you like on a particular prerequisite job, e.g.
CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo());
Collections.nCopies(N, base).forEach(f -> f.thenAcceptAsync(foo -> work(foo)));
will spawn N
parallel jobs, invoking work(foo)
concurrently, after the completion of the initial job which provides the Foo
instance.
But keep in mind, that the underlying framework will consider the number of available CPU cores to size the thread pool actually executing the parallel jobs, so if N > #cores
, some of these jobs may run one after another.
If the work is I/O bound, thus, you want to have a higher number of parallel threads, you have to specify your own executor.
The nCopies
/forEach
is not necessary, a for
loop would do as well, but it provides a hint of how to handle subsequent jobs, that depend on the completion of all these parallel jobs:
CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo());
CompletableFuture<Void> all = CompletableFuture.allOf(
Collections.nCopies(N, base).stream()
.map(f -> f.thenAcceptAsync(foo -> work(foo)))
.toArray(CompletableFuture<?>[]::new));
Now you can use all
to check for the completion of all jobs or chain additional actions.
Upvotes: 2
Reputation: 43052
Since CompletableFuture.allOf
already returns another CompletableFuture<Void>
a you can just do another .thenAccept
on it and extract the returned values from the CFs in parallel
in the callback, that way you avoid calling join
Upvotes: 0