Steven Hu
Steven Hu

Reputation: 1

Bidirectional RPC stream closure

I noticed an asymmetry in how a bidirectional RPC stream is closed, depending on whether the stream server or the stream client closes it.

onCompleted()

onError()

Is this expected?

I expect that when the RPC stream server sends onCompleted(), it will not close the entire stream.

I expect that when the RPC stream server sends onError(), it will also receive onError().

/**
 * Stand-alone experiment that starts an in-process gRPC server and a client for the
 * bidirectional {@code bidiHello} RPC and prints every stream event seen on either side.
 *
 * <p>Sequence driven by {@link #run()}:
 * <ol>
 *   <li>the client sends one request; the server answers the request with {@code onCompleted()};</li>
 *   <li>after a pause the client sends a second request followed by {@code onCompleted()};</li>
 *   <li>if the server's request observer receives {@code onError()}, it tries
 *       {@code onNext()}, {@code onCompleted()} and {@code onError()} on the response
 *       observer in turn and reports which of those calls throw.</li>
 * </ol>
 *
 * <p>All output goes to {@code System.out}; every message includes the name of the thread
 * that produced it where the original experiment printed one.
 */
public class GrpcPlayground {
    /** Executor handed to the client stub via {@code withExecutor}; its single thread is named for the logs. */
    private final ExecutorService executorService =
            Executors.newSingleThreadExecutor(r -> namedThread(r, "grpc-client-thread"));

    /** Not used by the experiment itself; kept for ad-hoc scheduling and shut down together with the rest. */
    private final ScheduledExecutorService scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(r -> namedThread(r, "schedule-service-thread"));

    private static final boolean IS_LINUX = SystemUtil.osName().startsWith("linux");
    private static final int SERVER_PORT = 50051;

    /** Pause between the client's first request and its second request / half-close. */
    private static final long PAUSE_BEFORE_SECOND_SEND_MILLIS = 1000;

    /** How long the process is kept alive after the last client call so late callbacks can still be printed. */
    private static final long OBSERVATION_WINDOW_MILLIS = 10000000;

    /**
     * Runs the experiment once.
     *
     * @param args ignored
     * @throws IOException          if the server cannot be started
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        new GrpcPlayground().run();
    }

    private ManagedChannel managedChannel;
    private NettyServerBuilder grpcServerBuilder;
    private Server server;

    /**
     * Builds the server and the channel, drives the client side of the stream, waits for the
     * observation window and then releases everything.
     *
     * <p>Clean-up runs in a {@code finally} block so that a failed server start or an
     * interrupted sleep does not leave the channel, the server or the executors running.
     *
     * @throws IOException          if the server cannot be started
     * @throws InterruptedException if the thread is interrupted during one of the sleeps
     */
    void run() throws IOException, InterruptedException {
        try {
            // NOTE(review): the event loop groups created below are never shut down by this
            // class. The NettyServerBuilder documentation should be checked for who owns
            // them; if it is the caller, they need an explicit shutdown for the JVM to exit.
            grpcServerBuilder = NettyServerBuilder
                    .forPort(SERVER_PORT)
                    .directExecutor()
                    .channelType(IS_LINUX ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    .bossEventLoopGroup(IS_LINUX ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1))
                    .workerEventLoopGroup(IS_LINUX ? new EpollEventLoopGroup(2) : new NioEventLoopGroup(2))
                    .withChildOption(ChannelOption.SO_RCVBUF, 256 * 1024)
                    .withChildOption(ChannelOption.SO_SNDBUF, 256 * 1024)
                    .withChildOption(ChannelOption.TCP_NODELAY, true);

            managedChannel = NettyChannelBuilder.forAddress("localhost", SERVER_PORT)
                    .withOption(ChannelOption.SO_RCVBUF, 2 * 1024 * 1024)
                    .withOption(ChannelOption.TCP_NODELAY, true)
                    .usePlaintext()
                    .directExecutor()
                    .disableRetry()
                    .build();

            server = grpcServerBuilder.addService(newHelloService()).build().start();

            driveClient();

            Thread.sleep(OBSERVATION_WINDOW_MILLIS);
        } finally {
            shutdown();
        }
    }

    /** Creates the service implementation whose {@code bidiHello} logs cancellation and every inbound event. */
    private HelloServiceGrpc.HelloServiceImplBase newHelloService() {
        return new HelloServiceGrpc.HelloServiceImplBase() {
            @Override
            public StreamObserver<HelloServiceProto.HelloRequest> bidiHello(StreamObserver<HelloServiceProto.HelloResponse> responseObserver) {
                ServerCallStreamObserver<HelloServiceProto.HelloResponse> serverCallStreamObserver =
                        (ServerCallStreamObserver<HelloServiceProto.HelloResponse>) responseObserver;
                serverCallStreamObserver.setOnCancelHandler(() ->
                        System.out.printf("server rpc got cancelled, threadName: %s%n", Thread.currentThread().getName()));

                return newServerRequestObserver(responseObserver);
            }
        };
    }

