Reputation: 45078
I'm trying to set up a simple producer-consumer system in Gevent but my script doesn't exit:
import gevent
from gevent.queue import *
import time
import random

q = Queue()
workers = []

def do_work(wid, value):
    """
    Actual blocking function
    """
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid
    return

def worker(wid):
    """
    Consumer
    """
    while True:
        item = q.get()
        do_work(wid, item)

def producer():
    """
    Producer
    """
    for i in range(4):
        workers.append(gevent.spawn(worker, random.randint(1, 100000)))
    for item in range(1, 9):
        q.put(item)

producer()
gevent.joinall(workers)
I haven't been able to find good examples or tutorials on using Gevent, so what I've pasted above is what I've cobbled together from the internet.
Multiple workers get activated and the items go into the queue, but even after everything in the queue has been processed, the main program doesn't exit. I have to press Ctrl+C.
What am I doing wrong?
Thanks.
On a side note: if there is anything in my script that could be improved, please let me know. Simple things like checking when the Queue is empty, etc.
Upvotes: 6
Views: 4803
Reputation: 5420
In your worker, you activate a loop that will run forever.
As a side note, a more elegant "forever loop" (IMHO) can be written as just:
for work_unit in q:
    # Do work, etc
gevent.joinall() waits for the workers to finish; but they never do, so your program will forever be waiting. This is what causes it to not exit.
If you don't care about the workers anymore, you can just kill them instead:
gevent.killall(workers)
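Calling killall immediately after the producer would stop the workers before they have processed anything, so the wait needs some bound first. Here is a rough sketch of one way to do that, using the timeout argument of gevent.joinall. The 1-second timeout, the `done` list and the short sleep standing in for do_work are assumptions for illustration only:

```python
import gevent
from gevent.queue import Queue

q = Queue()
done = []  # processed values, kept only so the outcome can be inspected

def worker(wid):
    # Same endless consumer loop as in the question.
    while True:
        item = q.get()
        gevent.sleep(0.01)  # stand-in for the real blocking work
        done.append(item)

workers = [gevent.spawn(worker, wid) for wid in range(4)]
for item in range(1, 9):
    q.put(item)

# Wait for a bounded time (arbitrary 1 second here) instead of forever.
gevent.joinall(workers, timeout=1)
# The workers are still blocked in q.get(); kill them so nothing is left running.
gevent.killall(workers)
print(sorted(done))
```

The obvious weakness is that the timeout is a guess: too short and work gets cut off, too long and the program idles for no reason.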
An alternative is to put a 'special' item in the queue. When a worker receives this item, it recognises it as different from normal work and stops working.
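# Producer side: after the real work, queue one marker per worker
for _ in workers:
    q.put("TimeToDie")

# Worker side: stop as soon as the marker arrives
for work_unit in q:
    if work_unit == "TimeToDie":
        break
    do_work(wid, work_unit)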
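Put together with the script from the question, the marker approach might look roughly like this. It is only a sketch: the `STOP` constant and the `results` list are names I made up, the sleep is shortened, and it assumes the marker string never occurs as real work:

```python
import gevent
from gevent.queue import Queue

STOP = "TimeToDie"  # marker value; assumed never to appear as a real work item
q = Queue()
results = []  # collected only so the outcome can be checked

def do_work(wid, value):
    gevent.sleep(0.01)  # stand-in for the actual blocking function
    results.append(value)

def worker(wid):
    while True:
        item = q.get()
        if item == STOP:
            break  # leave the loop so the greenlet finishes
        do_work(wid, item)

workers = [gevent.spawn(worker, wid) for wid in range(4)]
for item in range(1, 9):
    q.put(item)
# One marker per worker, queued after the real work.
for _ in workers:
    q.put(STOP)

gevent.joinall(workers)  # returns once every worker has consumed a marker
print(sorted(results))
```

Because the queue is first-in, first-out, the markers are only reached after all real items have been taken, and each worker exits after consuming exactly one marker.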
Or you could even use gevent's Event (gevent.event.Event) to signal the workers that it is time to stop.
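For the Event variant, one possible shape (a sketch, not the only way to do it) is to have the producer set an event once everything is queued, and let each worker exit when the queue stays empty and the event is set. The name `no_more_work`, the 0.1-second poll interval and the `results` list are my own assumptions:

```python
import gevent
from gevent.event import Event
from gevent.queue import Queue, Empty

q = Queue()
no_more_work = Event()  # hypothetical name: set once the producer is finished
results = []

def worker(wid):
    while True:
        try:
            # Wake up periodically instead of blocking forever.
            item = q.get(timeout=0.1)
        except Empty:
            if no_more_work.is_set():
                break  # nothing queued and nothing more coming
            continue
        gevent.sleep(0.01)  # stand-in for the real work
        results.append(item)

workers = [gevent.spawn(worker, wid) for wid in range(4)]
for item in range(1, 9):
    q.put(item)
no_more_work.set()  # tell the workers the producer is done

gevent.joinall(workers)
print(sorted(results))
```

Compared with a marker in the queue, this costs a small polling delay at shutdown, but the stop signal does not have to travel through the queue itself.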
Upvotes: 2
Reputation: 9522
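I think you should use JoinableQueue, like in the example from the documentation. Each worker calls q.task_done() after handling an item, and q.join() blocks only until every queued item has been marked done, so the main program waits for the work to finish rather than for the workers to exit.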
import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []

def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid

def worker(wid):
    while True:
        item = q.get()
        try:
            do_work(wid, item)
        finally:
            q.task_done()

def producer():
    for i in range(4):
        workers.append(gevent.spawn(worker, random.randint(1, 100000)))
    for item in range(1, 9):
        q.put(item)

producer()
q.join()
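Note that after q.join() returns, the workers are still alive and blocked in q.get(). If the program carries on doing other things afterwards, it may be worth stopping them explicitly. A small sketch of that, with a shortened sleep in place of do_work and a `finished` list added purely for checking the result (both are my assumptions):

```python
import gevent
from gevent.queue import JoinableQueue

q = JoinableQueue()
finished = []  # processed values, kept only for inspection

def worker(wid):
    while True:
        item = q.get()
        try:
            gevent.sleep(0.01)  # stand-in for do_work
            finished.append(item)
        finally:
            q.task_done()  # always mark the item done, even on error

workers = [gevent.spawn(worker, wid) for wid in range(4)]
for item in range(1, 9):
    q.put(item)

q.join()                 # blocks until task_done() was called for every item
gevent.killall(workers)  # workers are idle in q.get(); stop them explicitly
print(sorted(finished))
```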
Upvotes: 5