Diarmuid Leonard

Reputation: 123

Can gRPC Java client send multiple requests in parallel over a long lived gRPC stream and how to manage N streams

I am using a 'Streaming RPC' API where both MyRequest and MyResponse are streamed:

service MyStreamedService {
  rpc myOperation(stream MyRequest) returns (stream MyResponse);
}

Here is a slightly simplified version of a class that wraps a gRPC stream:

public class MyStreamWrapper implements StreamObserver<MyResponse> {
  private final MyStreamedServiceGrpc.MyStreamedServiceStub myStub;
  private StreamObserver<MyRequest> myStream;

  public MyStreamWrapper(ManagedChannel myChannel) {
    myStub = MyStreamedServiceGrpc.newStub(myChannel);
    // create a stream and maintain a long-lived reference to it via the request StreamObserver
    myStream = myStub.myOperation(this);
  }
  
  @Override
  public void onNext(MyResponse r) {
    // handle the response (not shown)
  }
  
  @Override
  public void onError(Throwable t) {
    // very unfortunate that there is no error code in this API !
    // throttle (not shown but if I don't throttle, eats CPU)
    // Create a new stream
    myStream = myStub.myOperation(this);
  }
  
  @Override
  public void onCompleted() {
    // server has called StreamObserver<MyRequest>.onCompleted
    // Create a new stream using the async API
    myStream = myStub.myOperation(this);
  }
  
  // Context: many threads want to send a request asynchronously
  public void send(MyRequest r) {
    synchronized(myStream) {
      myStream.onNext(r);
    }
  }
}

Questions

  1. Why does access to myStream need to be synchronized in the send method? I would like to understand why I must synchronize threads that want to send unordered requests in parallel on the same stream. If each request is packaged in an HTTP/2 DATA frame with its own stream-id, is this just unique to the Java implementation of the gRPC client?
  2. What is guaranteed to have happened when a thread returns from the send method?
  3. Given that the client threads are synchronized to one invocation of onNext at a time, can the client overload the server, or is back-pressure applied by blocking the client threads in the above send method? I see streams closing and errors like "INTERNAL: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR" under traffic.
  4. Is it unusual to maintain and re-use myStream, given that streams are cheap to create?
  5. Given that a stream can only be directed to one server, am I correct in thinking that I need to add more code to the simple class above to create N streams on channel creation and then round-robin the send method across those N streams? Unfortunately, there is no API to determine whether myStream is currently 'busy' with another RPC request. Alternatively, I could create new streams on the fly and add a semaphore (size: N) to limit the number of threads attempting to use them.

Things I think I understand ...

Upvotes: 1

Views: 5208

Answers (2)

morgwai

Reputation: 2803

  1. First, HTTP/2 messages may be big, so putting an HTTP/2 frame on the wire is not atomic in the general case, and multiple threads may therefore be writing concurrently to the same network socket. Furthermore, there is flow-control-related buffering: if the other side of the stream is not ready to accept more messages, they need to be buffered on your side, so multiple threads may also be writing concurrently to the same buffer.
  2. "request is buffered in the gRPC client" is the correct answer: StreamObserver's methods are asynchronous, they return very quickly and the actual network communication will be performed by another thread at some point.
  3. when sending messages, you should respect other side's readiness using methods from CallStreamObserver: isReady() and setOnReadyHandler(...) (you can always cast your outbound StreamObserver to a CallStreamObserver). If you ignore other side's readiness, gRPC will buffer messages in its internal buffer until the other side is ready. This may lead to excessive memory usage in some cases, though, due to the buffering described in point 1.
    BTW: you may want to look at the official copyWithFlowControl() helper methods and my own DispatchingOnReadyHandler class.
  4. I guess your intention is to always have an RPC open: if so, your code seems fine. However, the question is whether you should be using a single bi-di call vs. multiple unary calls. If the server's processing of one request message is not tightly related to the processing of others (i.e., the server does not need to constantly maintain a single, in-memory state related to all request messages), then unary calls will be better for at least 2 reasons:
    4.1. no synchronization as described in point 1 will be necessary.
    4.2. you will make better use of server load-balancing.
    Under normal circumstances an overhead for starting a new unary RPC is small as it will open a new stream on an existing HTTP/2 connection.
    If however server-side processing of some request message may be related to some other previous request messages, then indeed you do need client side streaming. You should however try to close and renew the RPC whenever possible to allow servers to balance the traffic.
  5. If your client runs as a part of some server app, then grpclb policy is the most common load-balancing choice: it will maintain a collection of multiple HTTP/2 connections to several available backends, each connection possibly having multiple HTTP/2 streams (HTTP/2 streams correspond 1-1 with gRPC Streams). Furthermore, grpclb will actively probe these connections to verify if they are healthy and also automatically reissue the DNS query (or whatever other name resolution service, if you use custom NameResolver) to see if any new backends were added when needed. Remember to include grpc-grpclb runtime dependency if you want to use it. See note at the bottom of the answer for more info on loadbalancing and name resolution in server app to server app case.
    If your client runs on android, then grpclb is not available normally (most android devices lack capabilities to run a background load balancer, and even if it were possible, it would drain the device's battery quite fast), but your connections will usually go through some load-balancer standing in front of the backend servers. In such a case each new RPC will usually go to the least occupied backend.
    However, since you seem to be maintaining just one long-lived call, all your request messages will be going to the same backend until the call is "renewed": that's why I suggested using unary calls if possible in the previous point. This is waaay simpler than implementing your own load-balancing, so if at all possible it should be preferred.
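To make points 2 and 3 concrete, here is a minimal plain-Java simulation of the readiness mechanism (it does not use gRPC; the class name and the message-count threshold are hypothetical stand-ins for CallStreamObserver's internal buffering):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// A plain-Java simulation (not gRPC itself) of the CallStreamObserver
// readiness pattern described above: onNext() only queues the message and
// returns immediately, isReady() reports whether the transport buffer has
// room, and the on-ready handler fires when the buffer drains again.
class SimulatedOutboundStream {
    private final Queue<String> transportBuffer = new ArrayDeque<>();
    private final int readyThreshold; // "ready" while fewer messages than this are buffered
    private Runnable onReadyHandler = () -> {};

    SimulatedOutboundStream(int readyThreshold) {
        this.readyThreshold = readyThreshold;
    }

    synchronized boolean isReady() {
        return transportBuffer.size() < readyThreshold;
    }

    synchronized void setOnReadyHandler(Runnable handler) {
        this.onReadyHandler = handler;
    }

    // Never blocks: the message is only buffered (see point 2 above).
    synchronized void onNext(String message) {
        transportBuffer.add(message);
    }

    // Simulates the transport thread flushing one message to the wire.
    synchronized void drainOne() {
        boolean wasReady = isReady();
        transportBuffer.poll();
        if (!wasReady && isReady()) {
            onReadyHandler.run(); // fires on the not-ready -> ready transition
        }
    }
}
```

In real code you would cast the request observer to ClientCallStreamObserver&lt;MyRequest&gt;, check isReady() before each onNext(), and resume sending from the setOnReadyHandler(...) callback instead of letting gRPC buffer unboundedly.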

A clarification regarding the "things I understand" part: "sub-channels" are most usually basically HTTP/2 connections: a channel is a collection of possibly multiple HTTP/2 connections (depending on client capabilities: see point 5) to multiple backends (depending on the server configuration, of course), and each connection can have multiple independent streams (1 HTTP/2 stream per 1 gRPC call).
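For illustration, creating such a channel might look roughly like this (a hedged sketch: the target address is made up, and "round_robin" — a policy that ships with grpc-java — is used here instead of grpclb for simplicity):

```java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

// Sketch only: one channel whose name resolver returns multiple backend
// addresses; the load-balancing policy opens a sub-channel (HTTP/2
// connection) to each of them and spreads RPCs across those sub-channels.
ManagedChannel channel = ManagedChannelBuilder
    .forTarget("dns:///my-grpc-backend.example.com:6000") // illustrative target
    .defaultLoadBalancingPolicy("round_robin") // or "grpclb" with the grpc-grpclb dependency
    .usePlaintext()                            // assuming traffic is secured elsewhere
    .build();
```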

A few notes on load balancing and name resolution in the case of server-app-to-server-app gRPCs:

  • the simplest and most common way for the client side to find available backends is via a DNS resolution. More complex mechanisms include things like xDS or Consul or custom NameResolvers.
  • In particular, when deploying both backend and client server apps in the same k8s cluster, the most common way is to deploy the backends as a headless service, so that all backend pods can be obtained by the clients via an in-cluster DNS query for <backend-service-name>.<k8s-namespace>.svc.cluster.local. For example .forTarget("myGrpcBackendService.default.svc.cluster.local:6000").
  • By default, clients using grpclb will try to keep their set of connections to backends as long as possible, so even if RPCs are short-living, the underlying connections will stick to the same set of backends. To enforce periodical rebalancing, servers may set maximum connection lifetime using methods from NettyServerBuilder: maxConnectionAge(...), maxConnectionAgeGrace(...) and maxConnectionIdle(...).
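The server-side settings from the last bullet might be wired up roughly like this (a hedged sketch: the port, the durations, and MyStreamedServiceImpl are all illustrative, not prescribed values):

```java
import java.util.concurrent.TimeUnit;
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;

// Illustrative values only: tune them to your traffic patterns.
Server server = NettyServerBuilder.forPort(6000)
    .maxConnectionAge(30, TimeUnit.MINUTES)      // force periodic reconnects so clients rebalance
    .maxConnectionAgeGrace(5, TimeUnit.MINUTES)  // let in-flight RPCs finish before closing
    .maxConnectionIdle(10, TimeUnit.MINUTES)     // drop connections with no activity
    .addService(new MyStreamedServiceImpl())     // hypothetical implementation of MyStreamedService
    .build()
    .start();
```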

Upvotes: 4

Diarmuid Leonard

Reputation: 123

It turns out that all the answers are available in the HTTP/2 spec (https://datatracker.ietf.org/doc/html/rfc7540), which is very readable.

  1. Answered by Eric Anderson: "Note that the object is not thread-safe, so if you call methods on it from multiple threads simultaneously you should do your own locking/synchronization". This doesn't really explain why; I guess that since the unary API did not need to be synchronized, they didn't synchronize for the streaming API either.
  2. The application request is at least in a buffer, ready to be put on the wire in an HTTP/2 DATA frame. The thread is not blocked waiting for any confirmation that the request has been received; DATA frames are not ACK'd anyway. The application may be prevented from sending the data by HTTP/2 flow control (to prevent overloading the far end), but it seems unlikely that the thread would be blocked even in that case.
  3. As per the above, flow control is described at https://datatracker.ietf.org/doc/html/rfc7540#section-6.9: the server grants the client permission to send a number of bytes, and the client must stop when that allocation has been used.
  4. No, not unusual. Here, Eric Anderson says "for fully-asynchronous processing we would expect you to start any I/O on the current thread and then call methods on StreamObserver from some other thread once the I/O completes.". Duplicate of Java gRPC server for long-lived streams effective implementation
  5. As per Eric Anderson's response above, I think it is normal to cache the stream reference and re-use it. However, I don't know why the Java channel API does not include a method like int getStreamLimit(), because the client needs to know how many streams it is allowed to use, as per the SETTINGS_MAX_CONCURRENT_STREAMS value in the HTTP/2 SETTINGS frame exchanged during creation of the channel.
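The windowing rule from point 3 can be sketched in a few lines of plain Java (a simulation of the RFC 7540 §6.9 accounting, not gRPC code; 65535 is the protocol's default initial window size):

```java
// Minimal simulation of HTTP/2 flow-control accounting (RFC 7540 §6.9):
// the sender may only send as many bytes as the current window allows, and
// the window grows again only when a WINDOW_UPDATE frame arrives.
class FlowControlWindow {
    private long window; // bytes the peer has permitted us to send

    FlowControlWindow(long initialWindow) {
        this.window = initialWindow;
    }

    // Returns the number of bytes actually sent (possibly fewer than asked,
    // or zero if the window is exhausted).
    long trySend(long bytes) {
        long sent = Math.min(bytes, window);
        window -= sent;
        return sent;
    }

    // Peer grants more credit via a WINDOW_UPDATE frame.
    void windowUpdate(long increment) {
        window += increment;
    }

    long available() {
        return window;
    }
}
```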

General comment: I found the gRPC documentation difficult to understand until I realized that most of the deeper questions can be answered by studying HTTP/2, e.g.
an HTTP/2 request does not correspond 1:1 with the application request object in the proto IDL. Key point: an HTTP/2 request continues for the duration of the stream and can carry many application request objects.

Upvotes: 0
