Talgat

Reputation: 59

How to receive messages from a streaming server?

I am trying to build an asynchronous gRPC C++ client that exchanges streaming messages with a server through a ClientAsyncReaderWriter instance. The client and the server may each send a message at any time. How can I check whether a message from the server is waiting to be read?

The ClientAsyncReaderWriter instance is bound to a completion queue. I tried polling that queue with Next() and AsyncNext(), expecting an event that would indicate an incoming message from the server. However, the completion queue reports no events even when the server has sent a message.

class AsyncClient {
public:
    explicit AsyncClient(std::shared_ptr<grpc::Channel> channel) :
        stub_(MyService::NewStub(channel)),
        // Starts the call; tag 1 is delivered on cq once the stream is ready.
        stream(stub_->AsyncStreamingRPC(&context, &cq, reinterpret_cast<void *>(1)))
    {}
    ~AsyncClient()
    {}

    void receiveFromServer() {
        StreamingResponse response;

        // 1. Check if there is any message
        // 2. Read the message
    }

private:
    grpc::ClientContext context;
    grpc::CompletionQueue cq;
    std::shared_ptr<MyService::Stub> stub_;
    std::shared_ptr<grpc::ClientAsyncReaderWriter<StreamingRequest, StreamingResponse>> stream;
};

I need to implement steps 1 and 2 in the receiveFromServer() function.

Upvotes: 1

Views: 1051

Answers (2)

Johan Chane

Reputation: 118

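See the route_guide example and its route_guide.proto. RouteChat, the last RPC in the service, is a bidirectional streaming RPC like the one in the question: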

service RouteGuide {
  // A simple RPC.
  //
  // Obtains the feature at a given position.
  //
  // A feature with an empty name is returned if there's no feature at the given
  // position.
  rpc GetFeature(Point) returns (Feature) {}

  // A server-to-client streaming RPC.
  //
  // Obtains the Features available within the given Rectangle.  Results are
  // streamed rather than returned at once (e.g. in a response message with a
  // repeated field), as the rectangle may cover a large area and contain a
  // huge number of features.
  rpc ListFeatures(Rectangle) returns (stream Feature) {}

  // A client-to-server streaming RPC.
  //
  // Accepts a stream of Points on a route being traversed, returning a
  // RouteSummary when traversal is completed.
  rpc RecordRoute(stream Point) returns (RouteSummary) {}

  // A Bidirectional streaming RPC.
  //
  // Accepts a stream of RouteNotes sent while a route is being traversed,
  // while receiving other RouteNotes (e.g. from other users).
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

Upvotes: 0

Talgat

Reputation: 59

Fortunately, I found a solution, and asynchronous bidirectional streaming in my project now works as expected. It turned out that I had misunderstood the "completion queue" concept. This example was a great help for me!
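
For anyone who runs into the same confusion, here is a rough sketch of the idea as I understand it. A completion queue does not announce incoming messages on its own; it only reports the completion of operations you have already requested. In gRPC that means calling stream->Read(&response, tag) first and then waiting for that tag to come back from cq.Next(&tag, &ok). The code below is NOT gRPC code: ToyCompletionQueue, ToyStream, TryNext and Deliver are hypothetical names for a toy model built on the standard library only, so the behaviour can be seen without a server.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>

// Toy stand-in for a completion queue (hypothetical, not the gRPC class).
// It stores one (tag, ok) pair per finished operation.
struct ToyCompletionQueue {
    std::deque<std::pair<void*, bool>> events;

    // Pops the next finished operation; returns false if nothing has finished.
    bool TryNext(void** tag, bool* ok) {
        if (events.empty()) return false;
        *tag = events.front().first;
        *ok = events.front().second;
        events.pop_front();
        return true;
    }
};

// Toy stand-in for a bidirectional stream bound to a completion queue.
class ToyStream {
public:
    explicit ToyStream(ToyCompletionQueue* cq) : cq_(cq) {}

    // Simulates a message arriving from the server. On its own this
    // produces no event, because nobody has asked for a read yet.
    void Deliver(std::string msg) {
        inbox_.push_back(std::move(msg));
        CompletePendingRead();
    }

    // Requests one read. The tag shows up on the queue once a message
    // has been copied into *out.
    void Read(std::string* out, void* tag) {
        assert(pending_out_ == nullptr && "only one read may be outstanding");
        pending_out_ = out;
        pending_tag_ = tag;
        CompletePendingRead();
    }

private:
    // Finishes the outstanding read if both a request and a message exist.
    void CompletePendingRead() {
        if (pending_out_ == nullptr || inbox_.empty()) return;
        *pending_out_ = std::move(inbox_.front());
        inbox_.pop_front();
        cq_->events.emplace_back(pending_tag_, true);
        pending_out_ = nullptr;
        pending_tag_ = nullptr;
    }

    ToyCompletionQueue* cq_;
    std::deque<std::string> inbox_;
    std::string* pending_out_ = nullptr;
    void* pending_tag_ = nullptr;
};
```

In this model, calling Deliver() and then polling TryNext() finds nothing, which matches what I saw with the real completion queue. Only after Read() has been called does the queue hand back the read's tag, and at that point the response buffer is filled. So steps 1 and 2 from my question collapse into "request a read, then wait for its tag".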

Upvotes: 3
