nKognito
nKognito

Reputation: 6363

Periodic connection reset while grpc streaming

Server:

override fun subscribe(request: Subscribe, responseObserver: StreamObserver<SubscriptionEvent>) {
        sessionStore.grpcHandler(responseObserver, request.sessionId) { session ->
            eventStream.stream(session.id)
                .doOnNext {
                    try {
                        if ((responseObserver as ServerCallStreamObserver).isCancelled) {
                            log.debug { "Stopping to stream events, seems like client cancelled it" }
                            responseObserver.onCompleted()
                            return@doOnNext
                        }

                        responseObserver.onNext(it)
                    } catch (e: StatusRuntimeException) {
                        log.error("Could not stream an event", e)
                    }
                }
                .doOnError { throwable ->
                    log.error("Subscription failed", throwable)
                }
                .subscribe()
        }
    }

Client:

fun subscribe(sessionId: String, tenantId: String, botId: String) {
        subscriptionsThreadPool.submit {
            try {
                subscriptionService.withDeadlineAfter(Long.MAX_VALUE, TimeUnit.SECONDS).subscribe(
                    Subscribe.newBuilder().setSessionId(sessionId).build(),
                    SubscribeStreamObserver(sessionId, tenantId, botId)
                )

                finishLatch.await()
            } catch (e: Throwable) {
                log.error("Could not subscribe to connector-service", e)
            }
        }
    }

Server is using https://github.com/LogNet/grpc-spring-boot-starter

Client's netty config (worth to mention that there is no any proxy in front of grpc server):

private fun rpcChannel(): ManagedChannel =
        NettyChannelBuilder
            .forTarget(properties.connectorServiceUrl)
            .usePlaintext()
            .build()

As soon as I start the client subscription (i.e. call subscribe method which streams event), it takes up to 4 minutes till it fails with UNAVAILABE Connection reset exception. It's always about 3-4 minutes. I did try to set all the possible netty configuration properties, but nothing helps. Here are the logs..

Server:

2021-07-06 11:56:26.045 DEBUG [/] [-worker-ELG-3-1] io.grpc.netty.NettyServerHandler         : Connection Error

java.io.IOException: Connection reset by peer
    at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at java.base/sun.nio.ch.SocketDispatcher.read(Unknown Source)
    at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(Unknown Source)
    at java.base/sun.nio.ch.IOUtil.read(Unknown Source)
    at java.base/sun.nio.ch.IOUtil.read(Unknown Source)
    at java.base/sun.nio.ch.SocketChannelImpl.read(Unknown Source)
    at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Unknown Source)

2021-07-06 11:56:26.045 DEBUG [/] [-worker-ELG-3-1] io.grpc.netty.NettyServerHandler         : [id: 0x1f9596dc, L:/172.17.0.110:8081 - R:/46.5.255.46:58262] OUTBOUND GO_AWAY: lastStreamId=2147483647 errorCode=2 length=24 bytes=436f6e6e656374696f6e2072657365742062792070656572
2021-07-06 11:56:26.046 DEBUG [/] [-worker-ELG-3-1] i.g.n.NettyServerTransport.connections   : Transport failed

Client:

2021-07-06 13:56:25.996 DEBUG [/] [-worker-ELG-1-1] io.grpc.netty.NettyClientHandler         : Caught a connection error

java.net.SocketException: Connection reset
    at java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:367)
    at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:398)
    at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:832)

2021-07-06 13:56:26.008 DEBUG [/] [-worker-ELG-1-1] io.grpc.netty.NettyClientHandler         : [id: 0x52cea98f, L:/192.168.178.20:57940 - R:/116.202.155.130:30192] OUTBOUND GO_AWAY: lastStreamId=0 errorCode=2 length=16 bytes=436f6e6e656374696f6e207265736574
2021-07-06 13:56:26.013 DEBUG [/] [-worker-ELG-1-1] io.grpc.netty.NettyClientHandler         : Network channel is closed

io.grpc version is 1.37.0

Any ideas?

Upvotes: 0

Views: 4922

Answers (1)

Eric Anderson
Eric Anderson

Reputation: 26454

It sounds like a device along the network path is killing the connection after a period of idleness. It could be a proxy, NAT, or firewall.

If you are able to find the device, you could maybe configure it. But commonly you can't configure such things well.

gRPC supports Keep Alive that is designed for this scenario. After a period of inactivity grpc will cause activity, just to make sure the connection is still good and inform networking devices the connection is still being used.

You can configure keepalive on client-side or server-side. If the networking device is part of the server deployment, having server manage the keeepalive is best. Since the client has a non-routable IP, I expect the problem is this case is the NAT in front of the client. So configuring keepalive on the client makes more sense, as different clients may have different needs.

        NettyChannelBuilder
            .forTarget(properties.connectorServiceUrl)
            .usePlaintext()
            // Enable keepalive, with a time a bit smaller
            // than the observed resets
            .keepAliveTime(150, TimeUnit.SECONDS)
            .build()

To prevent abuse, gRPC servers limit keepalives to no lower than 5 minutes by default. So you'll also need to change your server. I've not used grpc-spring-boot-starter, but based on their docs it seems you'd use:

@Component
public class MyGRpcServerBuilderConfigurer extends GRpcServerBuilderConfigurer{
        @Override
        public void configure(ServerBuilder<?> serverBuilder) {
            ((NettyServerBuilder) serverBuilder)
                .permitKeepAliveTime(150, TimeUnit.SECONDS);
        }
    };
}

Upvotes: 2

Related Questions