Martin Locurcio

Using toFuture() with Spring WebClient when using virtual threads

I've been trying to implement an orchestration system using virtual threads (VTs).

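I understand the underlying problems I may face, and the one that really concerns me is pinning: a virtual thread that blocks while it cannot be unmounted keeps its carrier thread occupied, so no other virtual thread can run on that carrier in the meantime.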

I've implemented some classes using JFR (JDK Flight Recorder) to get detailed traces and metrics about where and when a virtual thread is pinned to its carrier thread.
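
For readers who want to reproduce this kind of tracing, here is a minimal sketch of one way to do it, assuming JDK 21 or later. It is not the classes described above. JFR has a built-in jdk.VirtualThreadPinned event (default threshold 20 ms), and jdk.jfr.consumer.RecordingStream can consume it in-process. The class name PinningMonitor and the output format are illustrative. Since JDK 24 (JEP 491), blocking inside synchronized no longer pins, so the demo in main may report zero events on newer JDKs.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import jdk.jfr.consumer.RecordedFrame;
import jdk.jfr.consumer.RecordingStream;

public class PinningMonitor {

    /** Starts an in-process JFR stream that reports pinned virtual threads above the threshold. */
    static RecordingStream start(Duration threshold, AtomicInteger pinnedCount) {
        RecordingStream rs = new RecordingStream();
        rs.enable("jdk.VirtualThreadPinned")
          .withThreshold(threshold)   // only report pins lasting at least this long
          .withStackTrace();          // record where the pin happened
        rs.onEvent("jdk.VirtualThreadPinned", event -> {
            pinnedCount.incrementAndGet();
            System.out.println("Pinned for " + event.getDuration().toMillis() + " ms");
            if (event.getStackTrace() != null) {
                for (RecordedFrame frame : event.getStackTrace().getFrames()) {
                    System.out.println("    at " + frame.getMethod().getType().getName()
                            + "." + frame.getMethod().getName());
                }
            }
        });
        rs.startAsync(); // events are consumed on a background thread
        return rs;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger pinned = new AtomicInteger();
        try (RecordingStream rs = start(Duration.ofMillis(20), pinned)) {
            Object lock = new Object();
            Thread vt = Thread.ofVirtual().start(() -> {
                synchronized (lock) { // on JDK 21-23, sleeping while holding a monitor pins the carrier
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            vt.join();
            Thread.sleep(1500); // give the stream time to flush buffered events
        }
        System.out.println("pinned events seen: " + pinned.get());
    }
}
```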

During this process, I found that the classic Apache HttpClient 4.x (the one used by all the projects I work on) pins virtual threads to their carrier threads when the connection pool is exhausted. This happens because of an NIO call that blocks the VT until a connection becomes available in the underlying connection pool. These pins last 200-400 ms depending on the pool, which I consider unacceptable. This motivated me to migrate to Apache HttpClient 5, which I found to be compatible with VTs.
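
Whether a wait pins depends on how the blocking code waits. On JDK 21-23, a virtual thread blocked inside a synchronized block or in Object.wait() stays mounted on its carrier, while java.util.concurrent primitives such as Semaphore park it and free the carrier. As an illustration of the second style only (this is a hypothetical class, not how either HttpClient version implements its pool), here is a BoundedPool whose waiters park on a fair Semaphore with a timeout:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical pool: waiters park on a Semaphore, so a waiting virtual thread can unmount. */
public class BoundedPool<T> {
    private final Semaphore permits;
    private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;

    public BoundedPool(int maxSize, Supplier<T> factory) {
        this.permits = new Semaphore(maxSize, true); // fair: waiters are served in arrival order
        this.factory = factory;
    }

    /**
     * Waits for a free slot (parked, assuming the caller holds no monitor),
     * failing after the timeout instead of waiting forever.
     */
    public T acquire(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (!permits.tryAcquire(timeout, unit)) {
            throw new TimeoutException("pool exhausted after " + timeout + " " + unit);
        }
        try {
            T item = idle.poll();
            return item != null ? item : factory.get(); // reuse an idle object or create one
        } catch (RuntimeException e) {
            permits.release(); // do not leak the slot if creation fails
            throw e;
        }
    }

    /** Returns the object to the pool and wakes one waiter. */
    public void release(T item) {
        idle.offer(item);
        permits.release();
    }
}
```

With a pool size of 2, dozens of virtual threads can call acquire() concurrently; the extra ones simply park until a slot is released.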

After further testing, I noticed that other parts of the code were also pinning threads, particularly calls to the HttpClient's execute() method.

After some back and forth, I came up with a solution that I'd like to share, to discuss whether it is a good approach or whether it might lead to other problems.

I decided to create a WebClient from WebFlux (instead of a RestClient):

return WebClient.builder()
        .baseUrl(baseUrl)
        .filter(new CustomWebClientFilter(this.clientId).toExchangeFilterFunction())
        .codecs(configurer -> {
          configurer.defaultCodecs().jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper));
          configurer.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper));
        })
        .build();

Then, when using this WebClient, I call toFuture() so that I don't have to adopt reactive programming throughout my entire application. Here's how I implemented it:

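public XsDiscountResponse getById(String socialId, String company) {
  try {
    CompletableFuture<XsDiscountResponse> future = this.connector
            .get()
            .uri(uriBuilder -> uriBuilder
                    .path(GET_BY_ID_PATH)
                    .queryParam("company_id", company)
                    .build(socialId))
            .accept(APPLICATION_JSON)
            .retrieve()
            .onStatus(
                    HttpStatusCode::isError,
                    response -> response.bodyToMono(String.class)
                            // An empty body would otherwise yield Mono.empty(), which suppresses the error
                            .defaultIfEmpty("")
                            .flatMap(body -> Mono.error(
                                    new ConnectorException(
                                            "Error: " + response.statusCode() + ", Body: " + body))))
            .bodyToMono(XsDiscountResponse.class)
            .toFuture(); // Convert Mono to CompletableFuture

    return future.get(); // Parks (rather than pins) the virtual thread while waiting
  } catch (InterruptedException ex) {
    Thread.currentThread().interrupt(); // Preserve the interrupt status for the caller
    throw new ConnectorException("Interrupted while waiting for the response", ex);
  } catch (ExecutionException ex) {
    // Unwrap so the ConnectorException raised in onStatus is not wrapped a second time
    if (ex.getCause() instanceof ConnectorException connectorEx) {
      throw connectorEx;
    }
    throw new ConnectorException("Error occurred during the request", ex.getCause());
  }
}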
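
One detail worth hardening in this pattern: a plain get() has no upper bound, so a remote call that never answers blocks its virtual thread forever. A minimal sketch of a hypothetical helper, Futures.await, that adds a timeout, restores the interrupt flag and rethrows the original failure instead of the ExecutionException wrapper. IllegalStateException is a placeholder for whatever exception type the application uses (e.g. ConnectorException). Whether cancelling the future also cancels the underlying HTTP exchange depends on how the future was produced, so treat that as an assumption to verify; applying Reactor's Mono.timeout(Duration) before toFuture() is another option.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper for the blocking side of a Mono-to-CompletableFuture bridge. */
public final class Futures {
    private Futures() {}

    /** Waits at most the given time and rethrows the original failure, not the wrapper. */
    public static <T> T await(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit); // parks the calling (virtual) thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            future.cancel(true);
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (TimeoutException e) {
            future.cancel(true); // assumption: giving up on the result is acceptable here
            throw new IllegalStateException("no response within " + timeout + " " + unit, e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException re) {
                throw re;
            }
            if (cause instanceof Error err) {
                throw err;
            }
            throw new IllegalStateException(cause);
        }
    }
}
```

In getById this would replace future.get() with something like Futures.await(future, 5, TimeUnit.SECONDS), where the timeout value is only an example.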

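With this solution, I still call my external dependencies in a blocking style, but the waiting happens in CompletableFuture.get(), which parks the virtual thread instead of pinning it, so the JVM can schedule other virtual threads on the freed carrier thread.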
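
The reasoning above can be checked with a stdlib-only sketch. A ScheduledExecutorService stands in for the asynchronous client (it completes each future after a simulated network delay), while one virtual thread per request blocks in get(). If those waits park rather than pin, the total wall-clock time stays close to a single delay instead of growing with the number of calls. fakeAsyncCall and BlockingFacadeDemo are hypothetical names; no WebClient is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BlockingFacadeDemo {

    /** Stand-in for an async client: the future is completed later by another thread. */
    static CompletableFuture<String> fakeAsyncCall(ScheduledExecutorService io, int id, long delayMs) {
        CompletableFuture<String> f = new CompletableFuture<>();
        io.schedule(() -> f.complete("response-" + id), delayMs, TimeUnit.MILLISECONDS);
        return f;
    }

    /** Runs the given number of blocking-style calls, one per virtual thread; returns elapsed ms. */
    static long run(int calls, long delayMs) throws Exception {
        ScheduledExecutorService io = Executors.newSingleThreadScheduledExecutor();
        long start = System.nanoTime();
        try (ExecutorService vts = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> results = new ArrayList<>();
            for (int i = 0; i < calls; i++) {
                int id = i;
                // Each task looks synchronous: it calls get() and waits for the value.
                results.add(vts.submit(() -> fakeAsyncCall(io, id, delayMs).get()));
            }
            for (int i = 0; i < calls; i++) {
                if (!results.get(i).get().equals("response-" + i)) {
                    throw new IllegalStateException("unexpected result for call " + i);
                }
            }
        } finally {
            io.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("10,000 calls with a 200 ms delay took " + run(10_000, 200) + " ms");
    }
}
```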

My understanding is that using Mono's block() method instead would cause pinning.

Is anything in this approach incorrect? Am I missing something important about how this mix of blocking and asynchronous code might behave in a real-world application?

I will keep testing and monitoring pinned threads and response times to make sure everything works as expected. However, since VTs are relatively new and I haven't used the asynchronous WebClient before, I'm not completely sure this approach is correct.
