Alberto San José

Reputation: 471

Complete WebClient asynchronous example with Spring WebFlux

I am new to the reactive programming paradigm, but I recently decided to base a simple HTTP client on Spring WebClient, since the old synchronous RestTemplate is already in maintenance mode and might be deprecated in upcoming releases.

So first I had a look at Spring documentation and, after that, I've searched the web for examples.

I must say that (only for the time being) I have consciously decided not to go through the Reactor library documentation, so beyond the Publisher-Subscriber pattern, my knowledge of Mono and Flux is scarce. I focused instead on getting something working.

My scenario is a simple POST that sends a callback to a server; the client is only interested in the response status code. No body is returned. So I finally came up with this code snippet, which works:

private void notifyJobSuccess(final InternalJobData jobData) {
        
        SuccessResult result = new SuccessResult();
        result.setJobId(jobData.getJobId());
        result.setStatus(Status.SUCCESS);
        result.setInstanceId(jobData.getInstanceId());
        
        log.info("Result to send back:" + System.lineSeparator() + "{}", result.toString());
        
        this.webClient.post()
            .uri(jobData.getCallbackUrl())
            .body(Mono.just(result), ReplaySuccessResult.class)
            .retrieve()
            .onStatus(s -> s.equals(HttpStatus.OK), resp -> {   
                log.info("Expected CCDM response received with HttpStatus = {}", HttpStatus.OK);
                return Mono.empty();
            })
            .onStatus(HttpStatus::is4xxClientError, resp -> {   
                log.error("CCDM response received with unexpected Client Error HttpStatus {}. "
                        + "The POST request sent by EDA2 stub did not match CCDM OpenApi spec", resp.statusCode());
                return Mono.empty();
            })
            .onStatus(HttpStatus::is5xxServerError, resp -> {   
                log.error("CCDM response received with unexpected Server Error HttpStatus {}", resp.statusCode());
                return Mono.empty();
            }).bodyToMono(Void.class).subscribe(Eda2StubHttpClient::handleResponseFromCcdm);
        
    }

My poor understanding of how the reactive WebClient works starts with the call to subscribe. None of the tens of examples that I checked before coding my client included such a call, but the fact is that before I included that call, the Server was sitting forever waiting for the request.

Then I bumped into the mantra "Nothing happens until you subscribe". Knowing the Publisher-Subscriber pattern, I was aware of that, but I (wrongly) assumed that the subscription was handled by the WebClient API, in one of the exchange or bodyToMono methods... block() definitely must subscribe, because when you call it, the request goes out at once.

So my first question is: is this call to subscribe() really needed?

Second question is why the method Eda2StubHttpClient::handleResponseFromCcdm is never called back. The only explanation I can find is that, since the Mono returned is a Mono<Void> (there is nothing in the response besides the status code), no value is ever emitted, so the method is totally dummy... I could even replace it with just .subscribe(). Is this a correct assumption?

Last, is it too much to ask for a complete example of a method receiving a body in a Mono that is later consumed? All the examples I find just focus on getting the request out, but how the Mono or Flux is later consumed is beyond my understanding right now... I know that I will have to check the Reactor documentation sooner rather than later, but I would appreciate a bit of help because I am having issues with exceptions and error handling.

Thanks!

Upvotes: 1

Views: 10867

Answers (3)

Shailendra Madda

Reputation: 21551

Finally, I tried a simple web service to get the asynchronous response using .flatMap and without .subscribe

Controller:

@RestController
@RequestMapping("/weatherReport")
class WeatherController constructor(@Autowired val weatherService: WeatherService) {

    @GetMapping
    fun getTodayReport(): Mono<DailyReport> {
        return weatherService.getTodayWeatherReport()
    }

}

Service:

@Service
class WeatherService {
    val client = WebClient.create("https://api.weather.gov/gridpoints/MLB/33,70/forecast")

    fun getTodayWeatherReport(): Mono<DailyReport> {
        return client.get()
            .retrieve()
            .bodyToMono(WeatherReport::class.java)
            .flatMap { transformData(it.properties) }
    }

    private fun transformData(properties: Properties): Mono<DailyReport> {
        val response = DailyReport()
        val dailyList = arrayListOf<Daily>()

        properties.periods.forEach { period ->
            val startTime = period.startTime
            val date = startTime.substring(0, 10)
            if (AppUtils.isDateStringToday(date, "yyyy-MM-dd")) {
                val temperatureInF = period.temperature
                val temperatureInC = AppUtils.getCelsiusFromFahrenheit(temperatureInF)
                val daily = Daily(period.name, temperatureInC, period.shortForecast)
                dailyList.add(daily)
            }
        }
        response.daily = dailyList
        return Mono.just(response)
    }

}

AppUtils:

object AppUtils {

    fun getCelsiusFromFahrenheit(temperatureInF: Long): Long {
        return ((temperatureInF - 32) * 5) / 9
    }

    fun isDateStringToday(dateString: String, dateFormat: String): Boolean {
        // Define the format of the date string (e.g., "yyyy-MM-dd")
        val dateFormatter = DateTimeFormatter.ofPattern(dateFormat)
        // Parse the date string into a LocalDate
        val parsedDate = LocalDate.parse(dateString, dateFormatter)
        // Get the current date
        val currentDate = LocalDate.now()
        // Compare the parsed date with the current date
        return parsedDate.isEqual(currentDate)
    }
}

Request (in Postman): http://localhost:8090/weatherReport

Expected response:

{
    "daily": [
        {
            "dayName": "Tuesday",
            "tempHighCelsius": 29,
            "forecastBlurp": "Mostly Sunny"
        },
        {
            "dayName": "Tuesday Night",
            "tempHighCelsius": 17,
            "forecastBlurp": "Mostly Clear"
        }
    ]
}

Upvotes: 0

Alberto San José

Reputation: 471

@Toerktumlare was totally right in his comment about my previous amend, which I left as an example of how NOT to do things.

I'd like to post a new answer, now that I have recently had to revisit the reactive programming paradigm in the context of using WebClient to perform HTTP requests towards an external server.

As a matter of fact, I ended up building a PoC to implement and verify the ideas I needed to put in place for the real thing. I also have to mention that since I first asked this question, hundreds of blog posts and articles have appeared about how to use WebClient, how to handle errors efficiently, etc. I searched for something like that at the time and found nearly nothing; that is no longer the case.

Put simply, the PoC is a service with only one method, returning a Mono. Errors and retries are handled in different operators of the reactive chain. The caller of the service decides what to do, and how, with the Mono returned.

First, the WebClient bean configured like this:

@Configuration
@EnableConfigurationProperties
@ConfigurationProperties
public class GeneralConfiguration {
  @Value("${service.remote.baseUrl}")
  private String baseUrl;

  @Value("${service.remote.httpPort}")
  private int httpPort;

  @Value("${service.remote.httpsPort}")
  private int httpsPort;

  public static final int TIMEOUT = 500;

  // ...

  @Bean("webClientSslTrustAllCerts")
  public WebClient webClientSslWithTimeout() throws SSLException {
    var sslContext =
        SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
    HttpClient httpClient =
        HttpClient.create()
            .baseUrl(String.format("%s:%d", baseUrl, httpsPort))
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, TIMEOUT)
            .doOnConnected(
                conn -> conn.addHandlerLast(new ReadTimeoutHandler(TIMEOUT, TimeUnit.MILLISECONDS)))
            .secure(
                spec -> {
                  SslProvider.Builder builder = spec.sslContext(sslContext);
                })
        /*
        .wiretap(
            "reactor.netty.http.client.HttpClient",
            LogLevel.DEBUG,
            AdvancedByteBufFormat.TEXTUAL)*/ ;
    return WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();
  }
}

The bean is injected in the service class, exposing the method that will take care of preparing a GET request:

@Service
public class StupidService {
  private static final Logger LOG = LogManager.getLogger(StupidService.class);
  private final WebClient webClient;

  private volatile String instance;
  private volatile int errSc;

  private final AtomicInteger pendingResponsesCtr = new AtomicInteger();
  private static final int MAX_RETRIES = 3;

  public StupidService(@Qualifier("webClientSslTrustAllCerts") final WebClient wc) {
    webClient = wc;
  }

  /**
   * send a http GET operation to a remote service.
   *
   * @param scenario
   * @return a {@link Mono} wrapping the body of the response received, or a {@link Throwable}
   *     representing the error returned.
   */
  public Mono<Object> saySomething(final String scenario) {
    LOG.info("Entering service to fetch remote resources with WebClient (scenario '{}')", scenario);
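    // Keep the request path in a local variable: a shared field in a singleton service
    // could be overwritten by a concurrent call before the request is built.
    final String path = String.format("/mock/%s", scenario);
    return webClient
        .get()
        .uri(path)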
        .retrieve()
        .onStatus(
            HttpStatusCode::is4xxClientError, response -> handleClientErrors(response.statusCode()))
        .onStatus(
            HttpStatusCode::is5xxServerError, response -> handleServerErrors(response.statusCode()))
        .bodyToMono(Object.class)
        .doFirst(pendingResponsesCtr::incrementAndGet)
        .doOnError(err -> LOG.info("Error Occurred: {}", err.getMessage()))
        .doOnSuccess(b -> LOG.info("Successful response arrived with body: {}", b))
        .retryWhen(
            Retry.backoff(MAX_RETRIES, Duration.ofMillis(500L))
                .filter(t -> t instanceof ServiceRetryErrException)
                // takes care of keeping the balance of pending requests waiting for an answer in
                // every retry
                .doAfterRetry(retrySignal -> pendingResponsesCtr.decrementAndGet()))
        .doFinally(
            // decrement the counter for the first request pending an answer (and the only one
            // if there were no retries)
            signalType -> {
              LOG.info(
                  "There are {} requests waiting for an answer",
                  pendingResponsesCtr.decrementAndGet());
            })
        .onErrorResume(
            err -> {
              LOG.info("Error: {}", err.getMessage());
              return Mono.just(createProblemDetails(err));
            });
  }

  private Mono<? extends Throwable> handleClientErrors(final HttpStatusCode statusCode) {
    // ...
  }

  private Mono<? extends Throwable> handleServerErrors(final HttpStatusCode statusCode) {
    // ...
  }

  private ProblemDetails createProblemDetails(final Throwable t) {
    final ProblemDetails p = new ProblemDetails();
    // ...
    return p;
  }

  /**
   * just a method to know when the asynchronous operation is done. used for tests
   */
  @VisibleForTesting
  int getPendingRequestsCounter() {
    LOG.info("Counter of pending responses is: {}", pendingResponsesCtr.get());
    return pendingResponsesCtr.get();
  }
}
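
As a side note on the retry part of that chain, here is a minimal, JDK-only sketch of the bookkeeping it relies on. It is not Reactor code: the class RetryAccountingSketch and both of its methods are hypothetical names made up for illustration. The first method lists the nominal delays of an exponential backoff, assuming the delay doubles on every retry and ignoring the random jitter that Reactor adds on top. The second replays the counter updates of the chain (doFirst on every subscription, doAfterRetry once per retry, doFinally once at the end) to show why the counter comes back to zero.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RetryAccountingSketch {

    /** Nominal wait before each retry, assuming the delay doubles every time and ignoring jitter. */
    static List<Long> nominalBackoffMillis(int maxRetries, long firstDelayMillis) {
        List<Long> delays = new ArrayList<>();
        long delay = firstDelayMillis;
        for (int retry = 0; retry < maxRetries; retry++) {
            delays.add(delay);
            delay *= 2;
        }
        return delays;
    }

    /**
     * Replays the counter updates for one call whose first `failures` attempts end in a
     * retryable error. Returns the counter value once the whole chain has terminated.
     */
    static int pendingAfterCall(int failures, int maxRetries) {
        AtomicInteger pending = new AtomicInteger();
        int retries = Math.min(failures, maxRetries);
        pending.incrementAndGet();          // doFirst on the initial subscription
        for (int retry = 0; retry < retries; retry++) {
            pending.decrementAndGet();      // doAfterRetry, once per retry
            pending.incrementAndGet();      // doFirst again on the resubscription
        }
        pending.decrementAndGet();          // doFinally, once, on the terminal signal
        return pending.get();
    }

    public static void main(String[] args) {
        System.out.println(nominalBackoffMillis(3, 500L)); // prints [500, 1000, 2000]
        System.out.println(pendingAfterCall(2, 3));        // prints 0
    }
}
```

With MAX_RETRIES = 3 and a first delay of 500 ms, a call that keeps failing is attempted four times in total, and the counter is balanced whether the call succeeds at once, succeeds after a few retries, or exhausts them.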

The method handles everything: errors, retries and completion, by providing the appropriate callbacks along the reactive chain operators. It returns a Mono, and callers have to decide whether to block on it or subscribe to it.

The following two tests show both approaches. The first test blocks on the Mono, forcing synchronous execution of the flow: control is not returned to the caller until the response arrives. If serviceUt.saySomething() were based on RestTemplate rather than on WebClient, the test would work just as well: the verification can be done right after the call because the call blocks until the answer arrives.

  @Test
  void testSaySomethingOkBlocking() {
    stubFor(get(TEST_OK_PATH).willReturn(okJson(OK_RESULT_BODY)));

    serviceUt.saySomething(PARAM_SCENARIO_OK).block();

    verify(1, getRequestedFor(urlEqualTo(TEST_OK_PATH)));
  }

So we don't get the benefit of the reactive approach here, just perhaps a more elegant code to define the flow of request handling, but blocking the execution.

The second test subscribes to the Mono and provides callbacks, one for the success case, when a success response is returned along with whatever body has inside; and another one for handling the case of error, when the Mono returns a Throwable with some error that might have happened:

  @Test
  void testSaySomethingOkBestApproach() {
    stubFor(get(TEST_OK_PATH).willReturn(okJson(OK_RESULT_BODY)));

    serviceUt
        .saySomething(PARAM_SCENARIO_OK)
        .flatMap(this::getJsonPrettyString)
        .subscribe(this::bodyConsumer, this::errConsumer);

    LOG.info("We have subscribed and execution continues here, while WebClient does its job async");

    await().atMost(2L, TimeUnit.SECONDS).until(() -> serviceUt.getPendingRequestsCounter() == 0);

    LOG.info("Reactive stream finished, successfully in this case");

    // now we are ready to do assessments
    verify(1, getRequestedFor(urlEqualTo(TEST_OK_PATH)));
  }

In this case, subscribe() returns immediately and the calling thread is released, so that the application (here, the Awaitility utility) can continue doing its job, which is nothing other than checking when the service is done. Once it is done, and only then, we can run the WireMock verification. The whole thing becomes obvious in the logs of the execution:

2023-09-28T17:52:41.662+02:00  INFO 37552 --- [           main] c.s.p.webclient.service.StupidService    : Entering service to fetch remote resources with WebClient (scenario 'ok')
2023-09-28T17:52:42.239+02:00  INFO 37552 --- [           main] c.s.p.w.service.StupidServiceTest        : We have subscribed and execution continues here, while WebClient does its job async
2023-09-28T17:52:42.369+02:00  INFO 37552 --- [aitility-thread] c.s.p.webclient.service.StupidService    : Counter of pending responses is: 1
2023-09-28T17:52:42.472+02:00  INFO 37552 --- [aitility-thread] c.s.p.webclient.service.StupidService    : Counter of pending responses is: 1
2023-09-28T17:52:42.577+02:00  INFO 37552 --- [aitility-thread] c.s.p.webclient.service.StupidService    : Counter of pending responses is: 1
2023-09-28T17:52:42.683+02:00  INFO 37552 --- [aitility-thread] c.s.p.webclient.service.StupidService    : Counter of pending responses is: 1
2023-09-28T17:52:42.738+02:00  INFO 37552 --- [ctor-http-nio-2] c.s.p.webclient.service.StupidService    : Successful response arrived with body: {scenario=ok, value=here you have a value}
2023-09-28T17:52:42.743+02:00  INFO 37552 --- [ctor-http-nio-2] c.s.p.w.service.StupidServiceTest        : Body received: {
  "scenario" : "ok",
  "value" : "here you have a value"
}
2023-09-28T17:52:42.744+02:00  INFO 37552 --- [ctor-http-nio-2] c.s.p.webclient.service.StupidService    : There are 0 requests waiting for an answer
2023-09-28T17:52:42.788+02:00  INFO 37552 --- [aitility-thread] c.s.p.webclient.service.StupidService    : Counter of pending responses is: 0
2023-09-28T17:52:42.789+02:00  INFO 37552 --- [           main] c.s.p.w.service.StupidServiceTest        : Reactive stream finished, successfully in this case
2023-09-28T17:52:42.812+02:00  INFO 37552 --- [           main] org.eclipse.jetty.server.Server          : Stopped Server@4482469c{STOPPING}[11.0.16,sto=1000]
2023-09-28T17:52:42.812+02:00  INFO 37552 --- [           main] org.eclipse.jetty.server.Server          : Shutdown Server@4482469c{STOPPING}[11.0.16,sto=1000]
2023-09-28T17:52:42.816+02:00  INFO 37552 --- [           main] o.e.jetty.server.AbstractConnector       : Stopped NetworkTrafficServerConnector@5df417a7{HTTP/1.1, (http/1.1, h2c)}{0.0.0.0:0}
2023-09-28T17:52:42.818+02:00  INFO 37552 --- [           main] o.e.jetty.server.AbstractConnector       : Stopped NetworkTrafficServerConnector@3bd7f8dc{SSL, (ssl, alpn, h2, http/1.1)}{0.0.0.0:8473}
2023-09-28T17:52:42.821+02:00  INFO 37552 --- [           main] o.e.jetty.server.handler.ContextHandler  : Stopped o.e.j.s.ServletContextHandler@14151bc5{/,null,STOPPED}
2023-09-28T17:52:42.821+02:00  INFO 37552 --- [           main] o.e.jetty.server.handler.ContextHandler  : Stopped o.e.j.s.ServletContextHandler@6b3871d6{/__admin,null,STOPPED}
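
Polling a counter is only one way to learn that the asynchronous work is over. As a hedged alternative, the sketch below waits on a CountDownLatch that the callback releases. It uses only the JDK: CompletableFuture.supplyAsync stands in for saySomething(...).subscribe(...), and the class LatchWaitSketch and its method callAndWait are hypothetical names, not part of the PoC.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class LatchWaitSketch {

    /** Starts `asyncCall` in the background and waits up to `timeoutMillis` for its terminal signal. */
    static String callAndWait(Supplier<String> asyncCall, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<>();
        AtomicReference<Throwable> failure = new AtomicReference<>();

        // Stand-in for subscribe(bodyConsumer, errConsumer): exactly one of the two
        // references is filled, then the latch is released.
        CompletableFuture.supplyAsync(asyncCall)
            .whenComplete((body, err) -> {
                if (err == null) {
                    received.set(body);
                } else {
                    failure.set(err);
                }
                done.countDown();
            });

        try {
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("no terminal signal within " + timeoutMillis + " ms");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        if (failure.get() != null) {
            throw new IllegalStateException("async call failed", failure.get());
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(callAndWait(() -> "{\"scenario\":\"ok\"}", 2_000));
    }
}
```

The latch makes the wait event-driven rather than polled, at the price of threading a test-only object through the callbacks.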

The whole code can be reached at: https://github.com/asjp1970/webflux

Upvotes: 0

Alberto San José

Reputation: 471

Some time has passed since I asked for help here. Now I'd like not to edit but to add an answer to my previous question, so that the answer remains clear and separate from the original question and comments. So here goes a complete example.

CONTEXT: An application, acting as a client, that requests an Access Token from an OAuth2 Authorization server. The Access Token is requested asynchronously to avoid blocking the application's thread while the token request is processed at the other end and the response arrives.

First, this is a class that serves Access Token to its clients (method getAccessToken): if the Access Token is already initialized and it's valid, it returns the value stored; otherwise fetches a new one calling the internal method fetchAccessTokenAsync:

public class Oauth2ClientBroker {
    private static final String OAUHT2_SRVR_TOKEN_PATH = "/auth/realms/oam/protocol/openid-connect/token";
    private static final String GRANT_TYPE = "client_credentials";

    @Qualifier("oAuth2Client")
    private final WebClient oAuth2Client;

    private final ConfigurationHolder cfgHolder;

    @GuardedBy("this")
    private String token = null;

    @GuardedBy("this")
    private Instant tokenExpireTime;

    @GuardedBy("this")
    private String tokenUrlEndPoint;

    public void getAccessToken(final CompletableFuture<String> completableFuture) {

        if (!isTokenInitialized() || isTokenExpired()) {
            log.trace("Access Token not initialized or has expired: go fetch a new one...");
            synchronized (this) {
                this.token = null;
            }
            fetchAccessTokenAsync(completableFuture);
        } else {
            log.trace("Reusing Access Token (not expired)");
            final String token;
            synchronized (this) {
                token = this.token;
            }
            completableFuture.complete(token);
        }
    }

    // ...
}
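
The helpers isTokenInitialized() and isTokenExpired() are not shown above. A minimal sketch of what they could look like follows, assuming the expiry is kept as an absolute Instant as in the fields of the class; TokenCacheSketch, store and validToken are hypothetical names, and the current time is passed in as a parameter only to keep the example deterministic.

```java
import java.time.Instant;
import java.util.Optional;

final class TokenCacheSketch {
    private String token;            // guarded by "this"
    private Instant tokenExpireTime; // guarded by "this"

    /** Stores a fresh token together with its absolute expiry time. */
    synchronized void store(String newToken, Instant now, long expiresInSeconds) {
        this.token = newToken;
        this.tokenExpireTime = now.plusSeconds(expiresInSeconds);
    }

    synchronized boolean isTokenInitialized() {
        return token != null;
    }

    /** A token with no recorded expiry, or whose expiry is not in the future, counts as expired. */
    synchronized boolean isTokenExpired(Instant now) {
        return tokenExpireTime == null || !now.isBefore(tokenExpireTime);
    }

    /** Checks and reads under one lock, so the token cannot change between the two steps. */
    synchronized Optional<String> validToken(Instant now) {
        return (isTokenInitialized() && !isTokenExpired(now))
            ? Optional.of(token)
            : Optional.empty();
    }

    public static void main(String[] args) {
        TokenCacheSketch cache = new TokenCacheSketch();
        Instant t0 = Instant.parse("2023-09-28T15:00:00Z");
        cache.store("abc", t0, 60);
        System.out.println(cache.validToken(t0.plusSeconds(59)).isPresent()); // prints true
        System.out.println(cache.validToken(t0.plusSeconds(60)).isPresent()); // prints false
    }
}
```

Doing the check and the read in one synchronized method, as validToken does, avoids the small window in getAccessToken where the token is validated under one lock and read under another.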

Next, we will see what fetchAccessTokenAsync does:

private void fetchAccessTokenAsync(final CompletableFuture<String> tokenReceivedInFuture) {

    Mono<String> accessTokenResponse = postAccessTokenRequest();
    accessTokenResponse.subscribe(tr -> processResponseBodyInFuture(tr, tokenReceivedInFuture));

}

Two things happen here:

  1. The method postAccessTokenRequest() builds a POST request and declares how the response will be consumed (when WebFlux makes it available once it is received), by using exchangeToMono:

private Mono<String> postAccessTokenRequest() {

        log.trace("Request Access Token for OAuth2 client {}", cfgHolder.getClientId());

        final URI uri;
        synchronized (this) {
            // The condition of this branch was lost in the original post; a null check is assumed here.
            if (tokenUrlEndPoint == null) {
                uri = URI.create(cfgHolder.getsecServiceHostAndPort().concat(OAUHT2_SRVR_TOKEN_PATH));
            } else {
                uri = URI.create(tokenUrlEndPoint);
            }

        }
        log.debug("Access Token endpoint OAuth2 Authorization server: {}", uri.toString());

        return oAuth2Client.post().uri(uri)
                .body(BodyInserters.fromFormData("client_id", cfgHolder.getEdaClientId())
                        .with("client_secret", cfgHolder.getClientSecret())
                        .with("scope", cfgHolder.getClientScopes()).with("grant_type", GRANT_TYPE))
                .exchangeToMono(resp -> {
                    if (resp.statusCode().equals(HttpStatus.OK)) {
                        log.info("Access Token successfully obtained");
                        return resp.bodyToMono(String.class);
                    } else if (resp.statusCode().equals(HttpStatus.BAD_REQUEST)) {
                        log.error("Bad request sent to Authorization Server!");
                        return resp.bodyToMono(String.class);
                    } else if (resp.statusCode().equals(HttpStatus.UNAUTHORIZED)) {
                        log.error("OAuth2 Credentials exchange with Authorization Server failed!");
                        return resp.bodyToMono(String.class);
                    } else if (resp.statusCode().is5xxServerError()) {
                        log.error("Authorization Server could not generate a token due to a server error");
                        return resp.bodyToMono(String.class);
                    } else {
                        log.error("Authorization Server returned an unexpected status code: {}",
                                resp.statusCode().toString());
                        return Mono.error(new Exception(
                                String.format("Authorization Server returned an unexpected status code: %s",
                                        resp.statusCode().toString())));
                    }
                }).onErrorResume(e -> {
                    log.error(
                            "Access Token could not be obtained. Process ends here");
                    return Mono.empty();
                });
    }

The exchangeToMono method does most of the magic here: it tells WebFlux to return a Mono that will asynchronously receive a signal as soon as the response is received, wrapped in a ClientResponse (the parameter resp consumed in the lambda). But it is important to keep in mind that NO request has been sent out yet at this point; we are just passing in the Function that will take the ClientResponse when it arrives and will return a Mono<String> with the part of the body we are interested in (the Access Token, as we will see).

  2. Once the POST is built and the Mono returned, the real thing starts when we subscribe to the Mono<String> returned before. As the reactive mantra says: nothing happens until you subscribe or, in our case, the request is not actually sent until something attempts to read or wait for the response. There are other ways in the WebClient fluent API to subscribe implicitly, but here we have chosen the explicit way of returning the Mono (which implements the Reactive Streams Publisher interface) and subscribing to it. We no longer block the thread, releasing CPU for other work, probably more useful than just waiting for an answer.
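
To make the "nothing happens until you subscribe" point tangible without pulling in Reactor, here is a toy model written with the JDK only. LazyCall is a hypothetical stand-in for a cold Mono, not the real class: it stores a recipe and runs it only when subscribe() is called. It also shows that a source completing without a value never invokes the value callback, which is what happens with a Mono<Void>. Unlike WebClient, this toy runs synchronously on the caller's thread; it only illustrates the laziness and the callbacks.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical stand-in for a cold Mono: it stores a recipe and runs it only on subscribe(). */
final class LazyCall<T> {
    private final Supplier<Optional<T>> work;

    LazyCall(Supplier<Optional<T>> work) {
        this.work = work;
    }

    /** Runs the work now; an empty result triggers only onComplete, like an empty Mono. */
    void subscribe(Consumer<T> onValue, Consumer<Throwable> onError, Runnable onComplete) {
        Optional<T> result;
        try {
            result = work.get();
        } catch (RuntimeException e) {
            onError.accept(e);
            return;
        }
        result.ifPresent(onValue);
        onComplete.run();
    }
}

final class LazyCallDemo {
    static final AtomicInteger requestsSent = new AtomicInteger();

    /** Builds the call without running it; the counter stands in for "request on the wire". */
    static LazyCall<String> prepare(Optional<String> body) {
        return new LazyCall<>(() -> {
            requestsSent.incrementAndGet();
            return body;
        });
    }

    public static void main(String[] args) {
        LazyCall<String> call = prepare(Optional.empty());
        System.out.println("sent after prepare: " + requestsSent.get());   // prints 0
        call.subscribe(
            body -> System.out.println("body: " + body),                   // never runs here
            err -> System.out.println("error: " + err),
            () -> System.out.println("completed"));
        System.out.println("sent after subscribe: " + requestsSent.get()); // prints 1
    }
}
```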

So far, so good: we have sent out the request and released the CPU, but where will processing continue when the response comes? The subscribe() method takes as an argument a Consumer, parameterized in our case with String: nothing less than the body of the response we are waiting for, which the Mono wraps. When the response comes, WebFlux will signal our Mono, which will call the method processResponseBodyInFuture, where we finally receive the response body:

private void processResponseBodyInFuture(final String body, final CompletableFuture<String> tokenReceivedInFuture) {

    DocumentContext jsonContext = JsonPath.parse(body);

    try {
        log.info("Access Token response received: {}", body);
        final String aTkn = jsonContext.read("$.access_token");
        log.trace("Access Token parsed: {}", aTkn);
        final int expiresIn = jsonContext.read("$.expires_in");
        synchronized (this) {
            this.token = aTkn;
            this.tokenExpireTime = Instant.now().plusSeconds(expiresIn);
        }
        log.trace("Signal Access Token request completion. Processing will continue calling client...");
        tokenReceivedInFuture.complete(aTkn);
    } catch (PathNotFoundException e) {
        try {
            log.error(e.getMessage());
            log.info(String.format(
                    "Could not extract Access Token. The response returned corresponds to the error %s: %s",
                    jsonContext.read("$.error"), jsonContext.read("$.error_description")));
        } catch (PathNotFoundException e2) {
            log.error(e2.getMessage().concat(" - Unexpected json content received from OAuth2 Server"));
        }
    }

}

The invocation of this method happens as soon as the Mono is signalled about the reception of the response. So here we try to parse the JSON content with an Access Token and do something with it... In this case, we call complete() on the CompletableFuture passed in by the caller of the initial method getAccessToken, which hopefully will know what to do with it. Our job is done here... Asynchronously!
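
For completeness, this is roughly how a caller could consume that CompletableFuture without blocking. It is a sketch under assumptions: TokenCallerSketch and authorizationHeader are hypothetical names, and a plain Consumer stands in for Oauth2ClientBroker::getAccessToken. Note that in the code above the error paths (the Mono.empty() returned by onErrorResume, and the PathNotFoundException branch) never complete the future, so the sketch adds a timeout to keep the caller from waiting forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class TokenCallerSketch {

    /**
     * Asks `tokenSource` (a stand-in for Oauth2ClientBroker::getAccessToken) to fill a future,
     * then derives an Authorization header value from it without blocking.
     */
    static CompletableFuture<String> authorizationHeader(
            Consumer<CompletableFuture<String>> tokenSource, long timeoutMillis) {
        CompletableFuture<String> tokenFuture = new CompletableFuture<>();
        tokenSource.accept(tokenFuture);
        return tokenFuture
            // Fail the future if nobody completes it in time (Java 9+).
            .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .thenApply(token -> "Bearer " + token);
    }

    public static void main(String[] args) {
        // Hypothetical broker that answers from another thread.
        Consumer<CompletableFuture<String>> broker =
            future -> CompletableFuture.runAsync(() -> future.complete("dummy-token"));

        authorizationHeader(broker, 2_000)
            .thenAccept(header -> System.out.println("Authorization: " + header))
            .exceptionally(err -> {
                System.out.println("No token: " + err);
                return null;
            })
            .join(); // only so that this demo does not exit before the callback runs
    }
}
```

An alternative would be for the broker itself to call completeExceptionally on the future in its error paths, so that the caller learns the reason instead of just timing out.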

Summary: To summarize, these are the basic considerations for getting your request sent out and the responses processed when you use the reactive WebClient:

  1. Consider having a method in charge of preparing the request by means of the WebClient fluent API (to set the HTTP method, URI, headers and body). Remember: by doing this you are not sending any request yet.
  2. Think about the strategy you will use to obtain the Publisher that will receive the HTTP client events (response or errors). retrieve() is the most straightforward, but it gives less control over the response than exchangeToMono.
  3. Subscribe... or nothing will happen. Many examples you will find around are misleading: they claim to use WebClient for asynchrony, but then they "forget" about subscribing to the Publisher and call block() instead. While this makes things easier and seems to work (you will see responses received and passed to your application), it is not asynchronous anymore: the calling thread will be blocked on your Mono (or Flux, whatever you use) until the response arrives. No good.
  4. Have a separate method (the Consumer passed to subscribe()) where the response body is processed.

Upvotes: -1
