xmas79

Reputation: 5180

Retry logic with CompletableFuture

I need to submit a task in an async framework I'm working on, but I need to catch exceptions and retry the same task several times before aborting.

The code I'm working with is:

int retries = 0;
public CompletableFuture<Result> executeActionAsync() {

    // Execute the action async and get the future
    CompletableFuture<Result> f = executeMycustomActionHere();

    // If the future completes with exception:
    f.exceptionally(ex -> {
        retries++; // Increment the retry count
        if (retries < MAX_RETRIES)
            return executeActionAsync();  // <--- Submit one more time

        // Abort with a null value
        return null;
    });

    // Return the future    
    return f;
}

This currently doesn't compile because the return type of the lambda is wrong: a Result is expected, but executeActionAsync returns a CompletableFuture<Result>.

How can I implement this fully async retry logic?

Upvotes: 37

Views: 45467

Answers (10)

Linden X. Quan

Reputation: 792

Inspired by theazureshadow's answer, which is great but no longer works with newer versions of Failsafe. The code below works with:

<dependency>
  <groupId>dev.failsafe</groupId>
  <artifactId>failsafe</artifactId>
  <version>3.3.0</version>
</dependency>

solution:

    RetryPolicy<Object> retryPolicy = RetryPolicy.builder()
        .withMaxRetries(MAX_RETRY)
        .withBackoff(INITIAL_DELAY, MAX_DELAY, ChronoUnit.SECONDS)
        .build();
    // Complete with null once all retries are exhausted
    Fallback<Object> fallback = Fallback.of((Object) null);

    // asyncTask is a dev.failsafe.function.CheckedSupplier so that it can return a result
    public CompletableFuture<Object> executeAsync(CheckedSupplier<Object> asyncTask) {
        return Failsafe.with(fallback)
            .compose(retryPolicy)
            .with(executorService)
            .onFailure(e -> LOG.error(e.getException().getMessage()))
            .getAsync(asyncTask);
    }

Upvotes: 1

YKun and coding

Reputation: 323

I would suggest using resilience4j for this use case. It's very handy!!

Introduction: resilience4j-retry and its Javadoc: Retry

The Retry interface has a method that decorates a CompletionStage directly:

default <T> CompletionStage<T> executeCompletionStage(ScheduledExecutorService scheduler,
                                                      Supplier<CompletionStage<T>> supplier)

Upvotes: 0

Yan

Reputation: 328

We needed to retry a task based on an error condition.

public static <T> CompletableFuture<T> retryOnCondition(Supplier<CompletableFuture<T>> supplier,
                                             Predicate<Throwable> retryPredicate, int maxAttempts) {
    if (maxAttempts <= 0) {
        throw new IllegalArgumentException("maxAttempts can't be <= 0");
    }
    return retryOnCondition(supplier, retryPredicate, null, maxAttempts);
}

private static <T> CompletableFuture<T> retryOnCondition(
    Supplier<CompletableFuture<T>> supplier, Predicate<Throwable> retryPredicate,
    Throwable lastError, int attemptsLeft) {

    if (attemptsLeft == 0) {
        return CompletableFuture.failedFuture(lastError);
    }

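    return supplier.get()
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(error -> {
            // Dependent stages wrap the failure in a CompletionException; unwrap it so the
            // predicate sees the original exception (e.g. MyException)
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause() : error;
            boolean doRetry = retryPredicate.test(cause);
            int attempts = doRetry ? attemptsLeft - 1 : 0;
            return retryOnCondition(supplier, retryPredicate, cause, attempts);
        })
        .thenCompose(Function.identity());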
}

Usage:

public static void main(String[] args) {
    retryOnCondition(() -> myTask(), e -> {
        //log exception
        return e instanceof MyException;
    }, 3).join();
}

Upvotes: 0

Ali

Reputation: 1590

Maybe it's late, but hopefully someone will find this useful. I recently solved this problem when retrying a REST API call on failure. In my case, I have to retry on an HTTP 500 status code. Below is my REST client code (we use WSClient from the Play Framework); you can swap in whatever REST client you need.

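int MAX_RETRY = 3;
CompletableFuture<WSResponse> future = new CompletableFuture<>();

private CompletionStage<WSResponse> getWS(Object request, String url, int retry,
                                          CompletableFuture<WSResponse> future) throws JsonProcessingException {
    ws.url(url)
        .post(Json.parse(mapper.writeValueAsString(request)))
        .whenCompleteAsync((wsResponse, exception) -> {
            if (exception != null) {
                // The call itself failed, so there is no response to inspect
                future.completeExceptionally(exception);
            } else if (wsResponse.getStatus() == 500 && retry < MAX_RETRY) {
                try {
                    // Retry with the same url and the shared result future
                    getWS(request, url, retry + 1, future);
                } catch (IOException e) {
                    // Checked exceptions cannot escape the lambda; fail the future instead
                    future.completeExceptionally(e);
                }
            } else {
                future.complete(wsResponse);
            }
        });

    return future;
}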

This code completes immediately if the status code is anything other than 500 (for example 200); if the HTTP status is 500, it retries up to 3 times.

Upvotes: 3

殷振南

Reputation: 71

util class:

public class RetryUtil {

    public static <R> CompletableFuture<R> retry(Supplier<CompletableFuture<R>> supplier, int maxRetries) {
        CompletableFuture<R> f = supplier.get();
        for(int i=0; i<maxRetries; i++) {
            f=f.thenApply(CompletableFuture::completedFuture)
                .exceptionally(t -> {
                    System.out.println("retry for: "+t.getMessage());
                    return supplier.get();
                })
                .thenCompose(Function.identity());
        }
        return f;
    }
}

usage:

public CompletableFuture<String> lucky(){
    return CompletableFuture.supplyAsync(()->{
        double luckNum = Math.random();
        double luckEnough = 0.6;
        if(luckNum < luckEnough){
            throw new RuntimeException("not luck enough: " + luckNum);
        }
        return "I'm lucky: "+luckNum;
    });
}
@Test
public void testRetry(){
    CompletableFuture<String> retry = RetryUtil.retry(this::lucky, 10);
    System.out.println("async check");
    String join = retry.join();
    System.out.println("lucky? "+join);
}

output

async check
retry for: java.lang.RuntimeException: not luck enough: 0.412296354211683
retry for: java.lang.RuntimeException: not luck enough: 0.4099777199676573
lucky? I'm lucky: 0.8059089479049389

Upvotes: 7

theazureshadow

Reputation: 10059

Instead of implementing your own retry logic, I recommend using a proven library like failsafe, which has built-in support for futures (and seems more popular than guava-retrying). For your example, it would look something like:

private static RetryPolicy retryPolicy = new RetryPolicy()
    .withMaxRetries(MAX_RETRIES);

public CompletableFuture<Result> executeActionAsync() {
    return Failsafe.with(retryPolicy)
        .with(executor)
        .withFallback(null)
        .future(this::executeMycustomActionHere);
}

You should probably avoid .withFallback(null) and just let the returned future's .get() method throw the resulting exception so that the caller of your method can handle it specifically, but that's a design decision you'll have to make.

Other things to think about include whether you should retry immediately or wait some period of time between attempts, any sort of recursive backoff (useful when you're calling a web service that might be down), and whether there are specific exceptions that aren't worth retrying (e.g. if the parameters to the method are invalid).
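To make the points about waiting between attempts and skipping non-retryable exceptions concrete, here is a minimal sketch that uses only the JDK (Java 9+ for CompletableFuture.delayedExecutor) rather than Failsafe. The class name BackoffRetry, the doubling delay, and the retryable predicate are illustrative assumptions, not part of any library, and the sketch assumes the supplier never throws synchronously.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class BackoffRetry {

    /** Retries the action up to maxRetries times, doubling the delay after each failure. */
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action,
                                          Predicate<Throwable> retryable,
                                          int maxRetries, long delayMillis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(action, retryable, maxRetries, delayMillis, result);
        return result;
    }

    // Assumption: action.get() returns a future and does not throw synchronously.
    private static <T> void attempt(Supplier<CompletableFuture<T>> action,
                                    Predicate<Throwable> retryable,
                                    int retriesLeft, long delayMillis,
                                    CompletableFuture<T> result) {
        action.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            // Failures may arrive wrapped in a CompletionException; test the real cause.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            if (retriesLeft <= 0 || !retryable.test(cause)) {
                result.completeExceptionally(cause);
                return;
            }
            // Schedule the next attempt after the current delay, then double the delay.
            Executor later = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
            later.execute(() -> attempt(action, retryable, retriesLeft - 1, delayMillis * 2, result));
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical flaky task: fails on the first two calls, succeeds on the third.
        Supplier<CompletableFuture<String>> flaky = () -> CompletableFuture.supplyAsync(() -> {
            if (calls.incrementAndGet() < 3) throw new IllegalStateException("transient");
            return "ok";
        });
        System.out.println(retry(flaky, e -> e instanceof IllegalStateException, 5, 10).join()); // ok
        System.out.println(calls.get()); // 3
    }
}
```

A failure that the predicate rejects (say, an IllegalArgumentException for bad parameters) fails the returned future immediately without any retry.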

Upvotes: 15

Holger

Reputation: 298233

Chaining subsequent retries can be straight-forward:

public CompletableFuture<Result> executeActionAsync() {
    CompletableFuture<Result> f=executeMycustomActionHere();
    for(int i=0; i<MAX_RETRIES; i++) {
        f=f.exceptionally(t -> executeMycustomActionHere().join());
    }
    return f;
}

Read about the drawbacks below.
This simply chains as many retries as intended, since the subsequent stages do nothing in the non-exceptional case.

One drawback is that if the first attempt fails immediately, so that f is already completed exceptionally when the first exceptionally handler is chained, the action will be invoked by the calling thread, removing the asynchronous nature of the request entirely. And generally, join() may block a thread (the default executor will start a new compensation thread then, but still, it’s discouraged). Unfortunately, before Java 12 there is neither an exceptionallyAsync nor an exceptionallyCompose method.
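The calling-thread drawback can be observed directly. The following is a small sketch (the class and method names are made up for illustration) that chains an exceptionally handler onto a future that has already failed and reports which thread ran the handler.

```java
import java.util.concurrent.CompletableFuture;

final class HandlerThreadDemo {

    /** Returns the name of the thread that executed the exceptionally handler. */
    static String handlerThreadForAlreadyFailedFuture() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        // The future is already complete, so the handler runs in the thread that chains it.
        return failed.exceptionally(t -> Thread.currentThread().getName()).join();
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        System.out.println(caller.equals(handlerThreadForAlreadyFailedFuture())); // true
    }
}
```

In the retry loop, this means a retry that calls join() inside such a handler would block the caller rather than a pool thread.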

A solution not invoking join() would be

public CompletableFuture<Result> executeActionAsync() {
    CompletableFuture<Result> f=executeMycustomActionHere();
    for(int i=0; i<MAX_RETRIES; i++) {
        f=f.thenApply(CompletableFuture::completedFuture)
           .exceptionally(t -> executeMycustomActionHere())
           .thenCompose(Function.identity());
    }
    return f;
}

demonstrating how involved combining “compose” and an “exceptionally” handler is.
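On Java 12 and later, CompletableFuture does offer exceptionallyCompose, which collapses the three-step combination into a single call. A minimal sketch, assuming Java 12+ and using a hypothetical ComposeRetry helper and a made-up flaky task for the demonstration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class ComposeRetry {

    /** Chains up to maxRetries further attempts; each one runs only if the previous failed. */
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action, int maxRetries) {
        CompletableFuture<T> f = action.get();
        for (int i = 0; i < maxRetries; i++) {
            // On success the value passes through untouched; on failure a new attempt starts.
            f = f.exceptionallyCompose(t -> action.get());
        }
        return f;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical flaky task: fails on the first two calls, succeeds on the third.
        Supplier<CompletableFuture<String>> flaky = () -> CompletableFuture.supplyAsync(() -> {
            if (calls.incrementAndGet() < 3) throw new IllegalStateException("transient");
            return "ok";
        });
        System.out.println(retry(flaky, 3).join()); // ok
    }
}
```

As with the loop above, only the last exception is reported if every attempt fails.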

Further, only the last exception will be reported if all retries fail. A better solution should report the first exception, with the exceptions of subsequent retries added as suppressed exceptions. Such a solution can be built by chaining a recursive call, as hinted by Gili’s answer; however, to use this idea for exception handling, we have to combine “compose” and “exceptionally” using the steps shown above:

public CompletableFuture<Result> executeActionAsync() {
    return executeMycustomActionHere()
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(t -> retry(t, 0))
        .thenCompose(Function.identity());
}
private CompletableFuture<Result> retry(Throwable first, int retry) {
    if(retry >= MAX_RETRIES) return CompletableFuture.failedFuture(first);
    return executeMycustomActionHere()
        .thenApply(CompletableFuture::completedFuture)
        .exceptionally(t -> { first.addSuppressed(t); return retry(first, retry+1); })
        .thenCompose(Function.identity());
}

CompletableFuture.failedFuture is a Java 9 method, but it would be trivial to add a Java 8 compatible backport to your code if needed:

public static <T> CompletableFuture<T> failedFuture(Throwable t) {
    final CompletableFuture<T> cf = new CompletableFuture<>();
    cf.completeExceptionally(t);
    return cf;
}

Upvotes: 41

Gili

Reputation: 90043

Here is an approach that will work for any CompletionStage subclass and does not return a dummy CompletableFuture that does nothing more than wait to get updated by other futures.

/**
 * Sends a request that may run as many times as necessary.
 *
 * @param request  a supplier that initiates an HTTP request
 * @param executor the Executor used to run the request
 * @return the server response
 */
public CompletionStage<Response> asyncRequest(Supplier<CompletionStage<Response>> request, Executor executor)
{
    return retry(request, executor, 0);
}

/**
 * Sends a request that may run as many times as necessary.
 *
 * @param request  a supplier that initiates an HTTP request
 * @param executor the Executor used to run the request
 * @param tries    the number of times the operation has been retried
 * @return the server response
 */
private CompletionStage<Response> retry(Supplier<CompletionStage<Response>> request, Executor executor, int tries)
{
    if (tries >= MAX_RETRIES)
        throw new CompletionException(new IOException("Request failed after " + MAX_RETRIES + " tries"));
    return request.get().thenComposeAsync(response ->
    {
        if (response.getStatusInfo().getFamily() != Response.Status.Family.SUCCESSFUL)
            return retry(request, executor, tries + 1);
        return CompletableFuture.completedFuture(response);
    }, executor);
}

Upvotes: 4

xmas79

Reputation: 5180

I think I succeeded. Here is an example class I created, along with the test code:


RetriableTask.java

public class RetriableTask
{
    protected static final int MAX_RETRIES = 10;
    protected int retries = 0;
    protected int n = 0;
    protected CompletableFuture<Integer> future = new CompletableFuture<Integer>();

    public RetriableTask(int number) {
        n = number;
    }

    public CompletableFuture<Integer> executeAsync() {
        // Create a failure within variable timeout
        Duration timeoutInMilliseconds = Duration.ofMillis(1*(int)Math.pow(2, retries));
        CompletableFuture<Integer> timeoutFuture = Utils.failAfter(timeoutInMilliseconds);

        // Create a dummy future and complete only if (n > 5 && retries > 5) so we can test for both completion and timeouts. 
        // In real application this should be a real future
        final CompletableFuture<Integer> taskFuture = new CompletableFuture<>();
        if (n > 5 && retries > 5)
            taskFuture.complete(retries * n);

        // Attach the failure future to the task future, and perform a check on completion
        taskFuture.applyToEither(timeoutFuture, Function.identity())
            .whenCompleteAsync((result, exception) -> {
                if (exception == null) {
                    future.complete(result);
                } else {
                    retries++;
                    if (retries >= MAX_RETRIES) {
                        future.completeExceptionally(exception);
                    } else {
                        executeAsync();
                    }
                }
            });

        // Return the future    
        return future;
    }
}

Usage

int size = 10;
System.out.println("generating...");
List<RetriableTask> tasks = new ArrayList<>();
for (int i = 0; i < size; i++) {
    tasks.add(new RetriableTask(i));
}

System.out.println("issuing...");
List<CompletableFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < size; i++) {
    futures.add(tasks.get(i).executeAsync());
}

System.out.println("Waiting...");
for (int i = 0; i < size; i++) {
    try {
        CompletableFuture<Integer> future = futures.get(i);
        int result = future.get();
        System.out.println(i + " result is " + result);
    } catch (Exception ex) {
        System.out.println(i + " I got exception!");
    }
}
System.out.println("Done waiting...");

Output

generating...
issuing...
Waiting...
0 I got exception!
1 I got exception!
2 I got exception!
3 I got exception!
4 I got exception!
5 I got exception!
6 result is 36
7 result is 42
8 result is 48
9 result is 54
Done waiting...

Main idea and some glue code (failAfter function) come from here.
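The failAfter helper is not shown in this answer. A plausible sketch of what such a helper could look like, assuming a single shared daemon scheduler thread and a TimeoutException as the failure (both are assumptions, not the code originally linked):

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

final class Utils {

    // A single daemon thread, so pending timeouts never keep the JVM alive.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "fail-after");
                t.setDaemon(true);
                return t;
            });

    /** Returns a future that completes exceptionally with a TimeoutException after the delay. */
    static <T> CompletableFuture<T> failAfter(Duration duration) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        SCHEDULER.schedule(() -> {
            promise.completeExceptionally(new TimeoutException("Timeout after " + duration));
        }, duration.toMillis(), TimeUnit.MILLISECONDS);
        return promise;
    }

    public static void main(String[] args) {
        // A task that never completes loses the race against the timeout future.
        CompletableFuture<Integer> never = new CompletableFuture<>();
        CompletableFuture<Integer> timeout = failAfter(Duration.ofMillis(50));
        boolean timedOut = never.applyToEither(timeout, Function.identity())
                .handle((value, error) -> error != null)
                .join();
        System.out.println("timed out: " + timedOut); // timed out: true
    }
}
```

On Java 9 and later, CompletableFuture.orTimeout covers the same need without a hand-written helper.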

Any other suggestions or improvements are welcome.

Upvotes: 11

Alex Fargus

Reputation: 122

I recently solved a similar problem using the guava-retrying library.

Callable<Result> callable = new Callable<Result>() {
    public Result call() throws Exception {
        return executeMycustomActionHere();
    }
};

Retryer<Result> retryer = RetryerBuilder.<Result>newBuilder()
        .retryIfResult(Predicates.<Result>isNull())
        .retryIfExceptionOfType(IOException.class)
        .retryIfRuntimeException()
        .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRIES))
        .build();

CompletableFuture<Result> future = CompletableFuture.supplyAsync(() -> {
    try {
        // Return the retried result so the future completes with it
        return retryer.call(callable);
    } catch (RetryException | ExecutionException e) {
        // Fail the future instead of only printing the stack trace
        throw new CompletionException(e);
    }
});

Upvotes: 6
