I've been trying to implement an orchestration system using virtual threads (VTs).
So far, I understand the underlying problems that I may face, and the one that really concerns me is "pinning."
I've decided to implement some classes using JFR to have a very detailed trace and metrics about where and when a virtual thread is being pinned to a carrier thread.
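A minimal sketch of what such tracing could look like, not the actual classes from this project. It assumes the built-in jdk.VirtualThreadPinned JFR event (JDK 21+) and consumes it with RecordingStream; the class name PinningMonitor, the counter, and the five-frame limit are hypothetical choices made for illustration.

```java
import java.time.Duration;
import java.util.concurrent.atomic.LongAdder;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordingStream;

// Hypothetical helper: streams pinning events and logs where they happened.
final class PinningMonitor implements AutoCloseable {
    // Name of the JDK's built-in pinning event (emitted on JDK 21+).
    private static final String EVENT = "jdk.VirtualThreadPinned";

    private final RecordingStream stream = new RecordingStream();
    private final LongAdder pinnedCount = new LongAdder();

    PinningMonitor(Duration threshold) {
        // Only report pinning that lasts at least `threshold`, with stack traces.
        stream.enable(EVENT).withThreshold(threshold).withStackTrace();
        stream.onEvent(EVENT, this::record);
        stream.startAsync(); // events are delivered on a background thread
    }

    private void record(RecordedEvent event) {
        pinnedCount.increment();
        String thread = event.getThread() == null ? "?" : event.getThread().getJavaName();
        System.err.printf("pinned for %d ms on %s%n", event.getDuration().toMillis(), thread);
        if (event.getStackTrace() != null) {
            // Print the top frames to show which call pinned the carrier thread.
            event.getStackTrace().getFrames().stream()
                    .limit(5)
                    .forEach(f -> System.err.println("  at "
                            + f.getMethod().getType().getName() + "."
                            + f.getMethod().getName() + ":" + f.getLineNumber()));
        }
    }

    // Events arrive asynchronously, so this value can lag behind the workload.
    long pinnedCount() {
        return pinnedCount.sum();
    }

    @Override
    public void close() {
        stream.close();
    }
}
```

Wrapping a test run in try (PinningMonitor monitor = new PinningMonitor(Duration.ofMillis(20))) { ... } would log each pinning event above 20 ms together with the frames that caused it.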
During this process, I found that the classic Apache HttpClient 4.x (the one used by all the projects I work on) pins virtual threads to carrier threads when the connection pool is full. This happens because of an NIO call that blocks the VT until a connection becomes available in the underlying connection pool. This kind of pinning lasts for 200-400 ms, depending on the pool, which I think is unacceptable. This situation motivated me to migrate to Apache HttpClient 5, which I found to be compatible with VTs.
After some further testing, I noticed that other portions of the code were also pinning threads, particularly when calling the execute() method of the HttpClient.
After some back and forth, I came up with a solution that I'd like to share to discuss whether it is a good approach or if it might lead to other problems.
I decided to create a WebClient (instead of a RestClient) from WebFlux:
    return WebClient.builder()
            .baseUrl(baseUrl)
            .filter(new CustomWebClientFilter(this.clientId).toExchangeFilterFunction())
            .codecs(configurer -> {
                configurer.defaultCodecs().jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper));
                configurer.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper));
            })
            .build();
Then, when using this WebClient, I use toFuture() to avoid reactive programming throughout my entire application. Here's how I implemented it:
    public XsDiscountResponse getById(String socialId, String company) {
        try {
            CompletableFuture<XsDiscountResponse> future = this.connector
                    .get()
                    .uri(uriBuilder -> uriBuilder
                            .path(GET_BY_ID_PATH)
                            .queryParam("company_id", company)
                            .build(socialId))
                    .accept(APPLICATION_JSON)
                    .retrieve()
                    .onStatus(
                            HttpStatusCode::isError,
                            response -> response.bodyToMono(String.class)
                                    .flatMap(body -> Mono.error(
                                            new ConnectorException(
                                                    "Error: " + response.statusCode() + ", Body: " + body))))
                    .bodyToMono(XsDiscountResponse.class)
                    .toFuture(); // Convert Mono to CompletableFuture
            return future.get(); // Compatible with Loom for non-pinning waits
        } catch (Exception ex) {
            throw new ConnectorException("Error occurred during the request", ex);
        }
    }
With this solution, I still handle my external dependencies in a blocking style, but I delegate the scheduling of threads to the VTs in my JVM using Future.
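One detail of the blocking wait is worth sketching separately: future.get() wraps failures in ExecutionException and can be interrupted, so a broad catch (Exception ex) hides the original error and drops the interrupt flag. The helper below is a hedged illustration using only the JDK; the name FutureBridge, the timeout parameters, and the use of IllegalStateException (standing in for an application exception such as ConnectorException) are assumptions, not part of the code above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper that blocks on a future and normalizes its failure modes.
final class FutureBridge {
    private FutureBridge() {}

    static <T> T await(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        try {
            // Blocking wait with an upper bound instead of an unbounded get().
            return future.get(timeout, unit);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            future.cancel(true);
            throw new IllegalStateException("Interrupted while waiting for the response", ex);
        } catch (TimeoutException ex) {
            // Give up on the pending work rather than leaving it dangling.
            future.cancel(true);
            throw new IllegalStateException("Timed out waiting for the response", ex);
        } catch (ExecutionException ex) {
            // Unwrap so the original exception reaches the caller unchanged.
            Throwable cause = ex.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException("Request failed", cause);
        }
    }
}
```

With such a helper, the last line of the try block could become return FutureBridge.await(future, 5, TimeUnit.SECONDS); where the 5-second bound is an arbitrary example value. An exception raised inside onStatus would then surface directly instead of arriving wrapped in an ExecutionException.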
If I use the block() method from Mono, I understand that it will cause pinning.
Is there anything in this approach that is incorrect? Am I missing something important about how this blocking/async code might behave in a real-world application?
I will continue testing and monitoring pinned threads and response times to ensure everything works as expected. However, since VTs are relatively "new" and I haven't used asynchronous WebClient before, I'm not completely sure if this approach is correct.