Reputation: 7581
I want to make a thread wait until a multithread queue is not empty. The queue has only one producer and one consumer. The producer places tasks in the queue when available but the producer has to wait until two or more tasks have been gathered. The reason why I don't just use the get
method twice in order to retrieve two tasks is because it over complicates the flow of the algorithm. That cannot be depicted in the snippet bellow though, because obviously it's just an oversimplified example.
I need to know that the queue is not empty so that I can compare the peak of the queue (without removing it) with the element I just removed with get
How it could be done with sleep:
while myQueue.empty():
sleep(0.05)
How can I do that without using sleep? Should I use event.wait()
? If yes, I cannot figure out how I should properly use the event.clear()
command. Since the thread that I want to make wait is also the consumer and I cannot be sure whether the queue is empty. Even if I use queue.empty()
to check.
Upvotes: 6
Views: 8865
Reputation: 2274
Essentially, it seems you need to implement the Queue.peek()
method, that would return the next element in the queue without actually removing it.
This method is not available in the standard Queue object, but you can inherit and expand it without problems:
from Queue import Queue
class VoyeurQueue(Queue):
def peek(self, block=True, timeout=None):
# ...
Now for the contents of new peek()
method, you can simply copy-paste the contents of get()
method of the base Queue
object with some modifications. You can find it at /usr/lib/python?.?/Queue.py
if you're on Linux, or %PYTHONPATH%/lib/Queue.py
if you're on Windows (not sure about the latter as I'm currently on Linux machine and cannot check). In my copy of Python 2.7, the get()
method is implemented as:
def get(self, block=True, timeout=None):
# ... lots of comments
self.not_empty.acquire()
try:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item
finally:
self.not_empty.release()
def _get(self):
return self.queue.popleft()
Now, for differences. You don't want to remove the element, so instead of _get()
we define the following:
def _peek(self):
return self.queue[0]
And in the peek()
method, we still use the self.not_empty
Condition but we no longer need the self.not_full.notify()
. So the resulting code will look like:
from Queue import Queue
class VoyeurQueue(Queue):
def peek(self, block=True, timeout=None):
self.not_empty.acquire()
try:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._peek()
return item
finally:
self.not_empty.release()
def _peek(self):
return self.queue[0]
Upvotes: 8
Reputation: 1070
You can use a semaphore, initialized at zero, in parallel to the queue. Let say for example mySemaphore = threading.Semaphore(0)
. By default the thread calling mySempahore.acquire()
will be blocked as the semaphore is zero without touching the queue. Then when you put someting in the queue, you can call mySemaphore.release()
that will allow one thread to execute (util the next loop is suppose).
Upvotes: 4
Reputation: 394965
I want to make a thread wait until a multithread queue is not empty.
I want to avoid retrieving the next object, that's why I am not using the get method
If you don't mind using a sentinel object (I use one I name Done
to tell my consumer thread we're done so it can wrap up.)
Start = object() # sentinel object on global scope.
in producer:
queue.put(Start)
and in worker:
item = queue.get() # blocks until something received
if item is Start:
print('we have now started!')
I'm not sure why you'd do that though, but this does seem to do what you want it to do.
Upvotes: 0
Reputation: 1070
Just myQueue.get(block=True)
will bock your thread (stop its execution) until there is something to retrieve from the queue. When an item is availabe in the queue it will be returned by this call. You can add a timeout in case you want to exit if the queue is never feed.
See https://docs.python.org/3/library/queue.html#queue.Queue.get.
Upvotes: 2