igr

Reputation: 10604

CompletableFuture single task that continues with many parallel tasks

I have the following code:

return CompletableFuture.supplyAsync(() -> {
    return foo; // some custom object
})
.thenAccept(foo -> {
    // ??? need to spawn N async parallel jobs that work on 'foo'
});

In English: the first task creates the foo object asynchronously, and then I need to run N parallel jobs on it.

Is there a better way to do this than:

...
CompletableFuture[] parallel = new CompletableFuture[N];
for (int i = 0; i < N; i++) {
    parallel[i] = CompletableFuture.runAsync(() -> {
        work(foo);
    });
}
CompletableFuture.allOf(parallel).join();
...

I don't like this because one thread stays blocked while waiting for the N jobs to finish.

Upvotes: 3

Views: 6815

Answers (2)

Holger

Reputation: 298153

You can chain as many independent jobs as you like on a particular prerequisite job, e.g.

CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo());
Collections.nCopies(N, base).forEach(f -> f.thenAcceptAsync(foo -> work(foo)));

will spawn N parallel jobs, invoking work(foo) concurrently, after the completion of the initial job which provides the Foo instance.

But keep in mind that the default executor, ForkJoinPool.commonPool(), sizes its thread pool from the number of available CPU cores, so if N > #cores, some of these jobs may run one after another.

If the work is I/O bound and you therefore want a higher number of parallel threads, you have to specify your own executor.
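As a rough sketch of what specifying your own executor could look like: the Foo class, the work method, and the pool size below are placeholders invented for illustration, not part of the original code. Both supplyAsync and thenAcceptAsync have overloads that take an Executor as the last argument.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomExecutorFanOut {
    // Hypothetical stand-in for the question's Foo type.
    static final class Foo {
        final AtomicInteger processed = new AtomicInteger();
    }

    // Hypothetical stand-in for work(foo); a real job would block on I/O here.
    static void work(Foo foo) {
        foo.processed.incrementAndGet();
    }

    // Runs n jobs (n > 0) on a dedicated pool and returns how many of them ran.
    static int runJobs(int n) {
        // Assumption: one thread per job, which suits blocking I/O but not CPU-bound work.
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            CompletableFuture<Foo> base = CompletableFuture.supplyAsync(Foo::new, pool);
            List<CompletableFuture<Void>> jobs = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                // The second argument makes the job run on our pool instead of the common pool.
                jobs.add(base.thenAcceptAsync(CustomExecutorFanOut::work, pool));
            }
            // join() is used here only so that the demo can report a result.
            CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0])).join();
            return base.join().processed.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runJobs(8)); // prints 8
    }
}
```

In a real application the pool would normally be created once and shared, rather than created and shut down per call as in this demo.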


The nCopies/forEach construct is not necessary; a for loop would do as well. But it hints at how to handle subsequent jobs that depend on the completion of all these parallel jobs:

CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo());
CompletableFuture<Void> all = CompletableFuture.allOf(
    Collections.nCopies(N, base).stream()
        .map(f -> f.thenAcceptAsync(foo -> work(foo)))
        .toArray(CompletableFuture<?>[]::new));

Now you can use the future named all to check for the completion of all jobs or to chain additional actions.
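For illustration, chaining a follow-up action on the combined future might look like the sketch below. Foo, work, and the fanOut helper are hypothetical names made up for this example. The follow-up runs only after every job has finished, and no thread blocks while waiting for it.

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class ChainAfterAll {
    // Hypothetical stand-in for the question's Foo type.
    static final class Foo {
        final AtomicInteger processed = new AtomicInteger();
    }

    // Hypothetical stand-in for work(foo).
    static void work(Foo foo) {
        foo.processed.incrementAndGet();
    }

    // Returns a future that completes with the number of finished jobs.
    static CompletableFuture<Integer> fanOut(int n) {
        CompletableFuture<Foo> base = CompletableFuture.supplyAsync(Foo::new);
        CompletableFuture<Void> all = CompletableFuture.allOf(
            Collections.nCopies(n, base).stream()
                .map(f -> f.thenAcceptAsync(ChainAfterAll::work))
                .toArray(CompletableFuture<?>[]::new));
        // Runs after all jobs completed; base is already done by then,
        // so base.join() returns immediately instead of blocking.
        return all.thenApply(ignored -> base.join().processed.get());
    }

    public static void main(String[] args) {
        // join() only at the very edge of the program, to print the result.
        System.out.println(fanOut(4).join()); // prints 4
    }
}
```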

Upvotes: 2

the8472

Reputation: 43052

Since CompletableFuture.allOf already returns another CompletableFuture<Void>, you can just chain another .thenAccept on it and extract the returned values from the parallel CFs in the callback. That way you avoid blocking a thread in join.
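One possible reading of this suggestion, as a sketch: the work method, the String input, and the fanOut helper are assumptions for the example, and thenApply is used instead of thenAccept so the collected values can be returned. Calling join inside the callback does not block, because allOf has already guaranteed that every job is complete.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CollectResults {
    // Hypothetical job: derives a value from the shared input and the job index.
    static int work(String foo, int index) {
        return foo.length() + index;
    }

    // Returns a future holding the results of all n jobs, in job order.
    static CompletableFuture<List<Integer>> fanOut(int n) {
        CompletableFuture<String> base = CompletableFuture.supplyAsync(() -> "foo");
        List<CompletableFuture<Integer>> jobs = IntStream.range(0, n)
            .mapToObj(i -> base.thenApplyAsync(foo -> work(foo, i)))
            .collect(Collectors.toList());
        return CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))
            // Every job is complete here, so join() just reads the stored value.
            .thenApply(ignored -> jobs.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(fanOut(3).join()); // prints [3, 4, 5]
    }
}
```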

Upvotes: 0
