Reputation: 141
I want to use the Python gevent library to implement a server with one producer and multiple consumers. Here is my attempt:
class EmailValidationServer():
    def __init__(self):
        self.queue = Queue()

    def worker(self):
        while True:
            json = self.queue.get()

    def handler(self, socket, address):
        fileobj = socket.makefile()
        content = fileobj.read(max_read)
        contents = json.loads(content)
        for content in contents:
            self.queue.put(content)

    def daemon(self, addr='127.0.0.1', num_thread=5):
        pool = Pool(1000)
        server = StreamServer((addr, 6000), self.handler, spawn=pool)  # run
        pool = ThreadPool(num_thread)
        for _ in range(num_thread):
            pool.spawn(self.worker)
        server.serve_forever()

if __name__ == "__main__":
    email_server = EmailValidationServer()
    email_server.daemon()
The queue is gevent.queue.Queue. Running this fails with the following error:
LoopExit: This operation would block forever
(<ThreadPool at 0x7f08c80eef50 0/4/5>,
<bound method EmailValidationServer.worker of <__main__.EmailValidationServer instance at 0x7f08c8dcd998>>) failed with LoopExit
Problem: when I switch from gevent's Queue to the Queue in Python's standard library, it works. I do not know why; I guess the two implementations differ. Why does gevent not allow a worker to wait indefinitely on the queue here? Can anyone explain? Thanks in advance.
Upvotes: 0
Views: 228
Reputation: 1132
I suggest using gevent.queue.JoinableQueue() instead of Python's built-in Queue(). You can refer to the official queue guide for API usage (http://www.gevent.org/gevent.queue.html).
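import gevent
from gevent.queue import JoinableQueue

# do_work, source, and num_worker_threads are placeholders you define yourself.
def worker():
    while True:
        item = q.get()
        try:
            do_work(item)
        finally:
            q.task_done()

q = JoinableQueue()
for i in range(num_worker_threads):
    gevent.spawn(worker)  # workers run as greenlets, not ThreadPool threads

for item in source():
    q.put(item)

q.join()  # block until all tasks are done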
If you hit the exception again, it is worth taking the time to fully understand how gevent's coroutine control flow works. Once you get the point, it is not a big deal. :)
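For a version you can run as-is, here is a minimal sketch of the same pattern. The names run_pipeline and num_workers and the upper() call are my own stand-ins for the real validation work, not part of gevent. As far as I understand it (treat this as my reading, not an official statement), each ThreadPool worker is a native thread with its own hub, so a blocking get() on a gevent queue there leaves that hub with nothing to switch to, which is what LoopExit reports. Keeping the consumers as greenlets in the same thread as the producer avoids that.

```python
import gevent
from gevent.queue import JoinableQueue


def run_pipeline(items, num_workers=3):
    """Push items through a JoinableQueue to greenlet workers and return the results."""
    queue = JoinableQueue()
    results = []

    def worker():
        while True:
            item = queue.get()  # yields to the hub until an item is available
            try:
                # Hypothetical stand-in for the real per-item work.
                results.append(item.upper())
            finally:
                queue.task_done()

    # Consumers are greenlets sharing one hub with the producer.
    workers = [gevent.spawn(worker) for _ in range(num_workers)]

    for item in items:
        queue.put(item)

    queue.join()             # returns once every item has been marked done
    gevent.killall(workers)  # the workers loop forever, so stop them explicitly
    return results


if __name__ == "__main__":
    print(sorted(run_pipeline(["a@example.com", "b@example.com"])))
```

In a real server the put() calls would come from the StreamServer handler instead of a plain loop, and you would probably leave the workers running rather than killing them after join().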
Upvotes: 2