vansdev

Reputation: 43

Why is Queue.join() necessary here?

I am learning Python's threading module and have written the following code to help myself understand it:

from Queue import Queue
import threading

lock = threading.Lock()
MAX_THREADS = 8
q = Queue()
count = 0

# some i/o process
def io_process(x):
    pass

# process that deals with shared resources
def shared_resource_process(x):
    pass

def func():
    global q, count
    while not q.empty():
        x = q.get()
        io_process(x)
        if lock.acquire():
            shared_resource_process(x)
            print '%s is processing %r' %(threading.currentThread().getName(), x)
            count += 1          
            lock.release()

def main():
    global q
    for i in range(40):
        q.put(i)

    threads = []
    for i in range(MAX_THREADS):
        threads.append(threading.Thread(target=func))

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    print 'multi-thread done.'
    print count == 40

if __name__ == '__main__':
    main()

The output got stuck like this:

Thread-1 is processing 32
Thread-8 is processing 33
Thread-6 is processing 34
Thread-2 is processing 35
Thread-5 is processing 36
Thread-3 is processing 37
Thread-7 is processing 38
Thread-4 is processing 39

Note that the print statements in main() are never executed, which suggests that some threads are hanging or blocked.

Then I modified func() by adding q.task_done():

        if lock.acquire():
            shared_resource_process(x)
            print '%s is processing %r' %(threading.currentThread().getName(), x)
            count += 1
            q.task_done()  # why is this necessary ?
            lock.release()

Now all threads terminate as I expected and I get the right output:

Thread-6 is processing 36
Thread-4 is processing 37
Thread-3 is processing 38
Thread-7 is processing 39
multi-thread done.
True

Process finished with exit code 0

I read the documentation for Queue.Queue and saw that task_done() works together with queue.join() to make sure all items in the queue are processed. But since I did not call queue.join() in main(), why is task_done() necessary in func()? What causes the threads to hang or block when I leave out the task_done() call?

Upvotes: 4

Views: 1132

Answers (1)

niemmi

Reputation: 17263

You have a race condition in your code. Imagine that only one item is left in the queue and that you are using two threads instead of 8. Then the following sequence of events can happen:

  1. Thread A calls q.empty to check whether the queue is empty. Since there is one item in the queue, the result is False and the loop body is executed.
  2. Before thread A calls q.get, there's a context switch and thread B gets to run.
  3. Thread B calls q.empty. There's still one item in the queue, so the result is False and the loop body is executed.
  4. Thread B calls q.get without parameters and it immediately returns the last item from the queue. Thread B then processes the item and exits, since q.empty now returns True.
  5. Thread A gets to run. Since it already called q.empty in step 1, it calls q.get next. The queue is now empty and nothing will ever be put into it again, so this call blocks forever and your program won't terminate.
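The five steps above can be replayed deterministically. The following is only a sketch: the function name replay_race and the two events are made up for illustration, the events force the interleaving instead of relying on the scheduler, and thread A uses a timeout on get so that the demo terminates instead of hanging. The import fallback is there so the same code runs on Python 2 and Python 3.

```python
import threading

try:
    from queue import Queue, Empty   # Python 3 module name
except ImportError:
    from Queue import Queue, Empty   # Python 2 module name


def replay_race():
    """Force the empty()/get() interleaving described in steps 1-5."""
    q = Queue()
    q.put('last item')
    a_checked = threading.Event()   # set once A has passed its empty() check
    b_done = threading.Event()      # set once B has taken the last item
    log = []

    def thread_a():
        if not q.empty():               # step 1: queue looks non-empty
            a_checked.set()             # step 2: "context switch" to B
            b_done.wait()
            try:
                q.get(timeout=0.2)      # step 5: a plain get() would hang here
                log.append('A got item')
            except Empty:
                log.append('A would block forever')

    def thread_b():
        a_checked.wait()
        if not q.empty():               # step 3: still one item
            q.get()                     # step 4: B takes the last item
            log.append('B got item')
        b_done.set()

    threads = [threading.Thread(target=thread_a),
               threading.Thread(target=thread_b)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log
```

Calling replay_race() returns a log in which B obtained the item and A timed out, which is the situation where the original code would hang.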

You can simulate the above behavior by importing time and changing the loop a bit:

while not q.empty():
    time.sleep(0.1) # Force context switch
    x = q.get()

Note that the behavior is the same whether or not task_done is called.
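As a rough illustration of why task_done cannot matter for get: task_done only feeds the counter that q.join() waits on. The sketch below (the function name join_needs_task_done is hypothetical) drains a queue without calling task_done, then shows that a thread calling q.join() stays blocked until task_done has been called once per item.

```python
import threading

try:
    from queue import Queue   # Python 3 module name
except ImportError:
    from Queue import Queue   # Python 2 module name


def join_needs_task_done(num_items=3):
    q = Queue()
    for i in range(num_items):
        q.put(i)

    # get() succeeds for every item without any task_done() call.
    for _ in range(num_items):
        q.get()
    emptied = q.empty()

    # join() waits for the unfinished-task count to reach zero,
    # so it is still blocked even though the queue is empty.
    waiter = threading.Thread(target=q.join)
    waiter.daemon = True
    waiter.start()
    waiter.join(0.2)
    blocked_before = waiter.is_alive()

    # One task_done() per retrieved item releases join().
    for _ in range(num_items):
        q.task_done()
    waiter.join(2)
    blocked_after = waiter.is_alive()

    return emptied, blocked_before, blocked_after
```

Since main() in the question joins the threads and never calls q.join(), nothing in the program waits on that counter.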

So why did adding task_done help? By default, Python 2 considers a thread switch every 100 interpreter instructions, so adding code might have changed the place where the context switch occurs. See another question and linked PDF for a better explanation. On my machine the program didn't hang whether task_done was there or not, so this is only speculation about what caused it to happen for you.

If you want to fix the behavior, you can use an infinite loop and pass a parameter to get telling it not to block. Once the queue is empty, get raises the Queue.Empty exception, which you can catch to break out of the loop:

from Queue import Queue, Empty

def func():
    global q, count
    while True:
        try:
            x = q.get(False)
        except Empty:
            break
        io_process(x)
        if lock.acquire():
            shared_resource_process(x)
            print '%s is processing %r' %(threading.currentThread().getName(), x)
            count += 1
            lock.release()
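For reference, here is a self-contained variant of the same fix that can be run as is. It is a sketch under a few assumptions: run_workers and processed are illustrative names, the placeholder processing functions are left out, the lock is taken with a with statement instead of acquire/release, and the import fallback lets it run on both Python 2 and Python 3.

```python
import threading

try:
    from queue import Queue, Empty   # Python 3 module name
except ImportError:
    from Queue import Queue, Empty   # Python 2 module name


def run_workers(num_items=40, num_threads=8):
    q = Queue()
    for i in range(num_items):
        q.put(i)

    lock = threading.Lock()
    processed = []

    def worker():
        while True:
            try:
                # Non-blocking get: raises Empty instead of waiting,
                # so there is no separate empty() check to race against.
                x = q.get(False)
            except Empty:
                break
            with lock:                  # released even if the body raises
                processed.append(x)

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(processed)
```

Every item is handled exactly once and all threads exit, because the check and the retrieval happen in a single call to get.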

Upvotes: 3
