pablo
pablo

Reputation: 747

on error do another call and retry in webflux

I'd like to do the following using the WebClient from spring webflux:

I've got this far:

webclient.get()
  .uri("/endpoint1")
  .retrieve()
  .bodyToFlux(MyBody.class)
  .retry(error -> {
     if (error == expectedError) {
       webclient.get()
         .uri("/endpoint2")
         .retrieve().block();
       return true;
     } else {
       false;
     });

I cannot block when requesting endpoint2 since I would get the following error: block()/blockFirst()/blockLast() are blocking, which is not supported in thread (I wouldn't like to block either).

Maybe I should use retryWhen but I'm not really sure how to use it.

Upvotes: 0

Views: 2997

Answers (2)

Janardhan B. Chinta
Janardhan B. Chinta

Reputation: 59

Chain all post calls. Chain returns on first successful call. Return the Mono to controller or subscribe to execute.

public Mono<Object> post(String url, Object payload) {
    return WebClient.builder().baseUrl(url).build().post()
        .bodyValue(BodyInserters.fromValue(payload)).retrieve().bodyToMono(Object.class);
}

public Mono<Object> chainOnError(List<String> urls, Object payload) {
    Mono<Object> callChain = post(urls.get(0), payload);
    for (int i = 1; i < urls.size(); i++) {
        String url = urls.get(i);
        callChain = callChain.onErrorResume(th -> {
            System.out.println("Failed post for " + url + ", " + th.getMessage());
            return post(url, payload);
        });
    }

    return callChain.onErrorResume(th -> Mono.error(new RuntimeException("All remotes failed.")));
} 

Upvotes: 0

pablo
pablo

Reputation: 747

The only way I made this work was with retryWhen I could not use reactor.retry.Retry#doOnRetry because it only accepts a Consumer not a Mono or Flux or Publisher.

The snippet is as follows:

webclient.get()
  .uri("/endpoint1")
  .retrieve()
  .bodyToFlux(MyBody.class)
  .retryWhen(errorCurrentAttempt -> errorCurrentAttempt
                .flatMap(currentError -> Mono.subscriberContext().map(ctx -> Tuples.of(currentError, ctx)))
                .flatMap(tp -> {
                    Context ctx = tp.getT2();
                    Throwable error = tp.getT1();
                    int maxAttempts = 3;
                    Integer rl = ctx.getOrDefault("retriesLeft", maxAttempts);
                    if (rl != null && rl > 0 && error == myExpectedError) {
                        // Call endpoint and retry
                        return webclient.get()
                                .uri("/endpoint2")
                                .retrieve()
                                .thenReturn(ctx.put("retriesLeft", rl - 1));
                    } else {
                        // Finish retries
                        return Mono.<Object>error(error);
                    }
                }));

Upvotes: 1

Related Questions