Mercious

Reputation: 398

Thread safety for method that returns Mono based on mutable attribute in Java

In my Spring Boot application I have a component that is supposed to monitor the health status of another, external system. This component also offers a public method that reactive chains can subscribe to in order to wait for the external system to be up.

@Component
public class ExternalHealthChecker {
  private static final Logger LOG = LoggerFactory.getLogger(ExternalHealthChecker.class);

  private final WebClient webClient = WebClient.builder().build(); // config omitted

  private volatile boolean isUp = true;
  private volatile CompletableFuture<String> completeWhenUp = new CompletableFuture<>();

  @Scheduled(cron = "0/10 * * ? * *")
  private void checkExternalSystemHealth() {
    webClient.get() //
        .uri("/health") //
        .retrieve() //
        .bodyToMono(Void.class) //
        .doOnError(this::handleHealthCheckError) //
        .doOnSuccess(nothing -> this.handleHealthCheckSuccess()) //
        .subscribe(); //
  }

  private void handleHealthCheckError(final Throwable error) {
    if (this.isUp) {
      LOG.error("External System is now DOWN. Health check failed: {}.", error.getMessage());
    }
    this.isUp = false;
  }

  private void handleHealthCheckSuccess() {
    // The status changed from down -> up, so complete the future that callers may currently be waiting on.
    if (!this.isUp) {
      LOG.warn("External System is now UP again.");
      this.isUp = true;
      this.completeWhenUp.complete("UP");
      this.completeWhenUp = new CompletableFuture<>();
    }
  }


  public Mono<String> waitForExternalSystemUPStatus() {
    if (this.isUp) {
      LOG.info("External System is already UP!");
      return Mono.empty();
    } else {
      LOG.warn("External System is DOWN. Requesting process can now wait for UP status!");
      return Mono.fromFuture(completeWhenUp);
    }
  }
}

The method waitForExternalSystemUPStatus is public and may be called from many different threads. The idea is to give some of the reactive Flux chains in the application a way to pause their processing until the external system is up, since these chains cannot process their elements while it is down.

someFlux
  .doOnNext(record -> LOG.info("Next element"))
  .delayUntil(record -> externalHealthChecker.waitForExternalSystemUPStatus())
  ... // starting processing

The issue here is that I can't really wrap my head around which part of this code needs to be synchronised. I think there should not be an issue with multiple threads calling waitForExternalSystemUPStatus at the same time, as this method is not writing anything, so I feel like this method does not need to be synchronised. However, the method annotated with @Scheduled will run on its own thread and will in fact write the value of isUp and also potentially change the reference of completeWhenUp to a new, uncompleted future instance. I have marked these two mutable attributes as volatile because, from reading about this keyword in Java, it seems to guarantee that the threads reading these two values see the latest value. However, I am unsure whether I also need to add the synchronized keyword to parts of the code. I am also unsure whether synchronized plays well with Reactor code; I have a hard time finding information on this. Maybe there is also a way of providing the functionality of the ExternalHealthChecker in a more completely reactive way, but I cannot think of any.

Upvotes: 2

Views: 1291

Answers (1)

Michael Berry

Reputation: 72344

I'd strongly advise against this approach. The problem with threaded code like this is it becomes immensely difficult to follow & reason about. I think you'd at least need to synchronise the parts of handleHealthCheckSuccess() and waitForExternalSystemUPStatus() that reference your completeWhenUp field otherwise you could have a race hazard on your hands (only one writes to it, but it might be read out-of-order after that write) - but there could well be something else I'm missing, and if so it may show as one of these annoying "one in a million" type bugs that's almost impossible to pin down.
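
To make the hazard concrete: a caller of waitForExternalSystemUPStatus() can read isUp as false, the scheduler thread can then complete the old future and swap in a fresh one, and the caller picks up the fresh, uncompleted future and waits until the next down-to-up transition. If you did want to keep the future-based design, one way to avoid that check-then-act pair is to fold both fields into a single future. The following is only a minimal sketch under that assumption; UpGate, markDown, markUp and whenUp are hypothetical names made up for illustration, not part of your code or of any library.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical helper: one future encodes both "is the system up?" and
 * "tell me when it is". Completed future = up, pending future = down.
 */
final class UpGate {

    // Starts completed because the system is assumed to be up initially.
    private volatile CompletableFuture<String> upSignal =
            CompletableFuture.completedFuture("UP");

    /** Called on a failed health check; only an up -> down transition swaps the future. */
    synchronized void markDown() {
        if (upSignal.isDone()) {
            upSignal = new CompletableFuture<>();
        }
    }

    /** Called on a successful health check; a no-op if the future is already completed. */
    synchronized void markUp() {
        upSignal.complete("UP");
    }

    /**
     * A single volatile read, so there is no separate flag to get out of step with.
     * A dependent future is handed out so that a caller cancelling its own copy
     * does not cancel the shared one.
     */
    CompletableFuture<String> whenUp() {
        return upSignal.thenApply(status -> status);
    }
}
```

In your component, handleHealthCheckError would call markDown(), handleHealthCheckSuccess would call markUp(), and waitForExternalSystemUPStatus() would return Mono.fromFuture(gate.whenUp()). The synchronized sections only flip or complete a reference and never wait on I/O, so they are short; readers are not synchronized at all.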

There should be a much more reliable & simple way of achieving this though. Instead of using the Spring scheduler, I'd create a flux when your ExternalHealthChecker component is created as follows:

healthCheckStream = Flux.interval(Duration.ofMinutes(10))
        .flatMap(i ->
                webClient.get().uri("/health")
                        .retrieve()
                        .bodyToMono(String.class)
                        .map(s -> true)
                        .onErrorResume(e -> Mono.just(false)))
        .cache(1);

...where healthCheckStream is a field of type Flux<Boolean>. (Note it doesn't need to be volatile, as you'll never replace it so cross-thread worries don't apply - it's the same stream that will be updated with different results every 10 minutes based on the healthcheck status, whatever thread you'll access it from.)

This essentially creates a stream of healthcheck response values every 10 minutes and always caches the latest response, so any new subscriber that comes in on any thread gets the latest result straight away, be that a pass or a fail. One caveat: cache(1) only connects to the underlying interval when the first subscriber arrives, so "nothing happens until you subscribe" still applies to that first subscription; if the polling should begin at startup, subscribe once when the component is created (for example with healthCheckStream.subscribe()). handleHealthCheckSuccess(), handleHealthCheckError(), isUp and completeWhenUp are then all redundant and can go, and your waitForExternalSystemUPStatus() can become a single line:

return healthCheckStream.filter(x -> x).next();

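...then job done: you can call that from anywhere and you'll have a Mono that only completes once the system is up. (The return type becomes Mono<Boolean> rather than Mono<String>, which makes no difference to delayUntil.)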

Upvotes: 5
