benjaminv2

Reputation: 159

Reactor Netty: Connection prematurely closed BEFORE response with WebClient

I am processing data retrieved with the WebClient; the pages contain quite a lot of data that is processed afterwards (mostly DB updates and inserts). After a random amount of time, I get the error:

**Reactor Netty: Connection prematurely closed BEFORE response with WebClient.**

The method in question is the following:

private Mono<String> migrateSomeData() {
    return getAllPages()
        .flatMapIterable(Page::getItems)
        .filter(this::isValidItem)
        .doOnNext(item -> doSomeLogging())
        .filter(this::checkIfAlreadyProcessed)
        .flatMap(this::mapToDto)
        .flatMap(this::persist)
        .doOnNext(this::saveAsProcessed)
        .collectList()
        .map(this::getTotalAmountOfProcessed);
}

Certain parts of the code have been omitted for brevity. getAllPages utilizes the Flux .expand method to recursively fetch the next pages; this is verified and works correctly. One more important thing to note is that all database operations are blocking, as we have not implemented reactive JDBC.
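For context, getAllPages is shaped roughly like this (fetchPage, getNextPageUrl and FIRST_PAGE_URL are simplified placeholders for the omitted code):

    private Flux<Page> getAllPages() {
        // Fetch the first page, then keep expanding with the next page
        // until a page no longer reports a follow-up URL.
        return fetchPage(FIRST_PAGE_URL)
            .expand(page -> page.getNextPageUrl() == null
                ? Mono.empty()                       // no more pages: stop
                : fetchPage(page.getNextPageUrl())); // recurse into the next page
    }

    private Mono<Page> fetchPage(String url) {
        return someClient.get()
            .uri(url)
            .retrieve()
            .bodyToMono(Page.class);
    }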

I've tried updating the WebClient configuration:

@Bean
public WebClient someClient(
    ReactiveOAuth2AuthorizedClientManager clientAuthorizedClientManager) {

  ServerOAuth2AuthorizedClientExchangeFilterFunction oauth =
      new ServerOAuth2AuthorizedClientExchangeFilterFunction(clientAuthorizedClientManager);
  oauth.setDefaultClientRegistrationId("clientId");

  final int size = 16 * 1024 * 1024;
  final ExchangeStrategies strategies = buildClientWithExtendedResponseSize(size);

  return WebClient.builder()
      .defaultHeader("Accept", MediaType.APPLICATION_JSON_VALUE)
      .defaultHeader("subscription-key", someKeyValue)
      .clientConnector(createWiretappedClientHttpConnector(this.getClass()))
      .filter(oauth)
      .exchangeStrategies(strategies)
      .build();
}

private static ExchangeStrategies buildClientWithExtendedResponseSize(int size) {
  return ExchangeStrategies.builder()
      .codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(size))
      .build();
}

private ClientHttpConnector createWiretappedClientHttpConnector(Class<?> invokedClass) {
  HttpClient httpClient =
      HttpClient.create()
          .option(ChannelOption.SO_KEEPALIVE, true)
          .responseTimeout(Duration.ofMinutes(5))
          .doOnConnected(
              conn ->
                  conn.addHandlerLast(new ReadTimeoutHandler(5 * 60))
                      .addHandlerLast(new WriteTimeoutHandler(5 * 60)))
          .compress(true)
          .wiretap(
              invokedClass.getCanonicalName(), LogLevel.TRACE, AdvancedByteBufFormat.TEXTUAL);
  return new ReactorClientHttpConnector(httpClient);
}

I've also tried to narrow this code down as much as possible, so forgive me for any structural mistakes.

I've tested this against a local mocked instance of the API in WireMock and the issue still persists, so that should rule out server-side configuration issues.

Could it be that my processing code is too slow, so that the response returned by the WebClient is not consumed in time and the WebClient decides to close the connection?
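If that is the case, would offloading the blocking persistence step onto Reactor's bounded elastic scheduler be the right fix? A minimal sketch of what I have in mind (persistBlocking is a stand-in for our real, omitted repository call):

    private Mono<Item> persist(Item item) {
        // Run the blocking JDBC insert/update on boundedElastic so Netty's
        // event-loop threads stay free to keep draining the HTTP response.
        return Mono.fromCallable(() -> persistBlocking(item))
            .subscribeOn(Schedulers.boundedElastic());
    }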

Upvotes: 4

Views: 6048

Answers (2)

supers

Reputation: 31

I have encountered a similar problem.

Following the earlier answer by stelios.anastasakis, I reduced the frequency of the problem by using maxIdleTime, although it still happened occasionally.

Now I use evictInBackground together with maxIdleTime. So far the result is encouraging: in a whole-day test the problem has not recurred.

@Bean
public WebClient webClient() {
    WebClient.Builder builder = WebClient.builder();

    ConnectionProvider connectionProviderWithMaxIdleTime = ConnectionProvider.builder("withMaxIdleTime")
            .maxIdleTime(Duration.ofSeconds(20)) // if issue persists, reduce this
            .evictInBackground(Duration.ofSeconds(19))
            .build();

    builder.clientConnector(new ReactorClientHttpConnector(
            HttpClient.create(connectionProviderWithMaxIdleTime)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 65000) // to fix "connection timed out after 30000 ms"?
                    .responseTimeout(Duration.ofMinutes(5))
                    .wiretap(true)
                    .doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(5 * 60))
                            .addHandlerLast(new WriteTimeoutHandler(5 * 60)))
                    .doOnDisconnected(conn -> log.info("WebClient disconnected, localAddress {}", conn.channel().localAddress()))
                    .compress(true)
    ));
    // Set the maximum in-memory buffer size
    builder.codecs(clientCodecConfigurer -> clientCodecConfigurer.defaultCodecs().maxInMemorySize(20 * 1024 * 1024)); // 20 MB

    return builder.build();
}

The two lines of code above that relate to this question are:

.maxIdleTime(Duration.ofSeconds(20)) //if issue persists reduce this
.evictInBackground(Duration.ofSeconds(19))

Referring to https://projectreactor.io/docs/netty/release/reference/index.html#connection-pool-timeout , evictInBackground means:

When this option is enabled, each connection pool regularly checks for connections that are eligible for removal according to eviction criteria like maxIdleTime. By default, this background eviction is disabled.

I think you can also try it.

Upvotes: 1

stelios.anastasakis

Reputation: 1206

Sorry for the late reply. There is an issue about this raised on Reactor's GitHub page. It seems to boil down to something like: "the client sends [ACK] but not [ACK,FIN] and keeps the connection open. So the connection is not closed by the client and is later reused, resulting in this error".

The solution should be setting a max-idle-time (the maximum time that a connection stays idle in the connection pool). The suggested value is whatever the server uses as its keepAliveTimeout, but if that is hard to pin down, try something small and experiment. So the solution should look like this:

ConnectionProvider connectionProviderWithMaxIdleTime = ConnectionProvider.builder("withMaxIdleTime")
                .maxIdleTime(Duration.ofSeconds(120)) //if issue persists reduce this
                .build();
HttpClient httpClient = HttpClient.create(connectionProviderWithMaxIdleTime)
                .option(ChannelOption.SO_KEEPALIVE, true)
                {...the rest of your code...}
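The resulting HttpClient is then plugged into the WebClient through the client connector, just like the question already does in createWiretappedClientHttpConnector:

    // Reuse the pooled HttpClient as the WebClient's connector.
    WebClient webClient = WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .build();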

Hope this helps!

Upvotes: 1