    /**
     * Creates the server-side observer for inbound requests.
     *
     * @param responseObserver the outbound half of the same call, used to answer the client
     * @return an observer that completes the response stream on each request and, on error,
     *         probes which outbound calls are still accepted
     */
    private static StreamObserver<HelloServiceProto.HelloRequest> newServerRequestObserver(
            StreamObserver<HelloServiceProto.HelloResponse> responseObserver) {
        return new StreamObserver<HelloServiceProto.HelloRequest>() {
            @Override
            public void onNext(HelloServiceProto.HelloRequest value) {
                System.out.printf("server got request, threadName: %s%n", Thread.currentThread().getName());
                System.out.println("server sent onCompleted()");
                responseObserver.onCompleted();
            }

            @Override
            public void onError(Throwable t) {
                Status status = Status.fromThrowable(t);
                System.out.printf("server got error. status: %s, threadName: %s%n", status.getCode(), Thread.currentThread().getName());
                // Probe each outbound call independently: a failure of one must not skip the next.
                attemptServerSend("onNext()",
                        () -> responseObserver.onNext(HelloServiceProto.HelloResponse.newBuilder().build()));
                attemptServerSend("onCompleted()", responseObserver::onCompleted);
                attemptServerSend("onError()",
                        () -> responseObserver.onError(new RuntimeException("test")));
            }

            @Override
            public void onCompleted() {
                System.out.printf("server got onCompleted, threadName: %s%n", Thread.currentThread().getName());
            }
        };
    }

    /**
     * Announces an outbound server call, performs it, and reports (instead of propagating)
     * any runtime exception it throws, including the exception itself.
     *
     * @param callName name of the call as it should appear in the log, e.g. {@code "onNext()"}
     * @param call     the outbound call to perform
     */
    private static void attemptServerSend(String callName, Runnable call) {
        try {
            System.out.println("server sent " + callName);
            call.run();
        } catch (RuntimeException e) {
            System.out.println("server sent " + callName + " throws exception: " + e);
        }
    }

    /**
     * Opens the bidirectional call and sends: one request, a pause, a second request, then
     * {@code onCompleted()}.
     *
     * @throws InterruptedException if the thread is interrupted during the pause
     */
    private void driveClient() throws InterruptedException {
        HelloServiceGrpc.HelloServiceStub stub = HelloServiceGrpc.newStub(managedChannel).withExecutor(executorService);
        HelloServiceProto.HelloRequest request = HelloServiceProto.HelloRequest.newBuilder().build();
        StreamObserver<HelloServiceProto.HelloRequest> streamObserver = stub.bidiHello(newClientResponseObserver());

        System.out.println("client sent request");
        streamObserver.onNext(request);
        Thread.sleep(PAUSE_BEFORE_SECOND_SEND_MILLIS);
        try {
            System.out.println("client sent onNext");
            streamObserver.onNext(request);
            System.out.println("client sent onCompleted");
            streamObserver.onCompleted();
        } catch (RuntimeException e) {
            // The guarded calls are onNext()/onCompleted(), so the message names those.
            System.out.println("client onNext()/onCompleted() throws exception: " + e);
        }
    }

    /** Creates the client-side observer that logs every response, error and completion it receives. */
    private static StreamObserver<HelloServiceProto.HelloResponse> newClientResponseObserver() {
        return new StreamObserver<HelloServiceProto.HelloResponse>() {
            @Override
            public void onNext(HelloServiceProto.HelloResponse value) {
                System.out.printf("client got response, threadName: %s%n", Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable t) {
                Status status = Status.fromThrowable(t);
                System.out.printf("client got error. status: %s, threadName: %s%n", status.getCode(), Thread.currentThread().getName());
            }

            @Override
            public void onCompleted() {
                System.out.printf("client got onCompleted, threadName: %s%n", Thread.currentThread().getName());
            }
        };
    }

    /**
     * Shuts down the channel, the server and both executors. Safe to call when start-up
     * failed part-way: components that were never created are skipped.
     */
    private void shutdown() {
        if (managedChannel != null) {
            managedChannel.shutdownNow();
        }
        if (server != null) {
            server.shutdownNow();
        }
        executorService.shutdownNow();
        scheduledExecutorService.shutdownNow();
    }

    /** Creates a thread for {@code task} with the given name so log lines identify their origin. */
    private static Thread namedThread(Runnable task, String name) {
        Thread t = new Thread(task);
        t.setName(name);
        return t;
    }
}

This is output:

output from above code sample

Upvotes: 0

Views: 21

Answers (0)

Related Questions