Reputation: 8118
I have some tasks defined as follows (this is just for demonstration; normally they would be network calls):
public class RandomTask implements Function<String, String> {
    private int number;
    private int waitTime;
    private boolean throwError;

    public RandomTask(int number, int waitTime, boolean throwError) {
        this.number = number;
        this.waitTime = waitTime;
        this.throwError = throwError;
    }

    @Override
    public String apply(String s) {
        System.out.println("Job " + number + " started");
        try {
            Thread.sleep(waitTime);
            if (throwError) {
                throw new InterruptedException("Something happened");
            }
        } catch (InterruptedException e) {
            System.out.println("Error " + e.getLocalizedMessage());
        }
        return "RandomTask " + number + " finished";
    }
}
Then I have a Chain class where I chain some tasks together per job.
static CompletableFuture<String> start(ExecutorService executorService) {
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Foo", executorService)
        .thenApplyAsync(new RandomTask(3, 100, false), executorService)
        .thenApplyAsync(new RandomTask(4, 100, false), executorService);
    return future2;
}
I then start 2 chains as follows:
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
    Chain1.start(fixedThreadPool), Chain2.start(fixedThreadPool));
try {
    combinedFuture.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
That way the two chains start off at the same time.
Now I want to throw an exception in a task and catch it where I call combinedFuture.get() so that I know which task has failed in my chain.
The thing is that I can't change the Function to throw a checked exception, because CompletableFuture complains about this. I tried with:
@FunctionalInterface
public interface CheckedFunction<T, R> {
    R apply(T t) throws InterruptedException;
}
But this doesn't work. Is this not possible, or how else can I achieve my goal?
Upvotes: 0
Views: 120
Reputation: 298103
“That way the two chains start off at the same time.” indicates that you have a fundamentally wrong understanding of how CompletableFuture works.
Asynchronous operations are submitted to the executor service right when you create them or as soon as their prerequisites are available. So in the case of supplyAsync, which has no dependencies, the asynchronous operation starts right within the supplyAsync invocation.
All that a construct like CompletableFuture.allOf(job1, job2).get() does is create a new stage that depends on both jobs and wait for its completion, so the net result is just waiting for both jobs to complete. It does not start the jobs. They are already running. Waiting for a completion has no influence on the process of completing.
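A minimal sketch to illustrate this point: the supplier below signals a latch as soon as its body runs, and the latch is checked before allOf is ever called. The class name EagerStartDemo, the method startedBeforeAllOf and the 5 second timeout are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EagerStartDemo {
    // Returns true if the job body was observed running before allOf() was called.
    static boolean startedBeforeAllOf() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CountDownLatch started = new CountDownLatch(1);
            CompletableFuture<String> job = CompletableFuture.supplyAsync(() -> {
                started.countDown(); // the job body is executing
                return "Foo";
            }, pool);
            // Nothing has called allOf(), get() or join() at this point.
            boolean running = started.await(5, TimeUnit.SECONDS);
            // allOf(...).join() only waits for the already submitted job.
            CompletableFuture.allOf(job).join();
            return running;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("started before allOf: " + startedBeforeAllOf());
    }
}
```

The latch can only be released by the supplier itself, so a true result means the job was running without anyone waiting on it.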
Chaining a CompletableFuture with a custom function type allowing checked exceptions can be done as
public static <T,R> CompletableFuture<R> thenApplyAsync(
        CompletableFuture<T> f, CheckedFunction<? super T, ? extends R> cf,
        Executor e) {
    CompletableFuture<R> r = new CompletableFuture<>();
    f.whenCompleteAsync((v,t) -> {
        try {
            if(t != null) r.completeExceptionally(t);
            else r.complete(cf.apply(v));
        } catch(Throwable t2) {
            r.completeExceptionally(t2);
        }
    }, e);
    return r;
}
To use this method, instead of chaining calls on the CompletableFuture, you have to nest them. E.g.
static CompletableFuture<String> start(ExecutorService executorService) {
    CompletableFuture<String> future2 =
        thenApplyAsync(thenApplyAsync(
            CompletableFuture.supplyAsync(() -> "Foo", executorService),
            new RandomTask(3, 100, false), executorService),
            new RandomTask(4, 100, false), executorService);
    return future2;
}
given
public class RandomTask implements CheckedFunction<String, String> {
    private int number, waitTime;
    private boolean throwError;

    public RandomTask(int number, int waitTime, boolean throwError) {
        this.number = number;
        this.waitTime = waitTime;
        this.throwError = throwError;
    }

    @Override
    public String apply(String s) throws InterruptedException {
        System.out.println("Job " + number + " started");
        Thread.sleep(waitTime);
        if (throwError) {
            throw new InterruptedException("Something happened in "+number);
        }
        return "RandomTask " + number + " finished";
    }
}
You can still create two tasks and wait for both like
CompletableFuture.allOf(Chain1.start(fixedThreadPool), Chain2.start(fixedThreadPool))
    .join();
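To find out at the waiting site which task failed, one option is to catch the CompletionException thrown by join() and look at its cause, which is the exception the task threw. The following is only a sketch under some assumptions: the class FailureReportDemo and the method runAndReport are hypothetical names, the helper and the CheckedFunction interface are repeated so the example is self-contained, and lambdas stand in for the RandomTask instances.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FailureReportDemo {
    @FunctionalInterface
    interface CheckedFunction<T, R> {
        R apply(T t) throws InterruptedException;
    }

    // Same helper as above, repeated to keep the example self-contained.
    static <T, R> CompletableFuture<R> thenApplyAsync(
            CompletableFuture<T> f, CheckedFunction<? super T, ? extends R> cf,
            Executor e) {
        CompletableFuture<R> r = new CompletableFuture<>();
        f.whenCompleteAsync((v, t) -> {
            try {
                if (t != null) r.completeExceptionally(t);
                else r.complete(cf.apply(v));
            } catch (Throwable t2) {
                r.completeExceptionally(t2);
            }
        }, e);
        return r;
    }

    // Runs one succeeding and one failing chain, returns the failure message.
    static String runAndReport() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> ok = thenApplyAsync(
                CompletableFuture.supplyAsync(() -> "Foo", pool),
                s -> s + " ok", pool);
            CompletableFuture<String> failing = thenApplyAsync(
                CompletableFuture.supplyAsync(() -> "Bar", pool),
                s -> { throw new InterruptedException("Something happened in 2"); },
                pool);
            try {
                CompletableFuture.allOf(ok, failing).join();
                return "no failure";
            } catch (CompletionException ex) {
                // The cause is the exception passed to completeExceptionally.
                return ex.getCause().getMessage();
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndReport());
    }
}
```

With get() instead of join(), the same cause is wrapped in an ExecutionException, so getCause() works the same way. Putting the job number into the exception message, as RandomTask does, is what identifies the failed task.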
Upvotes: 1