codesmith
codesmith

Reputation: 1451

Why doesn't my thread wait for CompletableFutures to complete with `allOf()`?

I'm studying CompletableFuture in Java 1.8 and having trouble trying to understand allOf. It seems the main thread doesn't wait for any CompletableFuture to complete.

See https://github.com/nurkiewicz/reactive/blob/master/src/test/java/be/more/reactive/S03_AllOf.java for the example I'm testing.

The test job finishes before any result is printed.

There are two (ugly?) ways to circumvent this: 1) set a timeout on the main thread and wait for both to finish. 2) set a .get() at the end and it will become a blocking task.

Why is this?

Code fragment:

package be.more.reactive;

import be.more.reactive.util.BaseTest;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;

public class S03_AllOf extends BaseTest {

    private static final Logger log = LoggerFactory.getLogger(S03_AllOf.class);

    private final CompletableFuture<String> futureResult1 = getFutureQueryResult("1"); //.exceptionally() ??
    private final CompletableFuture<String> futureResult2 = getFutureQueryResult("2");
    private final CompletableFuture<String> futureResult3 = getFutureQueryResult("3");
    private final CompletableFuture<String> futureResult4 = getFutureQueryResult("4");

    @Test
    public void allOf() throws Exception {
        final CompletableFuture<Void> futureResult = CompletableFuture.allOf(   //Void ?? I want List<String>
                futureResult1, futureResult2, futureResult3, futureResult4
        );

//        futureResult.thenAccept((Void vd) -> vd.??)   //no, it won't work

        futureResult.thenRun(() -> {
            try {
                log.debug("Query result 1: '{}'", futureResult1.get());
                log.debug("Query result 2: '{}'", futureResult2.get());
                log.debug("Query result 3: '{}'", futureResult3.get());
                log.debug("Query result 4: '{}'", futureResult4.get());   //a lot of manual work

                log.debug("Now do on complete");    //handling onComplete
            } catch (Exception e) {
                log.error("", e);
            }
        });

    }

}

And in BaseTest:

protected CompletableFuture<String> getFutureQueryResult(final String queryId) {
    return CompletableFuture.supplyAsync(
            () -> db.apply(new Query(queryId))

    );
}

And in DB.java

package be.more.reactive.db;

import java.util.concurrent.TimeUnit;

import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.commons.lang3.RandomUtils.nextInt;
import static org.apache.commons.lang3.RandomUtils.nextLong;

public class DB {
    public String apply(Query query) {
        try {
            TimeUnit.SECONDS.sleep(nextLong(2, 4));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return String.format("%s_%s", randomAlphabetic(nextInt(4, 12)), query.getId());
    }
}

Upvotes: 1

Views: 9352

Answers (2)

Aditya Vikas Devarapalli
Aditya Vikas Devarapalli

Reputation: 3493

The behavior you are seeing is not unexpected. When you create a CompletableFuture, you are basically scheduling a piece of work that would run asynchronously.

Things to understand about CompletableFuture before using allOf

Let's say we create a CompletableFuture like so

var myFuture = CompletableFuture.supplyAsync(() -> myLongRunningOperation());
  • A CompletableFuture would call myLongRunningOperation on a separate thread.

  • CompletableFuture runs the tasks using an ExecutorService, that can be provided during the creation of the CompletableFuture.

  • If no ExecutorService is provided, it uses the one provided by ForkJoinPool#commonPool, which provides a thread pool of Daemon Threads.

  • The task () -> myLongRunningOperation() would be submitted to the ExecutorService irrespective of whether someone is waiting for the result of myFuture, i.e. irrespective of whether myFuture.join() or myFuture.get() are called.

In your test method, this is what is happening behind the scenes

@Test
public void allOf() throws Exception {
    // Schedules a computation (futureResult) on a different thread whose only 
    // work is to wait for the futures futureResult1, futureResult2, futureResult3 
    // and futureResult4 to complete
    final CompletableFuture<Void> futureResult = CompletableFuture.allOf(
            futureResult1, futureResult2, futureResult3, futureResult4
    );

    //  Schedules a computation that prints the results AFTER the futureResult is complete.
    futureResult.thenRun(() -> {
        try {
            log.debug("Query result 1: '{}'", futureResult1.get());
            log.debug("Query result 2: '{}'", futureResult2.get());
            log.debug("Query result 3: '{}'", futureResult3.get());
            log.debug("Query result 4: '{}'", futureResult4.get());
            log.debug("Now do on complete");
        } catch (Exception e) {
            log.error("", e);
        }
    });

    // Nothing more to do, so exit

}

but, when you call .join() or .get() the thread executing the Test (main thread) will wait for it the scheduled computation to complete before the it exits.

So, if you want your test to wait for the scheduled computation to complete before it exists,

//  Schedules a computation that prints the results ONCE the futureResult is complete.
final CompletableFuture<Void> myFuture = futureResult.thenRun(() -> {
    try {
        log.debug("Query result 1: '{}'", futureResult1.get());
        log.debug("Query result 2: '{}'", futureResult2.get());
        log.debug("Query result 3: '{}'", futureResult3.get());
        log.debug("Query result 4: '{}'", futureResult4.get());   //a lot of manual work

        log.debug("Now do on complete");    //handling onComplete
    } catch (Exception e) {
        log.error("", e);
    }
});

// Wait for the myFuture to complete (sucessfully or throw an exception) before continuing.
myFuture.get();

Setting a timeout on the main thread to wait for the Future to complete is an anti-pattern.

  • If you care about the result and need to wait for the future to complete, call join() or get() based on how you want to do exception handling.

  • If you do not care about the result, but would like the application to wait for the future to complete, then create a custom executor service that creates non-daemon threads.

Upvotes: 2

John Vint
John Vint

Reputation: 40276

From the Javadoc

Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete.

A Future is an async task which doesn't block until you invoke get (only blocks if the task is still running).

In this case, the CompleteableFuture is a compound Future of all the CompletableFutures. This future is still going to be a blocking async call and you must invoke get or join to wait for all futures to complete. Again, from the javadoc

CompletableFutures before continuing a program, as in: CompletableFuture.allOf(c1, c2, c3).join();.

Your (2) solution, in my opinion, is neither ugly nor unexpected functionality.

Upvotes: 4

Related Questions