Tin Ng

Reputation: 1047

gRPC: "Stream closed before write could take place"

I've got an app that streams data from a database via gRPC at the client's request.

When the client shuts down before the streaming is complete, an error shows up in the log, but I don't know how to capture it so that I can close the underlying database connection. As a result, the DB connection leaks.

I am using Java and grpc-java version 1.23.0.

    try {
        // ... events is a stream retrieved from the DB
        StreamObservers.copyWithFlowControl(response, new StreamObserverWithCallbacks<>(
                responseObserver,
                // Called when streaming completes normally.
                count -> {
                    LOGGER.info("Streaming finished");
                    events.close();
                },
                // Called when streaming fails with an error.
                ex -> {
                    LOGGER.error("Error streaming", ex);
                    events.close();
                }
        ));
    } catch (Exception e) {
        LOGGER.error("Any other exception", e);
        events.close();
    }


Here is the exception in the logs when I shut down the client:

2019-08-27 18:39:34,155 WARN  [grpc-nio-worker-ELG-3-1] i.g.n.s.i.g.n.NettyServerHandler: Stream Error
io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:167)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:481)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed(DefaultHttp2RemoteFlowController.java:105)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:356)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1000)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate(DefaultHttp2Connection.java:956)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:512)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection.close(DefaultHttp2Connection.java:152)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$BaseDecoder.channelInactive(Http2ConnectionHandler.java:209)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.channelInactive(Http2ConnectionHandler.java:417)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.channelInactive(NettyServerHandler.java:586)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816)
    at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
    at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)

As you can see, the exception is thrown on a Netty worker thread, so it reaches neither the try/catch block nor the error callback of the streaming.

Have I missed something? Is there something that I can do?

Upvotes: 3

Views: 8298

Answers (2)

Tin Ng

Reputation: 1047

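StreamObserverWithCallbacks is our own class that wraps a StreamObserver delegate, so we just needed to register a cancel handler on the delegate (setOnCancelHandler is defined on ServerCallStreamObserver, so the delegate has to be of that type):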

this.delegate.setOnCancelHandler(() -> onError.accept(new RuntimeException("Stream got cancelled")));
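
With the cancel handler routed into the same error callback, events.close() can now be reached from several places: normal completion, a streaming error, a cancellation, and the outer catch block. If the underlying resource does not tolerate repeated close() calls, a small guard can make the cleanup idempotent. The following is a minimal sketch using only the JDK; CloseOnce and CloseOnceDemo are hypothetical names that are not part of grpc-java or of the code above, and the Runnable callbacks merely stand in for the real gRPC handlers.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: wraps a resource so that its close() runs at most once. */
final class CloseOnce implements AutoCloseable {
    private final AutoCloseable resource;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    CloseOnce(AutoCloseable resource) {
        this.resource = resource;
    }

    @Override
    public void close() {
        // compareAndSet lets exactly one caller through, even if the
        // callbacks fire on different threads.
        if (closed.compareAndSet(false, true)) {
            try {
                resource.close();
            } catch (Exception e) {
                throw new IllegalStateException("Failed to close resource", e);
            }
        }
    }

    boolean isClosed() {
        return closed.get();
    }
}

final class CloseOnceDemo {
    public static void main(String[] args) {
        // Stand-in for the DB-backed stream: counts how often it is closed.
        AtomicInteger closeCalls = new AtomicInteger();
        CloseOnce events = new CloseOnce(closeCalls::incrementAndGet);

        // Stand-ins for the cancel handler and the error callback.
        Runnable onCancel = events::close;
        Runnable onError = events::close;

        onCancel.run();
        onError.run();

        System.out.println("close() calls on the resource: " + closeCalls.get());
    }
}
```

In the handler code, events would be wrapped once when it is obtained, and every callback would call close() on the wrapper instead of on the raw stream.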

Upvotes: 2

Eric Anderson

Reputation: 26464

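The exception you see is actually okay. It is logged for reasons internal to gRPC and Netty, and it shows up whenever a call is cancelled while writes are still pending.

What you are currently missing is observing the cancellation itself. Use io.grpc.Context (for example, a cancellation listener registered via Context.current().addListener) or ServerCallStreamObserver (setOnCancelHandler) to be notified when the call is cancelled, and release the database resources there.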

Upvotes: 0
