LetsPlayYahtzee

Reputation: 7581

How to wait until a multithread queue is not empty without wasting too much cpu cycles

I want to make a thread wait until a multithreaded queue is not empty. The queue has only one producer and one consumer. The producer places tasks in the queue as they become available, but the consumer has to wait until two or more tasks have been gathered. The reason I don't just call the get method twice to retrieve two tasks is that it overcomplicates the flow of the algorithm. The snippet below cannot show that, though, because it is just an oversimplified example.

I need to know that the queue is not empty so that I can compare the item at the head of the queue (without removing it) with the element I just removed with get.

How it could be done with sleep:

while myQueue.empty():
    sleep(0.05)

How can I do that without using sleep? Should I use event.wait()? If so, I cannot figure out how to use event.clear() properly, since the thread I want to make wait is also the consumer and I cannot be sure whether the queue is empty, even if I check with queue.empty().

Upvotes: 6

Views: 8865

Answers (4)

Lav

Reputation: 2274

Essentially, it seems you need to implement the Queue.peek() method, that would return the next element in the queue without actually removing it.

This method is not available in the standard Queue object, but you can inherit and expand it without problems:

from Queue import Queue
class VoyeurQueue(Queue):
    def peek(self, block=True, timeout=None):
        # ...

For the body of the new peek() method, you can simply copy the body of the get() method of the base Queue class and modify it slightly. You can find it at /usr/lib/python?.?/Queue.py if you're on Linux, or %PYTHONPATH%/lib/Queue.py if you're on Windows (not sure about the latter, as I'm currently on a Linux machine and cannot check). In my copy of Python 2.7, the get() method is implemented as:

def get(self, block=True, timeout=None):
    # ... lots of comments
    self.not_empty.acquire()
    try:
        if not block:
            if not self._qsize():
                raise Empty
        elif timeout is None:
            while not self._qsize():
                self.not_empty.wait()
        elif timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        else:
            endtime = _time() + timeout
            while not self._qsize():
                remaining = endtime - _time()
                if remaining <= 0.0:
                    raise Empty
                self.not_empty.wait(remaining)
        item = self._get()
        self.not_full.notify()
        return item
    finally:
        self.not_empty.release()

def _get(self):
    return self.queue.popleft()

Now, for differences. You don't want to remove the element, so instead of _get() we define the following:

def _peek(self):
    return self.queue[0]

And in the peek() method, we still use the self.not_empty Condition but we no longer need the self.not_full.notify(). So the resulting code will look like:

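from Queue import Queue, Empty   # Python 2 module name; peek() raises Empty
from time import time as _time   # same alias that Queue.py uses internally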

class VoyeurQueue(Queue):

    def peek(self, block=True, timeout=None):
        self.not_empty.acquire()
        try:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = _time() + timeout
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._peek()
            return item
        finally:
            self.not_empty.release()

    def _peek(self):
        return self.queue[0]
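
For readers on Python 3, the same idea can be sketched more compactly. This is an illustrative port, not the code above: the class name PeekableQueue is made up, the module is queue rather than Queue, and Condition.wait_for() replaces the hand-written timeout loop. Like the original, it relies on internals of CPython's Queue (not_empty, _qsize() and the underlying deque in self.queue), which are not part of the documented API.

```python
import queue
import threading


class PeekableQueue(queue.Queue):
    """Hypothetical Python 3 sketch of a queue with a blocking peek()."""

    def peek(self, block=True, timeout=None):
        # not_empty is the Condition that Queue.put() notifies.
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise queue.Empty
            elif timeout is not None and timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            elif not self.not_empty.wait_for(self._qsize, timeout):
                # wait_for() returns a falsy value when the wait timed out.
                raise queue.Empty
            # Return the head without removing it.
            return self.queue[0]


q = PeekableQueue()
# Simulate a producer that delivers an item a little later.
producer = threading.Timer(0.1, q.put, args=("task-1",))
producer.start()
head = q.peek()   # blocks (no polling) until the producer has put the item
producer.join()
```

After peek() returns, the item is still in the queue, so a later get() returns the same object.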

Upvotes: 8

Antoine

Reputation: 1070

You can use a semaphore, initialized to zero, alongside the queue. Let's say, for example, mySemaphore = threading.Semaphore(0). A thread calling mySemaphore.acquire() will block because the semaphore's count is zero, without touching the queue. Then, when you put something in the queue, you can call mySemaphore.release(), which will allow one waiting thread to proceed (until the next loop iteration, I suppose).
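
A minimal sketch of that arrangement, assuming Python 3 and a single consumer. The helper names (produce, wait_for_item, take) are made up for illustration; the idea is that the semaphore's count always mirrors the number of items not yet taken.

```python
import queue
import threading

tasks = queue.Queue()
items_available = threading.Semaphore(0)  # counts items put but not yet taken


def produce(item):
    tasks.put(item)
    items_available.release()  # wakes the consumer if it is waiting


def wait_for_item():
    # Blocks without polling until at least one item has been produced.
    items_available.acquire()
    # Hand the permit back so the count still matches the queue size.
    # This is only safe because there is a single consumer.
    items_available.release()


def take():
    items_available.acquire()
    return tasks.get_nowait()


# Simulate a producer that delivers an item a little later.
threading.Timer(0.1, produce, args=("task-1",)).start()
wait_for_item()                  # returns once the queue is non-empty
size_after_wait = tasks.qsize()  # the item is still in the queue
first = take()
```

Because produce() puts the item before releasing the semaphore, the queue is guaranteed to be non-empty by the time wait_for_item() returns.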

Upvotes: 4

Aaron Hall

Reputation: 394965

I want to make a thread wait until a multithread queue is not empty.

I want to avoid retrieving the next object, that's why I am not using the get method

If you don't mind using a sentinel object, you can have the producer send one to signal the consumer. (I use one named Done to tell my consumer thread we're done so it can wrap up.)

Start = object() # sentinel object at global scope

in producer:

queue.put(Start)

and in worker:

item = queue.get() # blocks until something received
if item is Start:
    print('we have now started!')

I'm not sure why you'd do that though, but this does seem to do what you want it to do.

Upvotes: 0

Antoine

Reputation: 1070

Simply calling myQueue.get(block=True) will block your thread (stop its execution) until there is something to retrieve from the queue. When an item is available in the queue, this call returns it. You can add a timeout in case you want to exit if the queue is never fed.

See https://docs.python.org/3/library/queue.html#queue.Queue.get.
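
As a rough illustration of the timeout variant, assuming Python 3: the consume function and the 0.5 second timeout are arbitrary choices for this sketch. get() raises queue.Empty when the timeout expires, which the consumer uses as its exit condition.

```python
import queue
import threading


def consume(q, timeout=0.5):
    """Collect items until the queue stays empty for `timeout` seconds."""
    received = []
    while True:
        try:
            # Blocks without busy-waiting until an item arrives or time runs out.
            item = q.get(block=True, timeout=timeout)
        except queue.Empty:
            break
        received.append(item)
    return received


q = queue.Queue()
producer = threading.Thread(target=lambda: [q.put(n) for n in range(3)])
producer.start()
result = consume(q)
producer.join()
```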

Upvotes: 2
