Reputation: 130
Let's say I am building a simple chat app using gRPC with the following .proto:
    service Chat {
        rpc SendChat(Message) returns (Void);
        rpc SubscribeToChats(Void) returns (stream Message);
    }

    message Void {}

    message Message {
        string text = 1;
    }
The way I often see the servicer implemented (in examples) in Python is like this:
    class Servicer(ChatServicer):
        def __init__(self):
            self.messages = []

        def SendChat(self, request, context):
            self.messages.append(request.text)
            return Void()

        def SubscribeToChats(self, request, context):
            while True:
                if len(self.messages) > 0:
                    yield Message(text=self.messages.pop())
While this works, it seems very inefficient to spawn an infinite loop that continuously checks a condition for each connected client. It would be preferable to instead have something like this, where the send is triggered right as a message comes in and doesn't require any constant polling on a condition:
    class Servicer(ChatServicer):
        def __init__(self):
            self.listeners = []

        def SendChat(self, request, context):
            for listener in self.listeners:
                # protobuf messages only accept keyword arguments
                listener(Message(text=request.text))
            return Void()

        # Wishful signature: gRPC does not actually pass a callback here.
        def SubscribeToChats(self, request, context, callback):
            self.listeners.append(callback)
However, I can't seem to find a way to do something like this using gRPC.
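For reference, the closest I can get to the callback idea with the synchronous API is a rough sketch like the one below: each subscriber gets its own `queue.Queue`, and the streaming handler blocks in `get()` instead of spinning. `ChatHub`, `publish`, `subscribe`, and `poll_timeout` are names I made up for illustration, and `is_active` is meant to stand in for `context.is_active`. It is not something the gRPC examples show.

```python
import queue
import threading


class ChatHub:
    """Hypothetical fan-out helper: one blocking queue per subscriber."""

    def __init__(self):
        self._lock = threading.Lock()
        self._subscribers = []

    def publish(self, text):
        # Would be called from SendChat: copy the text into every queue.
        with self._lock:
            subscribers = list(self._subscribers)
        for q in subscribers:
            q.put(text)

    def subscribe(self, is_active=lambda: True, poll_timeout=1.0):
        # Register eagerly so no message is missed before iteration starts.
        q = queue.Queue()
        with self._lock:
            self._subscribers.append(q)
        return self._drain(q, is_active, poll_timeout)

    def _drain(self, q, is_active, poll_timeout):
        try:
            while is_active():
                try:
                    # Blocks without burning CPU until a message arrives.
                    yield q.get(timeout=poll_timeout)
                except queue.Empty:
                    continue  # woke up only to re-check is_active()
        finally:
            with self._lock:
                self._subscribers.remove(q)
```

In the servicer, `SendChat` would call `hub.publish(request.text)` and `SubscribeToChats` would loop over `hub.subscribe(context.is_active)` and yield `Message(text=text)` for each item. Each open stream still holds one worker thread of the synchronous server, so this removes the busy loop but not the per-client thread.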
I have the following questions:
Thanks in advance!
Upvotes: 0
Views: 1614
Reputation: 130
I figured out an efficient way to do it. The key is to use the AsyncIO API. Now my SubscribeToChats function can be an async generator, which makes things much easier.
Now I can use something like an asyncio Queue, which my function can await on in a while loop. Similar to this:
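    import asyncio

    class Servicer(ChatServicer):
        def __init__(self):
            # Note: one shared queue means each message reaches only one subscriber.
            self.queue = asyncio.Queue()

        async def SendChat(self, request, context):
            await self.queue.put(request.text)
            return Void()

        async def SubscribeToChats(self, request, context):
            while True:
                # Suspends until a message is available; no polling.
                yield Message(text=await self.queue.get())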
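With a single shared queue, each message is consumed by whichever subscriber happens to call `get()` first. If every connected client should receive every message, one possible variation is to give each subscriber its own queue. The following is only a sketch under that assumption: `AsyncChatHub`, `publish`, and `subscribe` are hypothetical names, and the gRPC parts are left out so the snippet runs on its own.

```python
import asyncio


class AsyncChatHub:
    """Hypothetical fan-out helper: one asyncio.Queue per subscriber."""

    def __init__(self):
        self._queues = set()

    def publish(self, text):
        # Queues are unbounded, so put_nowait never raises QueueFull here.
        for q in list(self._queues):
            q.put_nowait(text)

    def subscribe(self):
        # Register eagerly, then hand back an async generator to iterate.
        q = asyncio.Queue()
        self._queues.add(q)
        return self._drain(q)

    async def _drain(self, q):
        try:
            while True:
                # Suspends this coroutine until a message arrives.
                yield await q.get()
        finally:
            # Runs when the generator is closed or the task is cancelled.
            self._queues.discard(q)
```

In the servicer, `SendChat` would call `hub.publish(request.text)`, and `SubscribeToChats` would do `async for text in hub.subscribe(): yield Message(text=text)`. I am assuming the handler gets cancelled or closed when the client disconnects, which is what lets the `finally` block drop the queue.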
Upvotes: 0