CalMlynarczyk

Python Queue waiting for thread before getting next item

I have a queue that always needs to be ready to process items when they are added to it. The function that runs on each item in the queue creates and starts a thread to execute the operation in the background, so the program can go do other things.

However, the function I am calling on each item in the queue simply starts the thread and then returns, regardless of whether the thread it started has finished. Because of this, the loop moves on to the next item in the queue before the program is done processing the previous one.
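To illustrate the behavior described above: `Thread.start()` returns as soon as the new thread has been launched, while `Thread.join()` blocks the caller until that thread finishes. A minimal sketch in Python 3, where `slow_operation` is a hypothetical stand-in for the work `SomeThread` does:

```python
import threading
import time

events = []

def slow_operation():
    # Hypothetical stand-in for the background work done by SomeThread
    time.sleep(0.2)
    events.append("operation finished")

op_thread = threading.Thread(target=slow_operation)
op_thread.start()
events.append("start() returned")  # runs while slow_operation is still sleeping
op_thread.join()                   # blocks until slow_operation has returned
events.append("join() returned")

print(events)
```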

Here is code to better demonstrate what I am trying to do:

import Queue
import threading

queue = Queue.Queue()

def addTask():
    queue.put(SomeObject())

def worker():
    while True:
        try:
            # If an item is put onto the queue, immediately execute it (unless
            # an item on the queue is still being processed, in which case wait
            # for it to complete before moving on to the next item in the queue)
            item = queue.get()
            runTests(item)
            # I want to wait for 'runTests' to complete before moving past this point
        except Queue.Empty:
            # If the queue is empty, just keep running the loop until something
            # is put on top of it.
            pass

def runTests(args):
    op_thread = SomeThread(args)
    op_thread.start()
    # My problem is that once 'op_thread.start()' starts the thread,
    # the 'runTests' function returns, but the operation executed by
    # op_thread is not done yet because it is still running in the
    # background. I do not want 'runTests' to return until the
    # operation in op_thread has finished.
    """op_thread.join()"""
    # I tried putting this line after 'op_thread.start()', but that did not solve anything.
    # I have commented it out because it is not necessary to demonstrate what
    # I am trying to do, but I just wanted to show that I tried it.

# Start the worker only after the functions it calls have been defined
t = threading.Thread(target=worker)
t.start()

Some notes:

This is all running in a PyGTK application. Once the 'SomeThread' operation is complete, it sends a callback to the GUI to display the results of the operation.

I do not know how much this affects the issue I am having, but I thought it might be important.

Answers (1)

Eli Bendersky

A fundamental issue with Python threads is that you can't just kill them - they have to agree to die.
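A minimal sketch of what "agreeing to die" means, in Python 3: nothing can force the thread to stop, so it checks a `threading.Event` on every pass and returns on its own once the event is set. The names `stop_requested` and `loop` are hypothetical; the code further down inverts the flag (an `alive` event that `join` clears), but the idea is the same.

```python
import threading
import time

stop_requested = threading.Event()
iterations = []

def loop():
    # The thread "agrees to die": it checks the event and returns by itself
    while not stop_requested.is_set():
        iterations.append(1)
        time.sleep(0.01)

t = threading.Thread(target=loop)
t.start()
time.sleep(0.05)
stop_requested.set()  # ask the thread to stop; nothing forces it
t.join()              # returns once loop() notices the event and exits
print(t.is_alive())
```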

What you should do is:

  1. Implement the thread as a class
  2. Add a threading.Event member that the thread's main loop checks on every iteration; when it sees the event cleared, it returns. To clear it, override threading.Thread.join so that it first clears the event and then calls Thread.join on itself
  3. To allow (2), make the read from the Queue block with a small timeout. That way the thread's "response time" to a stop request is at most that timeout, while the loop still doesn't spin and waste CPU

Here's some code from a socket client thread I have that has the same issue with blocking on a queue:

import Queue
import threading

class SocketClientThread(threading.Thread):
    """ Implements the threading.Thread interface (start, join, etc.) and
        can be controlled via the cmd_q Queue attribute. Replies are placed in
        the reply_q Queue attribute.
    """
    def __init__(self, cmd_q=None, reply_q=None):
        super(SocketClientThread, self).__init__()
        # Create the queues per instance: a Queue.Queue() default argument
        # is evaluated only once and would be shared by every instance
        self.cmd_q = cmd_q if cmd_q is not None else Queue.Queue()
        self.reply_q = reply_q if reply_q is not None else Queue.Queue()
        self.alive = threading.Event()
        self.alive.set()
        self.socket = None

        # ClientCommand and the _handle_* methods are defined elsewhere (not shown)
        self.handlers = {
            ClientCommand.CONNECT: self._handle_CONNECT,
            ClientCommand.CLOSE: self._handle_CLOSE,
            ClientCommand.SEND: self._handle_SEND,
            ClientCommand.RECEIVE: self._handle_RECEIVE,
        }

    def run(self):
        while self.alive.is_set():
            try:
                # Queue.get with timeout to allow checking self.alive
                cmd = self.cmd_q.get(True, 0.1)
            except Queue.Empty:
                continue
            self.handlers[cmd.type](cmd)

    def join(self, timeout=None):
        # Ask run() to exit, then wait for the thread to finish
        self.alive.clear()
        threading.Thread.join(self, timeout)

Note self.alive and the loop in run.
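To see how the pieces fit together, here is a self-contained sketch in Python 3 that keeps the structure of `SocketClientThread` (the `alive` event, the `cmd_q`/`reply_q` queues, the overridden `join`) but replaces the socket handlers with a single hypothetical echo command, since `ClientCommand` and the `_handle_*` methods are not shown. `EchoThread` is an illustrative name, not part of the original code.

```python
import queue
import threading

class EchoThread(threading.Thread):
    """Hypothetical stand-in for SocketClientThread: same alive/queue
    structure, but every command is simply echoed back on reply_q."""

    def __init__(self, cmd_q=None, reply_q=None):
        super().__init__()
        # Per-instance queues (a Queue() default argument would be shared)
        self.cmd_q = cmd_q if cmd_q is not None else queue.Queue()
        self.reply_q = reply_q if reply_q is not None else queue.Queue()
        self.alive = threading.Event()
        self.alive.set()

    def run(self):
        while self.alive.is_set():
            try:
                # Timeout lets the loop re-check self.alive every 0.1 s
                cmd = self.cmd_q.get(True, 0.1)
            except queue.Empty:
                continue
            self.reply_q.put(("echo", cmd))

    def join(self, timeout=None):
        self.alive.clear()  # ask run() to exit
        super().join(timeout)

client = EchoThread()
client.start()
client.cmd_q.put("hello")
reply = client.reply_q.get(timeout=1)  # blocks until the thread has replied
client.join()
print(reply, client.is_alive())
```

Since the caller reads results from `reply_q` rather than having the worker call into it directly, the thread that owns the UI decides when to pick them up.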
