Reputation: 1
I noticed some asymmetry for bidirectional RPC stream closure between RPC stream server and RPC stream client.
onCompleted()
onError()
Is this expected?
I expect that RPC stream server sends onCompleted(), it will not close the entire stream.
I expect that RPC stream server sends onError(), it will also receive onError().
public class GrpcPlayground {
private final ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(r);
t.setName("grpc-client-thread");
return t;
});
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r);
t.setName("schedule-service-thread");
return t;
});
private static final boolean IS_LINUX = SystemUtil.osName().startsWith("linux");
private static final int SERVER_PORT = 50051;
public static void main(String[] args) throws IOException, InterruptedException {
new GrpcPlayground().run();
}
private ManagedChannel managedChannel;
private NettyServerBuilder grpcServerBuilder;
private Server server;
void run() throws IOException, InterruptedException {
grpcServerBuilder = NettyServerBuilder
.forPort(SERVER_PORT)
.directExecutor()
.channelType(IS_LINUX ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.bossEventLoopGroup(IS_LINUX ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1))
.workerEventLoopGroup(IS_LINUX ? new EpollEventLoopGroup(2) : new NioEventLoopGroup(2))
.withChildOption(ChannelOption.SO_RCVBUF, 256 * 1024)
.withChildOption(ChannelOption.SO_SNDBUF, 256 * 1024)
.withChildOption(ChannelOption.TCP_NODELAY, true);
managedChannel = NettyChannelBuilder.forAddress("localhost", SERVER_PORT)
.withOption(ChannelOption.SO_RCVBUF, 2 * 1024 * 1024)
.withOption(ChannelOption.TCP_NODELAY, true)
.usePlaintext()
.directExecutor()
.disableRetry()
.build();
server = grpcServerBuilder.addService(new HelloServiceGrpc.HelloServiceImplBase() {
@Override
public StreamObserver<HelloServiceProto.HelloRequest> bidiHello(StreamObserver<HelloServiceProto.HelloResponse> responseObserver) {
ServerCallStreamObserver<HelloServiceProto.HelloResponse> serverCallStreamObserver =
(ServerCallStreamObserver<HelloServiceProto.HelloResponse>) responseObserver;
serverCallStreamObserver.setOnCancelHandler(() -> {
System.out.printf("server rpc got cancelled, threadName: %s%n", Thread.currentThread().getName());
});
return new StreamObserver<HelloServiceProto.HelloRequest>() {
@Override
public void onNext(HelloServiceProto.HelloRequest value) {
System.out.printf("server got request, threadName: %s%n", Thread.currentThread().getName());
System.out.println("server sent onCompleted()");
responseObserver.onCompleted();
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
System.out.printf("server got error. status: %s, threadName: %s%n", status.getCode(), Thread.currentThread().getName());
try {
System.out.println("server sent onNext()");
responseObserver.onNext(HelloServiceProto.HelloResponse.newBuilder().build());
} catch (Throwable e) {
System.out.println("server sent onNext() throws exception");
}
try {
System.out.println("server sent onCompleted()");
responseObserver.onCompleted();
} catch (Throwable e) {
System.out.println("server sent onCompleted() throws exception");
}
try {
System.out.println("server sent onError()");
responseObserver.onError(new RuntimeException("test"));
} catch (Throwable e) {
System.out.println("server sent onError() throws exception");
}
}
@Override
public void onCompleted() {
System.out.printf("server got onCompleted, threadName: %s%n", Thread.currentThread().getName());
}
};
}
}).build().start();
HelloServiceGrpc.HelloServiceStub stub = HelloServiceGrpc.newStub(managedChannel).withExecutor(executorService);
HelloServiceProto.HelloRequest request = HelloServiceProto.HelloRequest.newBuilder().build();
StreamObserver<HelloServiceProto.HelloRequest> streamObserver = stub.bidiHello(
new StreamObserver<HelloServiceProto.HelloResponse>() {
@Override
public void onNext(HelloServiceProto.HelloResponse value) {
System.out.printf("client got response, threadName: %s%n", Thread.currentThread().getName());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
System.out.printf("client got error. status: %s, threadName: %s%n", status.getCode(), Thread.currentThread().getName());
}
@Override
public void onCompleted() {
System.out.printf("client got onCompleted, threadName: %s%n", Thread.currentThread().getName());
}
});
System.out.println("client sent request");
streamObserver.onNext(request);
Thread.sleep(1000);
try {
System.out.println("client sent onNext");
streamObserver.onNext(request);
System.out.println("client sent onCompleted");
streamObserver.onCompleted();
} catch (Exception e) {
System.out.println("client calls onError() throws exception");
}
Thread.sleep(10000000);
managedChannel.shutdownNow();
server.shutdownNow();
}
}
This is output:
Upvotes: 0
Views: 21