Reputation: 45078
I have some producer functions that rely on I/O-heavy blocking calls, and some consumer functions that also rely on I/O-heavy blocking calls. To speed them up, I used the gevent micro-threading library as glue.
Here's what my paradigm looks like:
```python
import random

import gevent
from gevent.queue import JoinableQueue

q = JoinableQueue()
workers = []
producers = []


def do_work(wid, value):
    gevent.sleep(random.randint(0, 2))
    print('Task', value, 'done', wid)


def worker(wid):
    while True:
        item = q.get()
        try:
            print("Got item %s" % item)
            do_work(wid, item)
        finally:
            print("No more items")
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print("Signal Received")
            return
        else:
            print("Added item %s" % item)
            q.put(item)


for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# This doesn't work.
for j in range(2):
    producers.append(gevent.spawn(producer))

# Uncommenting this makes this script work.
# producer()

q.join()
```
I have four consumers and would like to have two producers. The producers exit when they receive a signal, i.e. 10. The consumers keep feeding off this queue, and the whole task should finish when the producers and consumers are done.
However, this doesn't work. If I comment out the for loop that spawns multiple producers and use only a single producer, the script runs fine.
I can't seem to figure out what I've done wrong.
Any ideas?
Thanks
Upvotes: 5
Views: 2306
Reputation: 1132
I ran into the same issue. The main problem with your code is that your producers are spawned in gevent greenlets, so the workers can't get tasks immediately.
I suggest running producer() in the main greenlet instead of spawning it; that way, when execution reaches producer(), it pushes the tasks onto the queue immediately.
```python
import random

import gevent
from gevent.queue import JoinableQueue

q = JoinableQueue()
workers = []
producers = []


def do_work(wid, value):
    gevent.sleep(random.randint(0, 2))
    print('Task', value, 'done', wid)


def worker(wid):
    while True:
        item = q.get()
        try:
            print("Got item %s" % item)
            do_work(wid, item)
        finally:
            print("No more items")
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print("Signal Received")
            return
        else:
            print("Added item %s" % item)
            q.put(item)


# Run the producer in the main greenlet so the queue is filled right away.
producer()

for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# Wait until every queued item has been processed; without this the
# script exits before the spawned workers ever run.
q.join()
```
The code above should make this clear. :)
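The premise behind this answer, that gevent.spawn() only schedules a greenlet and nothing in it runs until the main greenlet blocks or yields, can be checked with a small sketch. The producer function and the events list here are hypothetical, used only to record the order in which things happen.

```python
import gevent

events = []  # hypothetical log of what ran, in order


def producer(name):
    # This body runs only after the main greenlet hands control to the hub.
    events.append("producer %s ran" % name)


g = gevent.spawn(producer, "A")
events.append("after spawn")  # spawn() only scheduled the greenlet
g.join()                      # blocking here lets the hub run it
events.append("after join")

print(events)
# ['after spawn', 'producer A ran', 'after join']
```

This is why code that never blocks after spawning (for example, a q.join() that returns at once) can finish before any spawned greenlet has started.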
Upvotes: 0
Reputation: 8419
What you want to do is block the main program while the producers and workers communicate. Blocking on the queue only waits until the queue has no unfinished tasks and then returns, which could be immediately. Put this at the end of your program instead of q.join():

```python
gevent.joinall(producers)
```
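A minimal sketch of what gevent.joinall(producers) does, assuming hypothetical fixed-size producers and no workers so the result is deterministic: it blocks the main greenlet until every producer greenlet has finished, which is what gives them a chance to run at all.

```python
import gevent
from gevent.queue import JoinableQueue

q = JoinableQueue()


def producer(name, count):
    # Hypothetical fixed-size producer so the outcome is deterministic.
    for i in range(count):
        q.put((name, i))


producers = [gevent.spawn(producer, "p1", 3), gevent.spawn(producer, "p2", 2)]
gevent.joinall(producers)   # block the main greenlet until both producers finish

print(q.qsize())            # 5: every item has been put
print(q.unfinished_tasks)   # 5: nothing has been consumed or marked done yet
```

Note that joinall() waits only for the producers; it does not by itself wait for consumers to finish processing what is still in the queue.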
Upvotes: 0
Reputation: 5420
You don't actually want to quit when the queue has no unfinished work, because conceptually that's not when the application should finish.
You want to quit when the producers have finished, and then when there is no unfinished work.
```python
# Wait for all producers to finish producing
gevent.joinall(producers)
# *Now* we want to make sure there's no unfinished work
q.join()
# We don't care about workers. We weren't paying them anything, anyways
gevent.killall(workers)
# And, we're done.
```
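Putting the three steps together with the question's producers and workers gives roughly the following sketch. It assumes shorter random sleeps in do_work and adds hypothetical produced/processed lists purely so the outcome can be checked; it is one way to structure the shutdown, not the only one.

```python
import random

import gevent
from gevent.queue import JoinableQueue

SIGNAL = 10  # a producer stops when it draws this value, as in the question

q = JoinableQueue()
produced = []   # hypothetical bookkeeping: every item a producer put
processed = []  # hypothetical bookkeeping: (worker id, item) pairs handled


def do_work(wid, value):
    # Stand-in for an I/O-bound call; gevent.sleep() yields to other greenlets.
    gevent.sleep(random.uniform(0, 0.01))
    processed.append((wid, value))


def worker(wid):
    while True:
        item = q.get()  # blocks (yielding to the hub) until an item arrives
        try:
            do_work(wid, item)
        finally:
            q.task_done()  # mark the item finished even if do_work() raised


def producer():
    while True:
        item = random.randint(1, 11)
        if item == SIGNAL:
            return
        produced.append(item)
        q.put(item)


workers = [gevent.spawn(worker, wid) for wid in range(4)]
producers = [gevent.spawn(producer) for _ in range(2)]

gevent.joinall(producers)  # 1. wait for all producers to finish producing
q.join()                   # 2. wait until every queued item is marked done
gevent.killall(workers)    # 3. the workers loop forever, so stop them

print("produced %d, processed %d" % (len(produced), len(processed)))
```

Every item that was put is processed exactly once, and the script exits only after both the producers and the outstanding work are finished.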
Upvotes: 6
Reputation: 15278
I think it calls q.join() before anything has been put in the queue, so join() returns immediately and the script exits. Try joining all the producers before joining the queue.
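A minimal sketch of that early exit, using a hypothetical single-item producer: right after spawning, nothing has been put yet, so JoinableQueue.join() sees no unfinished tasks and returns at once.

```python
import gevent
from gevent.queue import JoinableQueue

q = JoinableQueue()


def producer():
    q.put("item")  # only runs once the main greenlet blocks or yields


p = gevent.spawn(producer)

# Nothing has been put yet, so there are no unfinished tasks and
# join() reports "all done" straight away; this is the early exit.
early = q.join(timeout=1)
print(early)                # True

# Joining the producer first gives it a chance to run.
gevent.joinall([p])
print(q.unfinished_tasks)   # 1: the item is now waiting to be consumed
```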
Upvotes: 3