user1007522
user1007522

Reputation: 8118

Throw Exception in Function apply for CompletableFutures

I'm having some tasks created like follows (this is just for demonstration normally network calls):

public class RandomTask implements Function<String, String> {
    private int number;
    private int waitTime;
    private boolean throwError;

    public RandomTask(int number, int waitTime, boolean throwError) {
        this.number = number;
        this.waitTime = waitTime;
        this.throwError = throwError;
    }

    @Override
    public String apply(String s) {
        System.out.println("Job " + number + " started");
        try {
            Thread.sleep(waitTime);

            if (throwError) {
                throw new InterruptedException("Something happened");
            }

        } catch (InterruptedException e) {
            System.out.println("Error " + e.getLocalizedMessage());
        }

        return "RandomTask " + number + " finished";
    }
}

Then I have a Chain class where I chain some tasks together per job.

static CompletableFuture<String> start(ExecutorService executorService) {
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Foo", executorService)
            .thenApplyAsync(new RandomTask(3, 100, false), executorService)
            .thenApplyAsync(new RandomTask(4, 100, false), executorService);

    return future2;
}

I then start 2 chains as follows:

  CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(Chain1.start(fixedThreadPool), Chain2.start(fixedThreadPool));
    try {
        combinedFuture.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

That way the two chains start off at the same time.

Now I want to throw an exception in a task and catch it where I call combinedFuture.get() so that I know which task has failed in my chain.

The thing is dat I can't adapt the Function because CompletableFutures complains about this. I tried with:

@FunctionalInterface
public interface CheckedFunction<T, R> {
    R apply(T t) throws InterruptedException;
}

But this doesn't work. Is this not possible or how can I achieve my goal?

Upvotes: 0

Views: 120

Answers (1)

Holger
Holger

Reputation: 298103

“That way the two chains start off at the same time.” indicates that you have a fundamentally wrong understanding of how CompletableFuture works.

Asynchronous operations are submitted to the executor service right when you create them or as soon as their prerequisites are available. So in case of supplyAsync, which has no dependencies, the asynchronous operation starts right within the supplyAsync invocation.

All, a construct like CompletableFuture.allOf(job1, job2).get() does, is to create a new stage depending on both jobs and waiting for its completion, so the net result is just to wait for the completion of both jobs. It does not start the jobs. They are already running. Waiting for a completion has no influence of the process of completing.

Chaining a CompletableFuture with a custom function type allowing checked exceptions can be done as

public static <T,R> CompletableFuture<R> thenApplyAsync(
    CompletableFuture<T> f, CheckedFunction<? super T, ? extends R> cf,
    Executor e) {

    CompletableFuture<R> r = new CompletableFuture<>();
    f.whenCompleteAsync((v,t) -> {
        try {
            if(t != null) r.completeExceptionally(t);
            else r.complete(cf.apply(v));
        } catch(Throwable t2) {
            r.completeExceptionally(t2);
        }
    }, e);
    return r;
}

To use this method, instead of chaining calls on the CompletableFuture, you have to nest them. E.g.

static CompletableFuture<String> start(ExecutorService executorService) {
    CompletableFuture<String> future2 = 
        thenApplyAsync(thenApplyAsync(
            CompletableFuture.supplyAsync(() -> "Foo", executorService),
            new RandomTask(3, 100, false), executorService),
            new RandomTask(4, 100, false), executorService);

    return future2;
}

given

public class RandomTask implements CheckedFunction<String, String> {
    private int number, waitTime;
    private boolean throwError;

    public RandomTask(int number, int waitTime, boolean throwError) {
        this.number = number;
        this.waitTime = waitTime;
        this.throwError = throwError;
    }

    @Override
    public String apply(String s) throws InterruptedException {
        System.out.println("Job " + number + " started");
        Thread.sleep(waitTime);
        if (throwError) {
            throw new InterruptedException("Something happened in "+number);
        }
        return "RandomTask " + number + " finished";
    }
}

You can still create two tasks and wait for both like

CompletableFuture.allOf(Chain1.start(fixedThreadPool), Chain2.start(fixedThreadPool))
    .join();

Upvotes: 1

Related Questions