Malvon
Malvon

Reputation: 1621

How to Obtain the Exception Outside of CompletableFuture Handler?

I have the following situation where I'm trying to see if there is a solution for:

A happy path should be straightforward, however, when it comes to errors emitted by the services the following rule should adhere to:

My question is, Since these services return a list of some object, even if I use CompletableFuture.handle() and check for existence of an exception, I can't return the Exception itself in order to capture and let Spring Advice class handle it (chained to return a list).

One thing I thought of is to use AtomicReference in order to capture the exceptions and set them within the handle() and use them once the futures are done/complete, e.g.

AtomicReference<Throwable> ce1 = new AtomicReference<>();
AtomicReference<Throwable> ce2 = new AtomicReference<>();

.handle((result, exception) -> {
    if (exception != null) {
        ce1.set(exception);
        return null; // This signals that there was a failure
    } else {
        return result;
    }
});

List<String> result1 = service1Result.get();
List<String> result2 = service2Result.get();

/** Where to get the exceptions thrown by the services if both fail
if (result1 == null && result2 == null) {
    /** Signal that the API needs to fail as a whole */
    throw new CustomException(/** do logic to capture ce1.get().getMessage() + ce2.get().getMessage() */);
}

First, does this sound like a viable solution in this multi-threaded asynchronous calls?

Second, this looks messy, so I was wondering if there is a more elegant way of capturing these exceptions outside of Spring async pool, and deal with it in the main thread, e.g. combine the exception information and throw it to Spring Advice exception handler.

Upvotes: 2

Views: 2980

Answers (3)

Thomas Timbul
Thomas Timbul

Reputation: 1733

Just because I raised it as a possibility, I imagine something like this should work in a world using Project Reactor:

First we modify the services to return Monos, which is easy using Mono.fromFuture (or you can turn one service into Reactor style, if and once it is ready):

@Service
public class Serv1 implements ServInf {
    public Mono<List<Obj>> getSomething(int id) {
        // The service ensures that the list is never null, but it can be empty
        return Mono.fromFuture(CompletableFuture.completedFuture(/* calling an external RESTful API */));
        //This Mono will either emit the result or complete with an error in case of Exception
    }
}

//similar for Serv2

The (reactive) endpoint could look like this (please see numbered comments below):

public Mono<WrapperObj> getById(String id) {
        WrapperObj wrapper = new WrapperObj(); //1
        Mono<Optional<List<Obj>>> s1Mono = serv1.getSomething(id)
            .subscribeOn(Schedulers.boundedElastic()) //2
            .map(Optional::ofNullable) //3
            .doOnError(wrapper::setS1ErrorResult) //4
            .onErrorResume(t -> Mono.just(Optional.empty())); //5

        Mono<Optional<List<Obj>>> s2Mono = serv2.getSomething(id)
            .subscribeOn(Schedulers.boundedElastic()) //2
            .map(Optional::ofNullable) //3
            .doOnError(wrapper::setS2ErrorResult) //4
            .onErrorResume(t -> Mono.just(Optional.empty())); //5

        return s1Mono
            .zipWith(s2Mono) //6
            .map(result ->
                //transforms non-error results and merges them into the wrapper object
                transformResult(result.getT1().orElse(null), result.getT2().orElse(null), wrapper) //7
            )
            .switchIfEmpty(Mono.just(wrapper)) //8

        ;
    }

Comments:

  1. The result is used to 'accumulate' the results and Exceptions

  2. Call services on the boundedElastic thread pool, which is recommended for longer IO tasks.

  3. Wrap the result in Optional. I am using an empty Optional as a convenience result for erroneous completion, since nulls don't propagate nicely through Reactor.

  4. If the service call throws an exception, we can set the corresponding erroneous result on the WrapperObj. This is similar to your use of AtomicReference, but without creating additional objects.

  5. However, such an exception would cause zipWith (6) to fail, so if this happens we substitute the Optional.empty() result.

  6. zipWith creates a tuple of both results

  7. We process these results, replacing

  8. What's left is to transform the two (non-exceptional) results:

    private WrapperObj transformResult(List<Obj> s1Result, List<Obj> s2Result, WrapperObj wrapper) {
        //perform your result transformation and
        //flesh out 'wrapper' with the results
        //if there was an exception, the 'wrapper' contains the corresponding exception values
        return wrapper;
    }
    

