Reputation: 1636
I just started with gRPC and it somewhat confuses me. I am trying to make a Client that streams to a Server and a Server that streams to the Client. I had success implementing Server streaming to Client but failing to do a Client streaming to the Server.
Here's the .proto file I am using:
syntax = "proto3";
package streaming;
service Streaming{
rpc ServerStreaming (Message) returns (stream Message) {}
rpc ClientStreaming (stream Message) returns (Message) {}
}
message Message{
string message = 1;
}
message MessageResponse{
string message = 1;
bool received = 2;
}
Here's my server code:
from concurrent import futures
import grpc
import streaming_pb2_grpc as streaming_pb2_grpc
import streaming_pb2 as streaming_pb2
def make_message(message):
return streaming_pb2.Message(
message=message
)
class StreamingService(streaming_pb2_grpc.StreamingServicer):
def ServerStreaming(self, request_iterator, context):
message_full = ''
for message in request_iterator:
message_full += message
result = f'Hello I am up and running received "{message_full}" message from you'
result = {'message': result, 'received': True}
return streaming_pb2.MessageResponse(**result)
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
streaming_pb2_grpc.add_StreamingServicer_to_server(StreamingService(), server)
server.add_insecure_port('[::]:8091')
server.start()
server.wait_for_termination()
if __name__ == '__main__':
serve()
And here's the client:
import grpc
import streaming_pb2_grpc as streaming_pb2_grpc
import streaming_pb2 as streaming_pb2
def make_message(message):
return streaming_pb2.Message(
message=message
)
def generate_messages():
messages = [
make_message("First message"),
make_message("Second message"),
make_message("Third message"),
make_message("Fourth message"),
make_message("Fifth message"),
]
for msg in messages:
print("Hello Server Sending you the %s" % msg.message)
yield msg
def send_message(stub):
responses = stub.ServerStreaming(generate_messages())
for response in responses:
print("Hello from the server received your %s" % response.message)
def run():
with grpc.insecure_channel('localhost:8091') as channel:
stub = streaming_pb2_grpc.StreamingStub(channel)
send_message(stub)
if __name__ == '__main__':
run()
This is the error I am getting when running the client:
/Users/<>/bin/python3 /Users/<>/client_streaming_client.py
Traceback (most recent call last):
File "/Users/<>/client_streaming_client.py", line 40, in <module>
run()
File "/Users/<>/client_streaming_client.py", line 36, in run
send_message(stub)
File "/Users/<>/client_streaming_client.py", line 28, in send_message
responses = stub.ServerStreaming(generate_messages())
File "/Users/<>/lib/python3.7/site-packages/grpc/_channel.py", line 1057, in __call__
raise rendezvous # pylint: disable-msg=raising-bad-type
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INTERNAL
details = "Exception serializing request!"
debug_error_string = "None"
>
Process finished with exit code 1
Any idea what I am doing wrong here? I am clueless..
Upvotes: 0
Views: 1386
Reputation: 7045
It's not entirely clear from your post what you are expecting to happen. You have defined two RPCs ServerStreaming
(a response-streaming RPC) and ClientStreaming
(a request-streaming RPC). Neither of your protos use the MessageResponse
type (but your server attempts to return it).
Based on the client and server code that you have written, I think you are expecting a bidirectionally-streaming RPC, defined like so:
streaming.proto
syntax = "proto3";
package streaming;
service Streaming {
rpc ClientStreaming(stream Message) returns (stream MessageResponse) {}
}
message Message{
string message = 1;
}
message MessageResponse{
string message = 1;
bool received = 2;
}
Then you can iterate requests and yield responses from the server:
from concurrent import futures
import grpc
import streaming_pb2_grpc as streaming_pb2_grpc
import streaming_pb2 as streaming_pb2
class StreamingService(streaming_pb2_grpc.StreamingServicer):
def ClientStreaming(self, request_iterator, context):
s = ""
for message in request_iterator:
print(f"Received: {message.message!r}")
s = ", ".join([s, message.message]) if s else message.message
yield streaming_pb2.MessageResponse(message=s, received=True)
if __name__ == "__main__":
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
streaming_pb2_grpc.add_StreamingServicer_to_server(StreamingService(), server)
server.add_insecure_port("[::]:8091")
server.start()
server.wait_for_termination()
And you can send messages using an iterable and iterate response with the client:
import grpc
import streaming_pb2_grpc as streaming_pb2_grpc
import streaming_pb2 as streaming_pb2
def generate_messages():
messages = [
"First message",
"Second message",
"Third message",
"Fourth message",
"Fifth message",
]
for msg in messages:
msg = streaming_pb2.Message(message=msg)
print(f"Sending {msg.message!r}")
yield msg
def send_message(stub):
responses = stub.ClientStreaming(generate_messages())
for response in responses:
print(f"Received {response.message!r}")
if __name__ == "__main__":
with grpc.insecure_channel("localhost:8091") as channel:
stub = streaming_pb2_grpc.StreamingStub(channel)
send_message(stub)
Upvotes: 2