Reputation: 1451
I'm studying CompletableFuture in Java 1.8 and having trouble understanding allOf. It seems the main thread doesn't wait for any CompletableFuture to complete.
See https://github.com/nurkiewicz/reactive/blob/master/src/test/java/be/more/reactive/S03_AllOf.java for the example I'm testing.
The test job finishes before any result is printed.
There are two (ugly?) ways to circumvent this: 1) set a timeout on the main thread and wait for both to finish; 2) put a .get() at the end, which turns it into a blocking task.
Why is this?
Code fragment:
package be.more.reactive;
import be.more.reactive.util.BaseTest;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
public class S03_AllOf extends BaseTest {
private static final Logger log = LoggerFactory.getLogger(S03_AllOf.class);
private final CompletableFuture<String> futureResult1 = getFutureQueryResult("1"); //.exceptionally() ??
private final CompletableFuture<String> futureResult2 = getFutureQueryResult("2");
private final CompletableFuture<String> futureResult3 = getFutureQueryResult("3");
private final CompletableFuture<String> futureResult4 = getFutureQueryResult("4");
@Test
public void allOf() throws Exception {
final CompletableFuture<Void> futureResult = CompletableFuture.allOf( //Void ?? I want List<String>
futureResult1, futureResult2, futureResult3, futureResult4
);
// futureResult.thenAccept((Void vd) -> vd.??) //no, it won't work
futureResult.thenRun(() -> {
try {
log.debug("Query result 1: '{}'", futureResult1.get());
log.debug("Query result 2: '{}'", futureResult2.get());
log.debug("Query result 3: '{}'", futureResult3.get());
log.debug("Query result 4: '{}'", futureResult4.get()); //a lot of manual work
log.debug("Now do on complete"); //handling onComplete
} catch (Exception e) {
log.error("", e);
}
});
}
}
And in BaseTest:
protected CompletableFuture<String> getFutureQueryResult(final String queryId) {
return CompletableFuture.supplyAsync(
() -> db.apply(new Query(queryId))
);
}
And in DB.java
package be.more.reactive.db;
import java.util.concurrent.TimeUnit;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.commons.lang3.RandomUtils.nextInt;
import static org.apache.commons.lang3.RandomUtils.nextLong;
public class DB {
public String apply(Query query) {
try {
TimeUnit.SECONDS.sleep(nextLong(2, 4));
} catch (InterruptedException e) {
e.printStackTrace();
}
return String.format("%s_%s", randomAlphabetic(nextInt(4, 12)), query.getId());
}
}
Upvotes: 1
Views: 9352
Reputation: 3493
The behavior you are seeing is expected. When you create a CompletableFuture with supplyAsync, you are scheduling a piece of work that runs asynchronously.
CompletableFuture before using allOf
Let's say we create a CompletableFuture like so:
var myFuture = CompletableFuture.supplyAsync(() -> myLongRunningOperation());
This CompletableFuture would call myLongRunningOperation on a separate thread.
CompletableFuture runs its tasks using an Executor (typically an ExecutorService), which can be provided when the CompletableFuture is created.
If no executor is provided, it uses the one provided by ForkJoinPool#commonPool, which is a thread pool of daemon threads.
The task () -> myLongRunningOperation() would be submitted to the executor irrespective of whether someone is waiting for the result of myFuture, i.e. irrespective of whether myFuture.join() or myFuture.get() is called.
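The point that the task runs whether or not anyone waits on it can be checked with a minimal sketch like the following. The class name EagerStartDemo and the helper observeWorker are hypothetical names for illustration; the code waits on a latch rather than on the future, so get() and join() are never called.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original test project.
public class EagerStartDemo {

    /**
     * Starts a task with supplyAsync and never calls get() or join() on it.
     * Returns a description of the worker thread, or null if the task did
     * not start within five seconds.
     */
    static String observeWorker() {
        CountDownLatch started = new CountDownLatch(1);
        AtomicReference<String> worker = new AtomicReference<>();

        // The returned future is deliberately ignored.
        CompletableFuture.supplyAsync(() -> {
            Thread t = Thread.currentThread();
            worker.set(t.getName() + " daemon=" + t.isDaemon());
            started.countDown();
            return "done";
        });

        try {
            // Wait on the latch, not on the future, to show the task ran anyway.
            return started.await(5, TimeUnit.SECONDS) ? worker.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("Task ran on: " + observeWorker());
    }
}
```

On a typical multi-core JVM the reported thread usually belongs to the common pool and is a daemon thread, which is why the JVM (or a test runner) does not wait for it.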
In your test method, this is what is happening behind the scenes
@Test
public void allOf() throws Exception {
// Creates a new future (futureResult) that completes once futureResult1,
// futureResult2, futureResult3 and futureResult4 have all completed.
// This call returns immediately; it does not block the calling thread.
final CompletableFuture<Void> futureResult = CompletableFuture.allOf(
futureResult1, futureResult2, futureResult3, futureResult4
);
// Schedules a computation that prints the results AFTER the futureResult is complete.
futureResult.thenRun(() -> {
try {
log.debug("Query result 1: '{}'", futureResult1.get());
log.debug("Query result 2: '{}'", futureResult2.get());
log.debug("Query result 3: '{}'", futureResult3.get());
log.debug("Query result 4: '{}'", futureResult4.get());
log.debug("Now do on complete");
} catch (Exception e) {
log.error("", e);
}
});
// Nothing more to do, so exit
}
but, when you call .join() or .get(), the thread executing the test (the main thread) will wait for the scheduled computation to complete before it exits.
So, if you want your test to wait for the scheduled computation to complete before it exits:
// Schedules a computation that prints the results ONCE the futureResult is complete.
final CompletableFuture<Void> myFuture = futureResult.thenRun(() -> {
try {
log.debug("Query result 1: '{}'", futureResult1.get());
log.debug("Query result 2: '{}'", futureResult2.get());
log.debug("Query result 3: '{}'", futureResult3.get());
log.debug("Query result 4: '{}'", futureResult4.get()); //a lot of manual work
log.debug("Now do on complete"); //handling onComplete
} catch (Exception e) {
log.error("", e);
}
});
// Wait for myFuture to complete (successfully or with an exception) before continuing.
myFuture.get();
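The difference between registering a callback and waiting for it can be reproduced without the test project. The sketch below is an assumption-laden stand-in: ThenRunDemo and its run helper are hypothetical names, and a CountDownLatch replaces the slow database call so the timing is deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original test project.
public class ThenRunDemo {

    /** Returns {callbackRanBeforeJoin, callbackRanAfterJoin}. */
    static boolean[] run() {
        CountDownLatch gate = new CountDownLatch(1);
        AtomicBoolean callbackRan = new AtomicBoolean(false);

        // Stand-in for a slow query: it cannot finish until the gate opens.
        CompletableFuture<String> query = CompletableFuture.supplyAsync(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "result";
        });

        // thenRun only registers the callback and returns immediately.
        CompletableFuture<Void> printed =
                CompletableFuture.allOf(query).thenRun(() -> callbackRan.set(true));

        // Still false here: the query is blocked on the gate.
        boolean before = callbackRan.get();

        gate.countDown(); // let the query finish
        printed.join();   // block the calling thread until the callback has run

        boolean after = callbackRan.get();
        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("before join: " + r[0] + ", after join: " + r[1]);
    }
}
```

Running main prints "before join: false, after join: true". A test method that returns right after thenRun is in the "before" state, which is why nothing is logged.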
Setting a timeout on the main thread to wait for the Future to complete is an anti-pattern.
If you care about the result and need to wait for the future to complete, call join() or get(), based on how you want to do exception handling: join() wraps failures in an unchecked CompletionException, while get() throws the checked ExecutionException and InterruptedException.
If you do not care about the result, but would like the application to wait for the future to complete, then create a custom executor service that creates non-daemon threads.
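A minimal sketch of the custom-executor option, assuming a plain main method rather than a test runner. The names NonDaemonExecutorDemo and reportDaemon are hypothetical; Executors.newFixedThreadPool is used because its default thread factory creates non-daemon threads.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original test project.
public class NonDaemonExecutorDemo {

    /** Runs a task on the given executor and reports whether its thread is a daemon. */
    static CompletableFuture<Boolean> reportDaemon(Executor executor) {
        return CompletableFuture.supplyAsync(
                () -> Thread.currentThread().isDaemon(), executor);
    }

    public static void main(String[] args) {
        // The default thread factory used here creates non-daemon threads.
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Fire and forget: nothing calls get() or join() on this future.
        reportDaemon(pool).thenAccept(
                daemon -> System.out.println("worker is daemon: " + daemon));

        // shutdown() lets already submitted tasks finish; afterwards the pool
        // threads exit and the JVM can terminate.
        pool.shutdown();
    }
}
```

Because non-daemon threads keep the JVM alive, the pool has to be shut down at some point or the application never exits. A test runner may still end the JVM on its own terms, so inside a test, joining the future is likely the safer choice.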
Upvotes: 2
Reputation: 40276
From the Javadoc
Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete.
A Future is an async task which doesn't block until you invoke get (and get only blocks if the task is still running).
In this case, the CompletableFuture is a compound Future of all the CompletableFutures. This future is still asynchronous: creating it does not block, and you must invoke get or join to wait for all futures to complete. Again, from the Javadoc:
Among the applications of this method is to await completion of a set of independent CompletableFutures before continuing a program, as in:
CompletableFuture.allOf(c1, c2, c3).join();
Your (2) solution, in my opinion, is neither ugly nor unexpected functionality.
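Since allOf returns CompletableFuture<Void> while the question asks for a List<String>, one possible way to combine the two is sketched below. The class AllOfCollectDemo and the helper allAsList are hypothetical names, not part of the CompletableFuture API, and the three supplyAsync calls stand in for the real queries.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of the original test project.
public class AllOfCollectDemo {

    /** Combines the futures into one future holding all results, in input order. */
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // Once 'all' completes normally every input is done, so join() returns at once.
        return all.thenApply(ignored ->
                futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> queries = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "a_1"),
                CompletableFuture.supplyAsync(() -> "b_2"),
                CompletableFuture.supplyAsync(() -> "c_3"));

        // join() blocks the calling thread until every query has finished.
        List<String> results = allAsList(queries).join();
        System.out.println(results); // [a_1, b_2, c_3]
    }
}
```

The final join() is the same blocking step as in the Javadoc example; the helper only removes the need to call get() on each future by hand.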
Upvotes: 4