What I'm trying to do: When I make an update to the state of an object, all gRPC clients should be given the update via a gRPC stream. It's important that each client gets every update, and that they get it exactly once.
What I expect to happen: when I call event.set() and then event.clear() immediately after, every client's stream runs its loop exactly once, yielding the new status.
What actually happens: the clients miss updates. For example, in my serve function I send out 10 updates to the version. On the client side some of them are missing: I'll see updates 1 and 2, then it misses 3 (or some other update), then it starts getting them again.
Server version 1 (this doesn't work because clients miss some updates):
```python
import threading
from concurrent import futures

import grpc
# pb2_grpc (generated stubs) and status_builder() come from my own code, not shown.

class StatusStreamer(pb2_grpc.StatusServiceServicer):
    def __init__(self, status, event):
        self.continue_running = True
        self.status = status
        self.event = event

    def StatusSubscribe(self, request, context):
        while self.continue_running:
            self.event.wait()
            yield self.status

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    status = status_builder()
    event = threading.Event()
    status_streamer = StatusStreamer(status, event)
    pb2_grpc.add_StatusServiceServicer_to_server(status_streamer, server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print('server started')
    try:
        while True:
            _ = input('enter a key to update')
            for _ in range(10):
                # make an update and send it out to all clients
                status.version = str(int(status.version) + 1)
                print('update:', status.version)
                event.set()
                event.clear()
    except KeyboardInterrupt:
        print('\nstopping...')
        event.set()
        status_streamer.continue_running = False
        server.stop(0)
```
Server version 2 (this one works, but I think there's a race condition): instead of a threading.Event, I use a boolean, new_update, which is shared among all of the threads. The serve function sets it to True, and then each streaming thread sets it back to False after yielding.
```python
from concurrent import futures

import grpc
# pb2_grpc (generated stubs) and status_builder() come from my own code, not shown.

class StatusStreamer(pb2_grpc.StatusServiceServicer):
    def __init__(self, status):
        self.continue_running = True
        self.new_update = False
        self.status = status

    def StatusSubscribe(self, request, context):
        while self.continue_running:
            if self.new_update:
                yield self.status
                self.new_update = False  # race condition, I believe, that maybe doesn't occur because of the GIL

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    status = status_builder()
    status_streamer = StatusStreamer(status)
    pb2_grpc.add_StatusServiceServicer_to_server(status_streamer, server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print('server started')
    try:
        while True:
            _ = input('enter a key to update')
            for _ in range(10):
                # make an update and send it out to all clients
                status.version = str(int(status.version) + 1)
                print('update:', status.version)
                status_streamer.new_update = True  # also a race condition, I believe
    except KeyboardInterrupt:
        print('\nstopping...')
        status_streamer.continue_running = False
        server.stop(0)
```
I believe the second version only works because it relies on CPython's global interpreter lock to ensure that no two threads mutate new_update at the same time. I do not like this solution; what are my options? I'm also aware that I could create a queue or list, store all of the changes, and keep track of where each connected client is, but I do not want to allocate the memory to do that.
Answer:
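For server version 1, updates are missed because once the main thread holds the GIL, it may execute several event.set()/event.clear() pairs (and version increments) before it releases the GIL to other threads. A streaming thread that was not blocked in event.wait() at the moment of a set()/clear() pulse never sees that pulse, and a thread that does wake up yields whatever version is current by then, which results in missing updates. A potential fix would be to keep a count of connections and block the next version update until the server has sent the current update to all connections.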
For server version 2, using a threading.Lock or threading.RLock may solve your race condition. However, this version also burns a lot of CPU cycles in the busy loop that checks the flag, which may starve the business logic running in other threads. It is also possible that the main thread holds the GIL for so long that the server has not yet sent the messages to all connections.
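A minimal sketch of the lock suggestion, assuming the flag is wrapped in a small helper (UpdateFlag is a hypothetical name). The lock makes "check the flag and clear it" a single atomic step, so correctness no longer depends on the GIL. Note that with one shared flag, each update is still consumed by exactly one streaming thread, so the other clients can still miss it, and the polling loop that calls it still burns CPU.

```python
import threading


class UpdateFlag:
    """Hypothetical helper: version 2's new_update flag with an atomic
    check-and-clear guarded by a threading.Lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._new_update = False

    def set(self):
        # Called by serve() after it changes the status.
        with self._lock:
            self._new_update = True

    def test_and_clear(self):
        # Called by a streaming thread: reading and clearing happen under one
        # lock acquisition, so two threads can never both see the same update.
        with self._lock:
            was_set = self._new_update
            self._new_update = False
            return was_set
```

In StatusSubscribe, the check would become `if self.flag.test_and_clear(): yield self.status`.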
Unfortunately, I don't have a perfect solution that satisfies your requirements. The gRPC team has a servicer implementation with similar functionality at https://github.com/grpc/grpc/blob/v1.18.x/src/python/grpcio_health_checking/grpc_health/v1/health.py.
In that implementation, the servicer keeps a reference to each response iterator it returns. When the status is updated, the servicer explicitly adds a message to the corresponding response iterators, so no status update is missed.
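A minimal sketch of that pattern in plain Python, assuming one standard-library queue.Queue per connected stream. This is not the code from health.py, and FanOutBroadcaster and its methods are hypothetical names. Each queue only holds the messages its client has not consumed yet, so memory grows only while a client lags behind, rather than keeping a full history of changes.

```python
import queue
import threading

_STOP = object()  # sentinel that ends a subscriber's stream


class FanOutBroadcaster:
    """Hypothetical sketch: keep a reference to every open stream's queue
    and push each update onto all of them."""

    def __init__(self):
        self._lock = threading.Lock()
        self._queues = set()

    def subscribe(self):
        # Register the queue now, so updates published before the first next() are kept.
        q = queue.Queue()
        with self._lock:
            self._queues.add(q)
        return self._drain(q)

    def _drain(self, q):
        try:
            while True:
                item = q.get()  # blocks without spinning the CPU
                if item is _STOP:
                    return
                yield item
        finally:
            # Forget this stream once it ends.
            with self._lock:
                self._queues.discard(q)

    def publish(self, message):
        # Every open stream gets its own reference to the message.
        with self._lock:
            for q in self._queues:
                q.put(message)

    def close(self):
        with self._lock:
            for q in self._queues:
                q.put(_STOP)
```

StatusSubscribe could then do `yield from self.broadcaster.subscribe()`. Because every queue holds a reference to the same published object, publish a copy of the status message (for example, a new message filled with CopyFrom) rather than the object you keep mutating.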
Hope this answers your question.