Reputation: 2222
I have this snippet that uses the `Queue` class from the `multiprocessing` module. I am very confused that the `.empty()` method of a `Queue` instance does not give me the value I would expect. This is my code:
```python
from time import sleep
from multiprocessing import Queue, Lock

foo = Queue()
locker = Lock()

with locker:  # even with this, still True
    foo.put("bar")

print(foo.empty())  # True, obviously not
print(foo.empty())  # True
print(foo.empty())  # True
print(foo.qsize())  # 1L
print(foo.empty())  # True
```
However, if I use the `sleep` function from `time` to add a short delay before checking, it works:
```python
from time import sleep
from multiprocessing import Queue, Lock

foo = Queue()
locker = Lock()

foo.put("bar")
sleep(0.01)

print(foo.empty())  # False
print(foo.empty())  # False
print(foo.empty())  # False
print(foo.qsize())  # 1L
print(foo.empty())  # False
```
I know I could use the `.qsize() > 0` expression instead, but I am sure I am just doing this the wrong way.
What am I doing wrong?
*EDIT*
I understand now that it is unreliable, thank you @Mathias Ettinger. Are there any clean alternatives? I need to know how to reliably tell whether my `Queue` is empty.
Upvotes: 9
Views: 7415
Reputation: 58534
Unfortunately, the `Queue`'s complex implementation means that `.empty()` and `.qsize()` check different things to make their judgments. As a result, they may disagree for a while, as you've seen.
Since `.qsize()` is supported on your platform (which is not true everywhere), you can re-implement the `.empty()` check in terms of `.qsize()`, and this will work for you:
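```python
# mp.Queue() is a factory (a bound context method in Python 3), not a class,
# so we need to find the true class to subclass
import multiprocessing
import multiprocessing.queues

class XQueue(multiprocessing.queues.Queue):
    def __init__(self, maxsize=0):
        # Python 3.4+: the underlying class requires an explicit context
        super().__init__(maxsize, ctx=multiprocessing.get_context())

    def empty(self):
        try:
            return self.qsize() == 0
        except NotImplementedError:  # OS X -- see qsize() implementation
            return super().empty()
```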
Under the hood, `Queue.put()` is a complex process: the `Queue` places objects in a buffer and acquires an interprocess semaphore, while a hidden daemon thread is responsible for draining the buffer and serializing its contents to a pipe. (Consumers then `.get()` by reading from this pipe and releasing the interprocess semaphore.) That's why sleeping in your example works: the daemon thread has enough time to move the object from the in-memory buffer to its I/O representation before you call `.empty()`.
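Rather than sleeping for an arbitrary time and then calling `.empty()`, a consumer can block on `.get()` with a timeout, which waits only as long as the feeder thread actually needs (up to the limit). A minimal sketch, assuming a hypothetical helper named `try_get` and a 0.1 s default timeout; `queue.Empty` is the exception `multiprocessing.Queue.get()` raises when the timeout expires:

```python
import queue  # multiprocessing.Queue.get() raises queue.Empty on timeout
from multiprocessing import Queue


def try_get(q, timeout=0.1):
    """Hypothetical helper: return (True, item) if an item can be read
    within `timeout` seconds, otherwise (False, None)."""
    try:
        # get() waits for the feeder thread to flush the item into the pipe
        return True, q.get(timeout=timeout)
    except queue.Empty:
        return False, None


if __name__ == "__main__":
    foo = Queue()
    foo.put("bar")
    print(try_get(foo))  # the item normally arrives well within the timeout
    print(try_get(foo))  # (False, None): nothing is left in the queue
```

A successful check also consumes the item, which is usually what a consumer wants anyway, and it removes the race between "is it empty?" and "read it".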
As an aside, I find this behavior surprising: a `Queue` in the very same internal state can give two different answers to the question, "do you have any elements enqueued?" (`qsize` will say "yes", and `empty` "no".)
I think I understand how this came about. Since not all platforms support `sem_getvalue()`, not all platforms can implement `qsize`, but `empty` can be reasonably implemented by just polling the FIFO. I'd have expected `empty` to be implemented in terms of `qsize` on platforms that support the latter.
Upvotes: 10
Reputation: 4186
As per the documentation, none of `empty()`, `full()`, or `qsize()` is reliable.
Alternatives include:
Reading the exact number of items going through the `Queue`:
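```python
from multiprocessing import Queue

queue = Queue()
AMT = 8  # total number of items, known in advance
for _ in range(AMT):
    queue.put('some stuff')
for _ in range(AMT):
    print(queue.get())  # get() blocks until an item arrives, so no empty() check is needed
```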
This is useful if you know beforehand how many items must be processed in total or how many will be processed by each thread.
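To make the count-based approach concrete, here is a minimal sketch in which each producer's workload is known in advance, so the consumer reads exactly `NUM_PRODUCERS * ITEMS_PER_PRODUCER` items and never calls `empty()`. The names and counts are hypothetical, and threads are used only to keep the example self-contained; `multiprocessing.Queue` is safe to share between threads, and the same loop works with `multiprocessing.Process` producers.

```python
import threading
from multiprocessing import Queue

NUM_PRODUCERS = 2       # hypothetical number of producers
ITEMS_PER_PRODUCER = 4  # hypothetical per-producer workload, known in advance


def producer(q, pid):
    for i in range(ITEMS_PER_PRODUCER):
        q.put((pid, i))


def consume_known_count(q, total):
    """Read exactly `total` items; get() blocks, so no emptiness check is needed."""
    return [q.get() for _ in range(total)]


if __name__ == "__main__":
    q = Queue()
    workers = [threading.Thread(target=producer, args=(q, p)) for p in range(NUM_PRODUCERS)]
    for w in workers:
        w.start()
    items = consume_known_count(q, NUM_PRODUCERS * ITEMS_PER_PRODUCER)
    for w in workers:
        w.join()
    print(sorted(items))
```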
Reading items until a guardian (sentinel) value appears:
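```python
# `queue` is the shared Queue and `process()` is your own handler (both assumed to exist)
num_threads = 8          # number of producers; each puts one guardian when it is done
guardian = 'STUFF DONE'  # sentinel value that no real item can equal
while num_threads:
    item = queue.get()   # blocks until something arrives
    if item == guardian:
        num_threads -= 1  # one more producer has finished
    else:
        process(item)
```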
This is helpful if each thread has a variable amount of work to do (and you don't know the total beforehand) but can determine when it is done.
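A self-contained version of the guardian pattern might look like the sketch below: each producer sends its items followed by one guardian, and the consumer stops once it has seen one guardian per producer. The helper names and workloads are hypothetical, collecting into `results` stands in for `process(item)`, and threads again stand in for processes to keep the example runnable on its own.

```python
import threading
from multiprocessing import Queue

GUARDIAN = 'STUFF DONE'  # sentinel value; must never appear as a real work item


def producer(q, items):
    for item in items:
        q.put(item)
    q.put(GUARDIAN)  # announce that this producer has finished


def consume_until_guardians(q, num_producers):
    """Read until every producer has sent its guardian; the total is not known in advance."""
    results = []
    remaining = num_producers
    while remaining:
        item = q.get()  # blocks, so no empty() polling is needed
        if item == GUARDIAN:
            remaining -= 1
        else:
            results.append(item)  # stands in for process(item)
    return results


if __name__ == "__main__":
    q = Queue()
    workloads = [["a", "b", "c"], ["d"]]  # hypothetical, uneven amounts of work
    workers = [threading.Thread(target=producer, args=(q, w)) for w in workloads]
    for w in workers:
        w.start()
    print(sorted(consume_until_guardians(q, len(workers))))  # ['a', 'b', 'c', 'd']
    for w in workers:
        w.join()
```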
Upvotes: 7