noBoom

Reputation: 145

How to use Resilience4j Circuit Breaker with WebFlux in Spring Boot

I have service A that calls downstream service B.

Service A code

@RestController
@RequestMapping(value = "", produces = MediaType.APPLICATION_JSON_VALUE)
public class GreetingController {

    private final GreetingService greetingService;

    public GreetingController(GreetingService greetingService){
        this.greetingService = greetingService;
    }

    @GetMapping(value = "/greetings")
    public Mono<String> getGreetings() {
        return greetingService.callServiceB();
    }
}

@Component
@RequiredArgsConstructor
public class GreetingService {
    
    CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("greetingService");
    Callable<Mono<String>> callable = CircuitBreaker.decorateCallable(circuitBreaker, this::clientCall);
    Future<Mono<String>> future = Executors.newSingleThreadExecutor().submit(callable);

    public Mono<String> callServiceB() {
        try {
            return future.get();
        } catch (CircuitBreakerOpenException | InterruptedException | ExecutionException ex){
            return Mono.just("Service is down!");
        }
    }


    private final String url = "/v1/holidaysgreetings";
    
    private Mono<String> clientCall(){
        WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();
        
        return client
                .get()
                .uri(url)
                .retrieve()
                .bodyToMono(String.class);
    }
}

When I shut down downstream service B (running on localhost:8080) and hit the /greetings endpoint in the GreetingController class to check whether my circuit breaker works properly, I get this nasty error:

2021-06-28 21:27:31.431 ERROR 10285 --- [nio-8081-exec-7] o.a.c.c.C.[.[.[.[dispatcherServlet]: Servlet.service() for servlet [dispatcherServlet] in context with path [/v1/holidaysgreetings] 
threw exception [Request processing failed; nested exception is org.springframework.web.reactive.function.client.WebClientRequestException: Connection refused: localhost/127.0.0.1:8080; 
nested exception is io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8080] with root cause

java.net.ConnectException: Connection refused

Does anyone know why I am getting this? What am I missing here? Am I implementing the circuit breaker correctly?

Upvotes: 3

Views: 3742

Answers (1)

Felipe

Reputation: 7563

You are mixing reactive code with regular blocking, non-reactive code (a Callable, an Executor and Future.get() wrapped around a Mono). If you aim to use spring-webflux, it is better to use the reactor-resilience4j circuit breaker starter together with the regular reactor-adapter library.
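To illustrate why the try/catch in the question never sees the connection error, here is a minimal plain-JDK sketch. It is only an analogy: a Supplier<String> stands in for the lazy Mono<String>, and the class and method names (LazyCallDemo, guardCreationOnly, guardExecution) are made up for this illustration. The point it tries to show is that guarding the creation of a lazy value does not guard its later execution.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

// Plain-JDK analogy (assumption): Supplier<String> plays the role of the lazy Mono<String>.
class LazyCallDemo {

    // Hypothetical stand-in for clientCall(): building the lazy value never fails,
    // the failure only happens when the value is actually requested.
    static Supplier<String> clientCall() {
        return () -> { throw new IllegalStateException("Connection refused"); };
    }

    // Mirrors the question: only the creation of the lazy value is guarded.
    static String guardCreationOnly() {
        Callable<Supplier<String>> callable = LazyCallDemo::clientCall;
        Supplier<String> lazy;
        try {
            lazy = callable.call();      // succeeds: nothing has been executed yet
        } catch (Exception ex) {
            return "Service is down!";   // never reached
        }
        return lazy.get();               // the failure surfaces here, outside the guard
    }

    // Guarding the execution instead lets the fallback apply.
    static String guardExecution() {
        try {
            return clientCall().get();
        } catch (RuntimeException ex) {
            return "Service is down!";
        }
    }

    public static void main(String[] args) {
        System.out.println(guardExecution());          // prints "Service is down!"
        try {
            guardCreationOnly();
        } catch (IllegalStateException ex) {
            System.out.println("escaped: " + ex.getMessage()); // prints "escaped: Connection refused"
        }
    }
}
```

In the question, future.get() plays the role of callable.call(): it returns the Mono without error, and the request only fails later, when the framework subscribes to that Mono. The circuit breaker has by then recorded a success, so it never opens.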

Use these dependencies (Gradle):

implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.cloud:spring-cloud-starter-circuitbreaker-reactor-resilience4j'
implementation "io.projectreactor.addons:reactor-adapter:${reactorVersion}"

You are also not creating a circuit breaker that you can rely on. Create one through ReactiveCircuitBreakerFactory; you can then call "Mono<T> run(Mono<T> toRun, Function<Throwable, Mono<T>> fallback)" (or the overload that takes and returns a Flux, if you need one) to execute your call and provide a fallback.
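As a rough mental model of what a run(toRun, fallback) call does, here is a hypothetical, heavily simplified breaker written with the plain JDK only. TinyBreaker and everything in it are assumptions made for illustration, not the Resilience4j or Spring Cloud implementation: a real breaker works on a sliding window with a failure rate, and moves to a half-open state after a wait duration, none of which is modelled here.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, simplified count-based breaker; NOT the Resilience4j implementation.
class TinyBreaker {
    private final int failureThreshold;
    private int consecutiveFailures = 0;

    TinyBreaker(int failureThreshold) {
        this.failureThreshold = failureThreshold;
    }

    boolean isOpen() {
        return consecutiveFailures >= failureThreshold;
    }

    // Same shape as run(toRun, fallback): execute the call, or answer with the fallback.
    <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
        if (isOpen()) {
            // Open: short-circuit without touching the downstream service.
            return fallback.apply(new IllegalStateException("circuit open"));
        }
        try {
            T result = toRun.get();
            consecutiveFailures = 0;   // a success resets the count
            return result;
        } catch (RuntimeException ex) {
            consecutiveFailures++;     // a failure moves the breaker toward open
            return fallback.apply(ex);
        }
    }

    public static void main(String[] args) {
        TinyBreaker breaker = new TinyBreaker(2);
        int[] downstreamCalls = {0};
        Supplier<String> failing = () -> {
            downstreamCalls[0]++;
            throw new IllegalStateException("Connection refused");
        };
        for (int i = 0; i < 4; i++) {
            System.out.println(breaker.run(failing, t -> "Service is down!"));
        }
        // Only the first two attempts reach the downstream call; the rest are short-circuited.
        System.out.println("downstream calls: " + downstreamCalls[0]); // prints "downstream calls: 2"
    }
}
```

The caller always gets an answer, either the real result or the fallback, and once the breaker is open the failing service is no longer called at all.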

Here is an example from a demo project:

@RestController
public class CompletableFutureDemoController {

    private static final Logger LOG = LoggerFactory.getLogger(CompletableFutureDemoController.class);

    private CompletableFutureHttpBinService httpBin;
    private ReactiveCircuitBreakerFactory reactiveCircuitBreakerFactory;

    public CompletableFutureDemoController(CompletableFutureHttpBinService httpBin, ReactiveCircuitBreakerFactory reactiveCircuitBreakerFactory) {
        this.httpBin = httpBin;
        this.reactiveCircuitBreakerFactory = reactiveCircuitBreakerFactory;
    }

    @GetMapping("/completablefuture/delay/{seconds}")
    public Mono<Map> delay(@PathVariable int seconds) {
        return reactiveCircuitBreakerFactory.create("completablefuturedelay")
            .run(Mono.fromFuture(httpBin.delay(seconds)), t -> {
                LOG.warn("delay call failed error", t);
                Map<String, String> fallback = new HashMap<>();
                fallback.put("hello", "world");
                return Mono.just(fallback);
            }
        );
    }
}

Upvotes: 2
