I am learning Python's threading module and have written the following code to help myself understand it:
from Queue import Queue
import threading

lock = threading.Lock()
MAX_THREADS = 8
q = Queue()
count = 0

# some i/o process
def io_process(x):
    pass

# process that deals with shared resources
def shared_resource_process(x):
    pass

def func():
    global q, count
    while not q.empty():
        x = q.get()
        io_process(x)
        if lock.acquire():
            shared_resource_process(x)
            print '%s is processing %r' % (threading.currentThread().getName(), x)
            count += 1
            lock.release()

def main():
    global q
    for i in range(40):
        q.put(i)
    threads = []
    for i in range(MAX_THREADS):
        threads.append(threading.Thread(target=func))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print 'multi-thread done.'
    print count == 40

if __name__ == '__main__':
    main()
and the output got stuck like this:
Thread-1 is processing 32
Thread-8 is processing 33
Thread-6 is processing 34
Thread-2 is processing 35
Thread-5 is processing 36
Thread-3 is processing 37
Thread-7 is processing 38
Thread-4 is processing 39
Note that the prints in main() are not executed, which means some threads are hanging/blocking?
Then I modified func() by adding q.task_done():
if lock.acquire():
    shared_resource_process(x)
    print '%s is processing %r' % (threading.currentThread().getName(), x)
    count += 1
    q.task_done()  # why is this necessary ?
    lock.release()
and now all threads terminate as I expected, and I get the right output:
Thread-6 is processing 36
Thread-4 is processing 37
Thread-3 is processing 38
Thread-7 is processing 39
multi-thread done.
True
Process finished with exit code 0
I read the Queue.Queue docs here and see that task_done() works with queue.join() to make sure all items in the queue are processed. But since I did not call queue.join() in main(), why is task_done() necessary here in func()? What is the cause of the thread hanging/blocking when I leave out the task_done() call?
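For reference, the task_done() / join() pairing the docs describe looks roughly like this (my own minimal sketch of the documented pattern, not the code above):

from Queue import Queue
import threading

def worker(q):
    while True:
        item = q.get()      # blocks until an item is available
        # ... process the item ...
        q.task_done()       # tell the queue this item is fully handled

q = Queue()
for i in range(5):
    q.put(i)
t = threading.Thread(target=worker, args=(q,))
t.setDaemon(True)           # daemon thread, killed when the main thread exits
t.start()
q.join()                    # blocks until task_done() was called once per put()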
You have a race condition in your code. Imagine that you have only one item left in the Queue and you'd be using only two threads instead of 8. Then the following sequence of events happens:

1. Thread A calls q.empty() to check if the queue is empty or not. Since there is one item in the queue, the result is False and the loop body is executed.
2. Before thread A calls q.get(), there's a context switch and thread B gets to run.
3. Thread B calls q.empty(); there's still one item in the queue, thus the result is False and the loop body is executed.
4. Thread B calls q.get() without parameters and it immediately returns with the last item from the queue. Then thread B processes the item and exits, since q.empty() now returns True.
5. Thread A resumes execution. Since it already passed the q.empty() check in step 1, it calls q.get() next, but this blocks forever, so your program won't terminate.

You can simulate the above behavior by importing time and changing the loop a bit:
while not q.empty():
    time.sleep(0.1)  # Force context switch
    x = q.get()
Note that the behavior is the same whether task_done is called or not.
So why did adding task_done help? By default, Python 2 performs a context switch every 100 interpreter instructions, so adding code might have changed the place where the context switch occurs. See another question and the linked PDF for a better explanation. On my machine the program didn't hang whether task_done was there or not, so this is just speculation about what caused it to happen for you.
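If you want to reproduce the race more reliably, one knob you can turn (my suggestion, not something the original code uses) is Python 2's sys.setcheckinterval, which controls how many bytecode instructions run between potential thread switches:

import sys

# Consider a thread switch after every bytecode instruction instead of
# the default 100; this makes the empty()/get() race far more likely.
sys.setcheckinterval(1)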
If you want to fix the behavior, you can use an infinite loop and pass a parameter to get instructing it not to block. This causes get to eventually raise the Queue.Empty exception, which you can catch and then break out of the loop:
from Queue import Queue, Empty

def func():
    global q, count
    while True:
        try:
            x = q.get(False)
        except Empty:
            break
        io_process(x)
        if lock.acquire():
            shared_resource_process(x)
            print '%s is processing %r' % (threading.currentThread().getName(), x)
            count += 1
            lock.release()
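Another common idiom (a sketch of the standard worker-queue pattern, reusing the definitions from the question, not part of the fix above) is to keep the blocking get(), pair task_done() with q.join() in main(), and mark the workers as daemon threads so they don't keep the process alive:

def func():
    global count
    while True:
        x = q.get()                 # blocks until an item arrives
        io_process(x)
        with lock:                  # Lock works as a context manager
            shared_resource_process(x)
            print '%s is processing %r' % (threading.currentThread().getName(), x)
            count += 1
        q.task_done()               # pairs with q.join() in main()

def main():
    for i in range(40):
        q.put(i)
    for i in range(MAX_THREADS):
        t = threading.Thread(target=func)
        t.setDaemon(True)           # daemons won't block interpreter exit
        t.start()
    q.join()                        # returns once every item got a task_done()
    print 'multi-thread done.'
    print count == 40

Here join() returns as soon as every put() item has been matched by a task_done(), and the daemon workers still blocked in get() are torn down when the main thread exits.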