Reputation: 1621
I have the following situation and am trying to see if there is a solution for it.
The happy path is straightforward. However, when the services emit errors, the following rules should apply:
The API fails only when both service calls fail. This should be thrown from the main thread and not from the @Async pool, since those are independent threads and don't have access to each other's exceptions (at least that's my reasoning).
If only one of them fails, log the error through another service (asynchronously), and the API returns only the results from the service that was successful. This can be done from the respective @Async threads.
@Service
public class Serv1 implements ServInf {
@Async("customPool")
public CompletableFuture<List<Obj>> getSomething(int id) {
// The service ensures that the list is never null, but it can be empty
return CompletableFuture.completedFuture(/* calling an external RESTful API */);
}
}
@Service
public class Serv2 implements ServInf {
@Async("customPool")
public CompletableFuture<List<Obj>> getSomething(int id) {
// The service ensures that the list is never null, but it can be empty
return CompletableFuture.completedFuture(/* calling another external RESTful API */);
}
}
@RestController
public class MyController {
/** Typical service @Autowired's */
@GetMapping(/* ... */)
public WrapperObj getById(String id) {
CompletableFuture<List<String>> service1Result =
service1.getSomething(id)
.thenApply(result -> {
if (result == null) { return null; }
return result.stream().map(Obj::getName).collect(Collectors.toList());
})
.handle((result, exception) -> {
if (exception != null) {
// Call another asynchronous logging service which should be easy
return null;
} else {
return result;
}
});
CompletableFuture<List<String>> service2Result =
service2.getSomething(id)
.thenApply(result -> {
if (result == null) { return null; }
return result.stream().map(Obj::getName).collect(Collectors.toList());
})
.handle((result, exception) -> {
if (exception != null) {
// Call another asynchronous logging service which should be easy
return null;
} else {
return result;
}
});
// Blocking till we get the results from both services
List<String> result1 = service1Result.get();
List<String> result2 = service2Result.get();
/* Where to get the exceptions thrown by the services if both fail? */
if (result1 == null && result2 == null) {
/** Signal that the API needs to fail as a whole */
throw new CustomException( /** where to get the messages? */);
}
/** merge and return the result */
}
}
My question is: since these services return a list of some object, even if I use CompletableFuture.handle() and check for the existence of an exception, I can't return the exception itself in order to capture it and let the Spring Advice class handle it (the chain has to return a list).
One thing I thought of is to use AtomicReference to capture the exceptions, set them within handle(), and use them once the futures are done/complete, e.g.
AtomicReference<Throwable> ce1 = new AtomicReference<>();
AtomicReference<Throwable> ce2 = new AtomicReference<>();
.handle((result, exception) -> {
if (exception != null) {
ce1.set(exception);
return null; // This signals that there was a failure
} else {
return result;
}
});
List<String> result1 = service1Result.get();
List<String> result2 = service2Result.get();
/* Where to get the exceptions thrown by the services if both fail? */
if (result1 == null && result2 == null) {
/** Signal that the API needs to fail as a whole */
throw new CustomException(/** do logic to capture ce1.get().getMessage() + ce2.get().getMessage() */);
}
First, does this sound like a viable solution for these multi-threaded asynchronous calls?
Second, this looks messy, so I was wondering if there is a more elegant way of capturing these exceptions outside of the Spring async pool and dealing with them in the main thread, e.g. combining the exception information and throwing it to the Spring Advice exception handler.
Upvotes: 2
Views: 2980
Reputation: 1733
Just because I raised it as a possibility, I imagine something like this should work in a world using Project Reactor:
First we modify the services to return Monos, which is easy using Mono.fromFuture (or you can turn one service into Reactor style, if and once it is ready):
@Service
public class Serv1 implements ServInf {
public Mono<List<Obj>> getSomething(int id) {
// The service ensures that the list is never null, but it can be empty
return Mono.fromFuture(CompletableFuture.completedFuture(/* calling an external RESTful API */));
//This Mono will either emit the result or complete with an error in case of Exception
}
}
//similar for Serv2
The (reactive) endpoint could look like this (please see numbered comments below):
public Mono<WrapperObj> getById(String id) {
WrapperObj wrapper = new WrapperObj(); //1
Mono<Optional<List<Obj>>> s1Mono = serv1.getSomething(id)
.subscribeOn(Schedulers.boundedElastic()) //2
.map(Optional::ofNullable) //3
.doOnError(wrapper::setS1ErrorResult) //4
.onErrorResume(t -> Mono.just(Optional.empty())); //5
Mono<Optional<List<Obj>>> s2Mono = serv2.getSomething(id)
.subscribeOn(Schedulers.boundedElastic()) //2
.map(Optional::ofNullable) //3
.doOnError(wrapper::setS2ErrorResult) //4
.onErrorResume(t -> Mono.just(Optional.empty())); //5
return s1Mono
.zipWith(s2Mono) //6
.map(result ->
//transforms non-error results and merges them into the wrapper object
transformResult(result.getT1().orElse(null), result.getT2().orElse(null), wrapper) //7
)
.switchIfEmpty(Mono.just(wrapper)) //8
;
}
Comments:
1. The wrapper is used to 'accumulate' the results and exceptions.
2. Call the services on the boundedElastic thread pool, which is recommended for longer IO tasks.
3. Wrap the result in an Optional. I am using an empty Optional as a convenience result for erroneous completion, since nulls don't propagate nicely through Reactor.
4. If the service call throws an exception, we can set the corresponding erroneous result on the WrapperObj. This is similar to your use of AtomicReference, but without creating additional objects.
5. However, such an exception would cause zipWith (6) to fail, so if this happens we substitute the Optional.empty() result.
6. zipWith creates a tuple of both results.
7. We process these results, replacing
What's left is to transform the two (non-exceptional) results:
private WrapperObj transformResult(List<Obj> s1Result, List<Obj> s2Result, WrapperObj wrapper) {
//perform your result transformation and
//flesh out 'wrapper' with the results
//if there was an exception, the 'wrapper' contains the corresponding exception values
return wrapper;
}
Upvotes: 1
Reputation: 298103
Assuming two futures
CompletableFuture<List<String>> service1Result = …
CompletableFuture<List<String>> service2Result = …
a straight-forward approach to combine the two futures is
CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
(list1, list2) -> Stream.concat(list1.stream(), list2.stream())
.collect(Collectors.toList()));
but this future will fail if either future fails.
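To make that failure mode concrete, here is a minimal sketch using already-completed futures instead of real service calls. The class name, the `combine` helper, and the sample values are assumptions for illustration only.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ThenCombineFailureDemo {
    // Combines two futures the same way as the thenCombine snippet above.
    static CompletableFuture<List<String>> combine(CompletableFuture<List<String>> a,
                                                   CompletableFuture<List<String>> b) {
        return a.thenCombine(b, (l1, l2) ->
                Stream.concat(l1.stream(), l2.stream()).collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> ok = CompletableFuture.completedFuture(List.of("a"));
        CompletableFuture<List<String>> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("service 2 down"));

        try {
            combine(ok, failed).join();
        } catch (CompletionException e) {
            // join() reports the failure wrapped in a CompletionException;
            // the successful result of the first future is lost here.
            System.out.println("combined future failed: " + e.getCause().getMessage());
        }
    }
}
```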
To fail only when both futures have failed, and to construct a new exception from both throwables, we can define two utility methods:
private static Throwable getThrowable(CompletableFuture<?> f) {
return f.<Throwable>thenApply(value -> null)
.exceptionally(throwable -> throwable).join();
}
private static <T> T throwCustom(Throwable t1, Throwable t2) {
throw new CustomException(t1.getMessage() + " and " + t2.getMessage());
}
The method getThrowable is intended to be used with a future already known to be completed exceptionally. We could call join and catch the exception, but as shown above, we can also transform the future into a non-exceptional future containing the throwable as its value.
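One detail worth checking when using getThrowable: because the throwable passes through a dependent stage (thenApply), what comes out is typically a CompletionException wrapping the original failure, so getMessage() includes the cause's class name. The sketch below illustrates this; `unwrap` is a hypothetical helper that is not part of the answer's code, and the class name is made up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class GetThrowableDemo {
    // Same helper as above: maps any value to null and turns the failure into the value.
    static Throwable getThrowable(CompletableFuture<?> f) {
        return f.<Throwable>thenApply(value -> null)
                .exceptionally(throwable -> throwable).join();
    }

    // Hypothetical helper (assumption): strips a CompletionException wrapper if present.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        Throwable raw = getThrowable(failed);
        System.out.println(raw.getClass().getName());      // type as seen by exceptionally()
        System.out.println(unwrap(raw).getMessage());      // message of the original failure
    }
}
```

If the combined message should contain only the original messages, unwrapping before calling getMessage() is one option.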
Then, we can combine all of the above to
CompletableFuture<List<String>> failOnlyWhenBothFailed = both
.thenApply(list -> both)
.exceptionally(t ->
!service1Result.isCompletedExceptionally()? service1Result:
!service2Result.isCompletedExceptionally()? service2Result:
throwCustom(getThrowable(service1Result), getThrowable(service2Result)))
.thenCompose(Function.identity());
Within the function passed to exceptionally, the incoming futures are already known to be completed, so we can use the utility methods to extract the throwables and throw a new exception.
The advantage of this is that the resulting construction is non-blocking.
But in your case, you want to wait for the completion rather than returning a future, so we can simplify the operation:
CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
(list1, list2) -> Stream.concat(list1.stream(), list2.stream())
.collect(Collectors.toList()));
both.exceptionally(t -> null).join();
if(service1Result.isCompletedExceptionally()&&service2Result.isCompletedExceptionally()){
Throwable t1 = getThrowable(service1Result), t2 = getThrowable(service2Result);
throw new CustomException(t1.getMessage() + " and " + t2.getMessage());
}
List<String> result = (
service1Result.isCompletedExceptionally()? service2Result:
service2Result.isCompletedExceptionally()? service1Result: both
).join();
By using both.exceptionally(t -> null).join(); we wait for the completion of both jobs, without throwing an exception on failures. After this statement, we can safely use isCompletedExceptionally() to check the futures we know to be completed.
So if both failed, we extract the throwables and throw our custom exception, otherwise, we check which task(s) succeeded and extract the result of either or both.
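The blocking variant can be exercised end to end with a small self-contained sketch. The `CustomException` below is only a stand-in for the question's class (assumed to be unchecked), and `mergeOrFail`, `failed`, and the class name are hypothetical names introduced for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FailOnlyWhenBothFailDemo {
    // Stand-in for the question's CustomException (assumption: unchecked).
    static class CustomException extends RuntimeException {
        CustomException(String message) { super(message); }
    }

    static Throwable getThrowable(CompletableFuture<?> f) {
        return f.<Throwable>thenApply(value -> null)
                .exceptionally(throwable -> throwable).join();
    }

    // Waits for both futures; fails only if both failed, otherwise returns what succeeded.
    static List<String> mergeOrFail(CompletableFuture<List<String>> s1,
                                    CompletableFuture<List<String>> s2) {
        CompletableFuture<List<String>> both = s1.thenCombine(s2, (l1, l2) ->
                Stream.concat(l1.stream(), l2.stream()).collect(Collectors.toList()));
        both.exceptionally(t -> null).join(); // wait for completion, ignore failures
        if (s1.isCompletedExceptionally() && s2.isCompletedExceptionally()) {
            throw new CustomException(
                    getThrowable(s1).getMessage() + " and " + getThrowable(s2).getMessage());
        }
        return (s1.isCompletedExceptionally() ? s2
                : s2.isCompletedExceptionally() ? s1 : both).join();
    }

    // Test helper: a future that has already failed with the given message.
    static <T> CompletableFuture<T> failed(String message) {
        CompletableFuture<T> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException(message));
        return f;
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> ok = CompletableFuture.completedFuture(List.of("a"));
        System.out.println(mergeOrFail(ok, failed("service 2 down")));
        try {
            mergeOrFail(failed("service 1 down"), failed("service 2 down"));
        } catch (CustomException e) {
            System.out.println("API fails: " + e.getMessage());
        }
    }
}
```

In a controller, the CustomException thrown from mergeOrFail would surface on the request thread, which is where an exception handler advice can pick it up.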
Upvotes: 4
Reputation: 48807
CompletableFutures are quite cumbersome to deal with, but here is what I consider a more functional and reactive approach.
We'll need the sequence method from https://stackoverflow.com/a/30026710/1225328:
static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
Then, I'm using Optional to represent the status of the operations, but a Try monad would suit better (so use one if you have such a utility in your codebase - Java doesn't bring one natively yet):
CompletableFuture<Optional<List<Object>>> future1 = service1.getSomething().thenApply(Optional::of).exceptionally(e -> {
// log e
return Optional.empty();
});
CompletableFuture<Optional<List<Object>>> future2 = service2.getSomething().thenApply(Optional::of).exceptionally(e -> {
// log e
return Optional.empty();
});
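For readers without a Try utility, a very small holder type can play that role and, unlike Optional, keep the failure so its message is still available when both calls fail. This is only a sketch: `Attempt`, `attempt`, and the class name are hypothetical names, not part of any library.

```java
import java.util.concurrent.CompletableFuture;

class AttemptDemo {
    // Hypothetical minimal stand-in for a Try type: holds either a value or the failure.
    static final class Attempt<T> {
        final T value;
        final Throwable error;

        private Attempt(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static <T> Attempt<T> success(T value) { return new Attempt<>(value, null); }
        static <T> Attempt<T> failure(Throwable error) { return new Attempt<>(null, error); }

        boolean isFailure() { return error != null; }
    }

    // Converts a future that may fail into one that always completes normally.
    static <T> CompletableFuture<Attempt<T>> attempt(CompletableFuture<T> future) {
        return future.handle((value, error) ->
                error == null ? Attempt.<T>success(value) : Attempt.<T>failure(error));
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        Attempt<String> result = attempt(failed).join();
        System.out.println(result.isFailure() + " " + result.error.getMessage());
    }
}
```

Note that if the future passed to attempt is itself a dependent stage, the captured error may be a CompletionException wrapping the original cause.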
Now wait for the two futures and handle the results once available:
CompletableFuture<List<Object>> mergedResults = sequence(Arrays.asList(future1, future2)).thenApply(results -> {
Optional<List<Object>> result1 = results.get(0);
Optional<List<Object>> result2 = results.get(1);
if (result1.isEmpty() && result2.isEmpty()) {
throw new CustomException(...);
}
// https://stackoverflow.com/a/18687790/1225328:
// flatten whichever lists are present into one list (Optional.stream() needs Java 9+)
return Stream.of(result1, result2)
.flatMap(Optional::stream)
.flatMap(List::stream)
.collect(Collectors.toList());
});
Then you would ideally return mergedResults directly and let the framework deal with it for you so that you don't block any thread, or you can .get() on it (which will block the thread), which will throw an ExecutionException if your CustomException (or any other exception) is thrown (accessible in e.getCause()).
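If you do block, one way to hand the original exception to an exception handler is to unwrap the ExecutionException and rethrow its cause. The following is a minimal sketch under that assumption; `getOrRethrow`, the class name, and the `CustomException` stand-in are hypothetical names for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class GetCauseDemo {
    // Stand-in for the question's CustomException (assumption: unchecked).
    static class CustomException extends RuntimeException {
        CustomException(String message) { super(message); }
    }

    // Blocks on the future and rethrows the original unchecked failure, if any.
    static <T> T getOrRethrow(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> merged = CompletableFuture.completedFuture("x")
                .<String>thenApply(v -> { throw new CustomException("both services failed"); });
        try {
            getOrRethrow(merged);
        } catch (CustomException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```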
This would look simpler with Project Reactor (or equivalent), in case you're using it already, but the idea would be roughly the same.
Upvotes: 2