simplyblue

Reputation: 2339

Spring WebClient connection pool is exhausted after some time

Below are the different ways I tried to use WebClient to send a POST request, but each time the connection pool is exhausted after some time and I get the error shown further down.

I'm using Spring Boot 2.7.3.

Please help me find where the connections are not being released.

The WebClient initialization code:

@PostConstruct
public void getWebClient() {
    logger.debug("Initializing webClient..");

    ConnectionProvider provider = ConnectionProvider.builder("fixed")
            .metrics(true)
            .maxConnections(50)
            .maxIdleTime(Duration.ofSeconds(20))
            .maxLifeTime(Duration.ofSeconds(60))
            .pendingAcquireTimeout(Duration.ofSeconds(30))
            .evictInBackground(Duration.ofSeconds(30)).build();

    HttpClient httpClient = HttpClient.create(provider)
            .wiretap(this.getClass().getCanonicalName(), LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
            .responseTimeout(Duration.ofSeconds(10));

    this.webClient = WebClient
            .builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .defaultHeader(HttpHeaders.CONTENT_TYPE, org.springframework.http.MediaType.APPLICATION_JSON_VALUE)
            .build();
}

Code 1:

public Mono<JsonNode> sendPostR(String url, JsonNode request) {
    String uuid = UUID.randomUUID().toString();
    Instant start = Instant.now();
    logger.debug("Sending post request {} {} {}",url,request,uuid);
    return webClient.post().uri(url)
            .body(Mono.just(request),Map.class)
            .retrieve()
            .toEntity(String.class)
            .map(entity -> {
                Instant end = Instant.now();
                logger.debug("Time taken for request success{} {} ",Duration.between(start,end).toMillis(),uuid);
                if(entity.getStatusCode() != HttpStatus.OK) {
                    logger.error("Error: API returned with in-valid status code {} {}",entity.getStatusCode(),entity.getHeaders());
                    Mono.error(new RuntimeException("Error in sending post " + url));
                }
                String responseBody = entity.getBody();
                try {
                    return objectMapper.readValue(responseBody, JsonNode.class);
                } catch (JsonProcessingException e) {
                    logger.error("Error in parsing response body",e);
                    Mono.error(new RuntimeException("Error in parsing response body " + url));
                }
                return Utils.getNewObjectNode();
            }).switchIfEmpty(Mono.defer(() -> {
                logger.debug("Request completed with no body response {}",uuid);
                return Mono.just(Utils.getNewObjectNode());
            })).log();
}

Code 2:

public Mono<JsonNode> sendPostR(String url, Map<String,Object> params) {
    String uuid = UUID.randomUUID().toString();
    Instant start = Instant.now();
    logger.debug("Sending post request {} {} {}",url,params,uuid);
    return webClient.post().uri(url)
            .body(Mono.just(params),Map.class)
            .exchangeToMono(response -> {
                logger.debug("response is {}",response.statusCode());
                Instant end = Instant.now();
                logger.debug("Time taken for request success{} {} ",Duration.between(start,end).toMillis(),uuid);
                if(response.statusCode().equals(HttpStatus.OK)) {
                    return response.bodyToMono(JsonNode.class);
                }
                response.releaseBody();
                logger.debug("Error in hitting url {}",url);
                return Mono.error(new Exception("Error in sending post " + url));
            }).onErrorResume(WebClientResponseException.class, ex -> {
                Instant end = Instant.now();
                logger.debug("Time taken for request error {} {} ",Duration.between(start,end).toMillis(),uuid);
                logger.debug("Error in hitting url .... ",ex);
                return Mono.error(ex);
            }).doOnError(error -> {
                logger.error("Error in sending post request ",error);
            }).switchIfEmpty(Mono.defer(() -> {
                logger.debug("Request completed with no body response {}",uuid);
                return Mono.just(Utils.getNewObjectNode());
            }))
            .log();
}

Below is the code I use when I do not need the response, for fire-and-forget scenarios.

public Mono<ResponseEntity<Void>> raiseEvent(String url, JsonNode request) {
    String uuid = UUID.randomUUID().toString();
    logger.debug("Sending post request {} {} {}",url,request,uuid);
    return webClient.post().uri(url)
            .body(Mono.just(request),JsonNode.class)
            .retrieve()
            .toBodilessEntity();
}

The error I get when the pool is exhausted:

2022-09-22 15:07:52,701 ERROR ? [parallel-1] Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.web.reactive.function.client.WebClientRequestException: Pool#acquire(Duration) has been pending for more than the configured timeout of 30000ms; nested exception is reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException: Pool#acquire(Duration) has been pending for more than the configured timeout of 30000ms
Caused by: org.springframework.web.reactive.function.client.WebClientRequestException: Pool#acquire(Duration) has been pending for more than the configured timeout of 30000ms; nested exception is reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException: Pool#acquire(Duration) has been pending for more than the configured timeout of 30000ms
        at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
        *__checkpoint ⇢ Request to POST http://api.call/track [DefaultWebClient]
Original Stack Trace:
                at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
                at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55)
                at reactor.core.publisher.Mono.subscribe(Mono.java:4397)
                at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
                at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)
                at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:415)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)
                at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)
                at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)
                at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
                at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
                at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$ClientTransportSubscriber.onError(HttpClientConnect.java:308)
                at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
                at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onError(DefaultPooledConnectionProvider.java:158)
                at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.run(AbstractPool.java:413)
                at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
                at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)
Caused by: reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException: Pool#acquire(Duration) has been pending for more than the configured timeout of 30000ms
        at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.run(AbstractPool.java:413)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)

Upvotes: 0

Views: 2577

Answers (1)

Alex

Reputation: 5924

  1. In "Code 1", the Mono.error(...) calls are never returned, so the errors they create are silently discarded (inside map you would have to throw the exception, or switch to flatMap and return the Mono.error). I doubt this alone causes the pool exhaustion, though.

  2. There are several issues in "Code 2" with the exchangeToMono usage that could potentially cause this behavior. According to the documentation for exchangeToMono:

After the returned {@code Mono} completes, the response body is automatically released if it hasn't been consumed.

You don't need to call response.releaseBody() explicitly, but if you do, make sure it is part of the reactive flow you return: it returns a Mono<Void>, which does nothing unless it is subscribed.
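The chaining described above can be sketched as follows. This is a minimal sketch, assuming Spring Boot 2.7.x with Jackson on the classpath; the class name ReleaseBodySketch and the method postJson are hypothetical and not part of the original code.

```java
import com.fasterxml.jackson.databind.JsonNode;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import java.util.Map;

public class ReleaseBodySketch {

    // Hypothetical helper: posts the params and returns the JSON body on 200 OK.
    public static Mono<JsonNode> postJson(WebClient webClient, String url, Map<String, Object> params) {
        return webClient.post().uri(url)
                .bodyValue(params)
                .exchangeToMono(response -> {
                    if (response.statusCode().equals(HttpStatus.OK)) {
                        return response.bodyToMono(JsonNode.class);
                    }
                    // releaseBody() is lazy. Chaining it with then(...) makes it part of
                    // the returned flow, so the body is drained before the error is emitted.
                    return response.releaseBody()
                            .then(Mono.<JsonNode>error(
                                    new IllegalStateException("Error in sending post " + url)));
                });
    }
}
```

Calling response.releaseBody() as a bare statement, as in "Code 2", only builds a Mono<Void> that nobody subscribes to. Returning it as part of the chain is what makes it run.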

  3. I'm not sure why you need exchangeToMono, which is typically used for advanced scenarios. At first glance, you can achieve the same with retrieve().bodyToMono(...), which is much safer:
webClient
    .post().uri(url)
    .bodyValue(request)
    .retrieve()
    .bodyToMono(JsonNode.class);

The same can be done in "Code 1" to simplify the logic inside map.

  4. To calculate the duration, prefer doOnNext and doOnError, which are designed for such side-effect operations.
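Putting the last two points together, a minimal sketch could look like the one below. It assumes Spring Boot 2.7.x with Jackson and SLF4J on the classpath; the class name TimedPostSketch and the helper elapsedMs are hypothetical, and the log messages are only illustrative.

```java
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.Instant;

public class TimedPostSketch {

    private static final Logger logger = LoggerFactory.getLogger(TimedPostSketch.class);

    // Hypothetical replacement for sendPostR: retrieve() handles releasing the response,
    // and timing is done in side-effect operators instead of inside map().
    public static Mono<JsonNode> sendPost(WebClient webClient, String url, JsonNode request) {
        return Mono.defer(() -> {
            Instant start = Instant.now(); // captured once per subscription
            return webClient.post().uri(url)
                    .bodyValue(request)
                    .retrieve() // by default, 4xx/5xx responses become WebClientResponseException
                    .bodyToMono(JsonNode.class)
                    .doOnNext(body -> logger.debug("Request to {} succeeded in {} ms",
                            url, elapsedMs(start)))
                    .doOnError(error -> logger.error("Request to {} failed after {} ms",
                            url, elapsedMs(start), error));
        });
    }

    private static long elapsedMs(Instant start) {
        return Duration.between(start, Instant.now()).toMillis();
    }
}
```

Note that doOnNext only fires when a body is emitted. For responses without a body the returned Mono completes empty, so a switchIfEmpty fallback like the one in the question may still be wanted.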

Upvotes: 1
