Pavel Zagalsky
Pavel Zagalsky

Reputation: 1636

Failure to implement client-to-server streaming in Python (gRPC)

I just started with gRPC and it somewhat confuses me. I am trying to make a Client that streams to a Server and a Server that streams to the Client. I succeeded in implementing Server-to-Client streaming, but I am failing to implement Client-to-Server streaming.

Here's the .proto file I am using:

syntax = "proto3";

package streaming;

// Service exposing one response-streaming rpc and one request-streaming rpc.
service Streaming{
  // Takes a single Message and returns a stream of Message.
  rpc ServerStreaming (Message) returns (stream Message) {}
  // Takes a stream of Message and returns a single Message.
  rpc ClientStreaming (stream Message) returns (Message) {}
}

// Payload with a single string field.
message Message{
  string message = 1;
}

// Payload with a string field and a boolean flag.
// NOTE(review): no rpc in this file uses MessageResponse as a request or
// response type.
message MessageResponse{
  string message = 1;
  bool received = 2;
}

Here's my server code:

from concurrent import futures

import grpc
import streaming_pb2_grpc as streaming_pb2_grpc
import streaming_pb2 as streaming_pb2


def make_message(message):
    """Wrap ``message`` in a ``streaming_pb2.Message`` and return it."""
    wrapped = streaming_pb2.Message(message=message)
    return wrapped


class StreamingService(streaming_pb2_grpc.StreamingServicer):
    """Servicer for the Streaming service; defines only ServerStreaming."""

    def ServerStreaming(self, request_iterator, context):
        """Consume ``request_iterator`` and return one MessageResponse.

        NOTE(review): the .proto shown with this code declares
        ServerStreaming as ``(Message) returns (stream Message)``. Iterating
        the first argument and returning a single reply matches the shape of
        the ClientStreaming rpc instead, and that rpc's declared return type
        is Message rather than MessageResponse -- confirm which rpc this
        handler is meant to implement.
        """
        message_full = ''
        for message in request_iterator:
            # NOTE(review): this appends the iterated item itself to a str;
            # if the items are Message objects, `message.message` was
            # presumably intended -- confirm.
            message_full += message
        result = f'Hello I am up and running received "{message_full}" message from you'
        # The field values are gathered in a dict and unpacked as keyword
        # arguments for the response constructor.
        result = {'message': result, 'received': True}
        return streaming_pb2.MessageResponse(**result)


def serve():
    """Register StreamingService on a gRPC server bound to port 8091.

    The server uses a thread pool of 10 workers, and the call does not
    return until ``wait_for_termination`` does.
    """
    pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(pool)
    streaming_pb2_grpc.add_StreamingServicer_to_server(
        StreamingService(), grpc_server
    )
    grpc_server.add_insecure_port('[::]:8091')
    grpc_server.start()
    grpc_server.wait_for_termination()


# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    serve()

And here's the client:


import grpc
import streaming_pb2_grpc as streaming_pb2_grpc
import streaming_pb2 as streaming_pb2


def make_message(message):
    """Return a ``streaming_pb2.Message`` whose ``message`` field is ``message``."""
    payload = streaming_pb2.Message(message=message)
    return payload


def generate_messages():
    """Yield five prepared messages, printing a notice before each one."""
    texts = (
        "First message",
        "Second message",
        "Third message",
        "Fourth message",
        "Fifth message",
    )
    # Build every message up front, then announce and emit them in order.
    outgoing = [make_message(text) for text in texts]
    for item in outgoing:
        print("Hello Server Sending you the %s" % item.message)
        yield item


def send_message(stub):
    """Call ``stub.ServerStreaming`` and print the message of each response.

    NOTE(review): the .proto shown with this code declares ServerStreaming
    as taking a single Message, yet a generator of messages is passed here.
    The traceback in the question is raised on this call ("Exception
    serializing request!"). The request-streaming rpc in that proto is
    ClientStreaming, which returns a single Message rather than an iterable
    of responses -- confirm which rpc was intended.
    """
    responses = stub.ServerStreaming(generate_messages())
    for response in responses:
        print("Hello from the server received your %s" % response.message)


def run():
    """Open an insecure channel to localhost:8091 and send the messages."""
    # The channel is used as a context manager for the duration of the call.
    with grpc.insecure_channel('localhost:8091') as channel:
        stub = streaming_pb2_grpc.StreamingStub(channel)
        send_message(stub)


