Reputation: 14022
I have a program using python's packages multiprocessing and Queue. One of my functions have this structure:
from multiprocessing import Process, Queue
def foo(queue):
while True:
try:
a = queue.get(block = False)
doAndPrintStuff(a)
except:
print "the end"
break
if __name__ == "__main__"
nthreads = 4
queue = Queue.Queue()
# put stuff in the queue here
for stuff in moreStuff:
queue.put(stuff)
procs = [Process(target = foo, args = (queue,)) for i in xrange(nthreads)]
for p in procs:
p.start()
for p in procs:
p.join()
the idea is that when I try to extract from the queue and it is empty, it'll raise an exception and terminate the loop. So I have two questions:
1) is this a safe idiom? Are there better ways to do this?
2) I tried to find what is the exact exception that is raised when I try to .get()
from an empty queue. Currently my program is catching all exceptions, which sucks when the error is somewhere else and I only get a "the end" message.
I tried:
import Queue
queue = Queue.Queue()
[queue.put(x) for x in xrange(10)]
try:
print queue.get(block = False)
except Queue.Empty:
print "end"
break
but I got the error as if I hadn't caught the exception. What's the correct exception to catch?
Upvotes: 8
Views: 38098
Reputation: 1060
Here's an example- As @Steven said above, you need to use the queue.Empty exception from the standard queue. The note from documentation (https://docs.python.org/3/library/multiprocessing.html):
Note
multiprocessing uses the usual queue.Empty and queue.Full exceptions to signal a timeout. They are not available in the multiprocessing namespace so you need to import them from queue.
basic example:
from multiprocessing import Process, Queue, Manager
import queue
def firstPass(q):
driver = getDriver()
while True:
try:
link = q.get_nowait()
f(driver, link)
except queue.Empty:
logger.info("empty queue")
driver.close()
break
Upvotes: 1
Reputation: 19644
It appears that the Queue is empty until the put buffers are flushed, which may take a while.
The solution to our problem is to use sentinels, or maybe the built-in task_done() call:
task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.
Upvotes: 2
Reputation: 28666
The exception should be Queue.Empty
. But are you sure you got the same error? In your second example, you also switched the queue itself from multiprocessing.Queue
to Queue.Queue
, which I think may be the problem.
It might seem strange, but you have to use the multiprocessing.Queue
class, but use the Queue.Empty
exception (which you have to import yourself from the Queue
module)
Upvotes: 20
Reputation: 5742
Try reading the queue library docs. Aren't you looking for Queue.empty()?
Upvotes: -4