Upvotes: 1

Holger
Holger

Reputation: 298103

Assuming two futures

CompletableFuture<List<String>> service1Result = …
CompletableFuture<List<String>> service2Result = …

a straight-forward approach to combine the two futures is

CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
    (list1, list2) -> Stream.concat(list1.stream(), list2.stream())
                            .collect(Collectors.toList()));

but this future will fail if either future fails.

To fail only when both futures failed and construct a new exception from both throwable, we can define two utility methods:

private static Throwable getThrowable(CompletableFuture<?> f) {
    return f.<Throwable>thenApply(value -> null)
            .exceptionally(throwable -> throwable).join();
}

private static <T> T throwCustom(Throwable t1, Throwable t2) {
    throw new CustomException(t1.getMessage() + " and " + t2.getMessage());
}

The method getThrowable is intended to be used with a future already known to be completed exceptionally. We could call join and catch the exception, but as show above, we can also turn transform the future to a non-exceptional future containing the throwable as its value.

Then, we can combine all of the above to

CompletableFuture<List<String>> failOnlyWhenBothFailed = both
    .thenApply(list -> both)
    .exceptionally(t ->
        !service1Result.isCompletedExceptionally()? service1Result:
        !service2Result.isCompletedExceptionally()? service2Result:
        throwCustom(getThrowable(service1Result), getThrowable(service2Result)))
    .thenCompose(Function.identity());

Within the function passed to exceptionally, the incoming futures are already known to be completed, so we can use the utility methods to extract the throwables and throw a new exception.

The advantage of this is that the resulting construction is non-blocking.

But in your case, you want to wait for the completion rather than returning a future, so we can simplify the operation:

CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
    (list1, list2) -> Stream.concat(list1.stream(), list2.stream())
                            .collect(Collectors.toList()));

both.exceptionally(t -> null).join();

if(service1Result.isCompletedExceptionally()&&service2Result.isCompletedExceptionally()){
  Throwable t1 = getThrowable(service1Result), t2 = getThrowable(service2Result);
  throw new CustomException(t1.getMessage() + " and " + t2.getMessage());
}

List<String> result = (
    service1Result.isCompletedExceptionally()? service2Result:
    service2Result.isCompletedExceptionally()? service1Result: both
).join();

By using both.exceptionally(t -> null).join(); we wait for the completion of both jobs, without throwing an exception on failures. After this statement, we can safely use isCompletedExceptionally() to check the futures we know to be completed.

So if both failed, we extract the throwables and throw our custom exception, otherwise, we check which task(s) succeeded and extract the result of either or both.

Upvotes: 4

sp00m
sp00m

Reputation: 48807

CompletableFutures are quite cumbersome to deal with, but here would be a more functional and reactive approach IMO.

We'll need that sequence method from https://stackoverflow.com/a/30026710/1225328:

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}

Then, I'm using Optional to represent the status of the operations, but a Try monad would suit better (so use one if you have such a utility in your codebase - Java doesn't bring one natively yet):

CompletableFuture<Optional<List<Object>>> future1 = service1.getSomething().thenApply(Optional::of).exceptionally(e -> {
    // log e
    return Optional.empty();
});
CompletableFuture<Optional<List<Object>>> future2 = service2.getSomething().thenApply(Optional::of).exceptionally(e -> {
    // log e
    return Optional.empty();
});

Now wait for the two futures and handle the results once available:

CompletableFuture<List<Object>> mergedResults = sequence(Arrays.asList(future1, future2)).thenApply(results -> {
    Optional<List<Object>> result1 = results.get(0);
    Optional<List<Object>> result2 = results.get(1);
    if (result1.isEmpty() && result2.isEmpty()) {
        throw new CustomException(...);
    }
    // https://stackoverflow.com/a/18687790/1225328:
    return Stream.of(
            result1.map(Stream::of).orElseGet(Stream::empty),
            result2.map(Stream::of).orElseGet(Stream::empty)
    ).collect(Collectors.toList());
});

Then you would ideally return mergedResults directly and let the framework deal with it for you so that you don't block any thread, or you can .get() on it (which will block the thread), which will throw an ExecutionException if your CustomException (or any other exception) is thrown (accessible in e.getCause()).


This would look simpler with Project Reactor (or equivalent), in case you're using it already, but the idea would be roughly the same.

Upvotes: 2

Related Questions