wdscr.driver
wdscr.driver

Reputation: 33

How to handle in Reactor Netty an io.netty.channel.ConnectTimeoutException

I'm trying to use Reactor Netty TcpClient in reactive way to interact with hosts, that may be unreachable. Here is an example of a channel initialization logic:

ConnectionProvider connectionProvider = ConnectionProvider.fixed("fixed", 50);
TcpClient.create(connectionProvider)
  .host(host).port(port)
  .wiretap(true)
  .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 50)
  .doOnConnect(x -> log.trace("Connect to {}:{}", host, port))
  .doOnConnected(conn -> log.trace("Connected {}", conn.channel()))
  .connect()
  .subscribe(this::utilizeConnection);

the output, that i receiving :

2019-09-04 08:23:13.612 TRACE 71988 --- [ioEventLoop-4-3] c.c.pcb.poc.network.tcp.NettyTcpSender   : Connect to 192.168.88.210:2000
2019-09-04 08:23:13.684  WARN 71988 --- [actor-tcp-nio-4] io.netty.util.concurrent.DefaultPromise  : An exception was thrown by reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.operationComplete()

reactor.core.Exceptions$ErrorCallbackNotImplemented: io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.88.210:2000
Caused by: io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.88.210:2000
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:267) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:405) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoCreate] :
    reactor.core.publisher.Mono.create(Mono.java:183)
    reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:130)
Error has been observed by the following operator(s):
    |_  Mono.create ⇢ reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:130)
    |_  Mono.doOnSubscribe ⇢ reactor.netty.tcp.TcpClientDoOn.connect(TcpClientDoOn.java:58)

The 'inbound' and 'outbound' are having a dedicated method to handle their errors, but they works on top of a Connection instance that won't be created if you got the 'connection timeout'.

I tried:

  1. The exception, that i receiving is wrapped in 'ErrorCallbackNotImplemented'. But I wasn't able to find any way to implement any 'ErrorCallback'

  2. The log contains a warning message from 'io.netty.util.concurrent.DefaultPromise' . but I wasn't able to find a way how to make own Promise to handle it in a right way.

  3. No any configurations i've found that may somehow intercept connection timeouts.

  4. workaround. The blocked approach to create a connection ( .block() instead of .subscribe()) will allow me to catch any Connection creating exceptions within plain try-catch block, but i'll lose the benefits of reactive approach with such workaround.

Do somebody may suggest me at least something to help me to find a right way to handle a 'io.netty.channel.ConnectTimeoutException'?

Upvotes: 3

Views: 11480

Answers (1)

Oleh Dokuka
Oleh Dokuka

Reputation: 12184

Do not forget to implement your error callback

Usually reactor.core.Exceptions$ErrorCallbackNotImplemented happens when there is subscription over labmda based .subscribe method (same for Mono and Flux).

If you are going to look at the sources here and here, you will find the place where reactor.core.Exceptions$ErrorCallbackNotImplemented is thrown!

Action Points

In order to handle the original io.netty.channel.ConnectTimeoutException I would recommend looking at Handling Errors section of the original Project Reactor documentation

Upvotes: 1

Related Questions