# Run the client only when this file is executed as a script.
if __name__ == '__main__':
    run()

This is the error I am getting when running the client:

/Users/<>/bin/python3 /Users/<>/client_streaming_client.py
Traceback (most recent call last):
  File "/Users/<>/client_streaming_client.py", line 40, in <module>
    run()
  File "/Users/<>/client_streaming_client.py", line 36, in run
    send_message(stub)
  File "/Users/<>/client_streaming_client.py", line 28, in send_message
    responses = stub.ServerStreaming(generate_messages())
  File "/Users/<>/lib/python3.7/site-packages/grpc/_channel.py", line 1057, in __call__
    raise rendezvous  # pylint: disable-msg=raising-bad-type
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.INTERNAL
    details = "Exception serializing request!"
    debug_error_string = "None"
>

Process finished with exit code 1

Any idea what I am doing wrong here? I am clueless..

Upvotes: 0

Views: 1386

Answers (1)

Alex
Alex

Reputation: 7045

It's not entirely clear from your post what you are expecting to happen. You have defined two RPCs: ServerStreaming (a response-streaming RPC) and ClientStreaming (a request-streaming RPC). Neither of your RPCs uses the MessageResponse type (but your server attempts to return it).

Based on the client and server code that you have written, I think you are expecting a bidirectionally-streaming RPC, defined like so:

streaming.proto

syntax = "proto3";

package streaming;

// Service with a single rpc that streams in both directions.
service Streaming {
  // Takes a stream of Message and returns a stream of MessageResponse.
  rpc ClientStreaming(stream Message) returns (stream MessageResponse) {}
}

// Request payload: a single string field.
message Message{
  string message = 1;
}

// Response payload: a string field plus a boolean flag.
message MessageResponse{
  string message = 1;
  bool received = 2;
}

Then you can iterate requests and yield responses from the server:

from concurrent import futures
import grpc
import streaming_pb2_grpc as streaming_pb2_grpc
import streaming_pb2 as streaming_pb2

class StreamingService(streaming_pb2_grpc.StreamingServicer):
    """Servicer whose ClientStreaming handler answers every incoming message."""

    def ClientStreaming(self, request_iterator, context):
        """Yield one MessageResponse per request read from ``request_iterator``.

        Each response carries the running text of everything received so
        far and has ``received`` set to True. A ", " separator is inserted
        only once the accumulated text is non-empty.
        """
        joined = ""
        for request in request_iterator:
            text = request.message
            print(f"Received: {text!r}")
            if joined:
                joined = ", ".join((joined, text))
            else:
                joined = text
            yield streaming_pb2.MessageResponse(message=joined, received=True)

# Script entry point: serve StreamingService on port 8091.
if __name__ == "__main__":
    # Handlers run on a thread pool of 10 workers.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    streaming_pb2_grpc.add_StreamingServicer_to_server(StreamingService(), server)
    server.add_insecure_port("[::]:8091")
    server.start()
    # Wait here until the server terminates.
    server.wait_for_termination()

And you can send messages using an iterable and iterate over the responses in the client:

import grpc
import streaming_pb2_grpc as streaming_pb2_grpc
import streaming_pb2 as streaming_pb2


def generate_messages():
    """Yield one ``streaming_pb2.Message`` per fixed text, printing each send."""
    texts = (
        "First message",
        "Second message",
        "Third message",
        "Fourth message",
        "Fifth message",
    )
    for text in texts:
        # Each request is built just before it is announced and emitted.
        request = streaming_pb2.Message(message=text)
        print(f"Sending {request.message!r}")
        yield request


def send_message(stub):
    """Pass the generated messages to ``stub.ClientStreaming`` and print each reply."""
    outgoing = generate_messages()
    for reply in stub.ClientStreaming(outgoing):
        print(f"Received {reply.message!r}")


# Script entry point: connect to localhost:8091 and exchange messages.
if __name__ == "__main__":
    # The channel is used as a context manager around the whole exchange.
    with grpc.insecure_channel("localhost:8091") as channel:
        stub = streaming_pb2_grpc.StreamingStub(channel)
        send_message(stub)

Upvotes: 2

Related Questions