Sam Redway

Reputation: 8157

The correct implementation of queue.Queue when multithreading in Python

I am learning Python and wrote some simple scripts to give myself practical examples of various topics. One of them is this script demonstrating how queue.Queue() can be used with threading.Thread() to create background workers. It behaves quite strangely, though. I ran some timing trials. With just one thread it does as you would expect: each task takes roughly 2 seconds, so the 20 tasks take roughly 40 seconds (actually just under??) to complete. With four threads it again does as you would expect: it works through the tasks 4 at a time and so takes around 10 seconds. So how on earth, when I run 20 threads, does it take 0.01 seconds (1 s.f.)? Surely it must take at least 2 seconds???

Here is the code:

import threading
from queue import Queue
import time

q = Queue()
tLock = threading.Lock()

def run() :

    while True :
        task = q.get()  
        print('Just finished task number',task)
        q.task_done()   
        time.sleep(2)

def main() :
    # worker threads are activated  
    for x in range(20) :
        t = threading.Thread(target=run)
        t.daemon = True
        t.start()
    #20 jobs are put in the queue   
    for x in range(1,21) :
        q.put(x)
    #waits until queue is empty and then continues
    q.join()

if __name__ == '__main__' :
    startTime = time.time()
    main()
    print('Time taken was', time.time() - startTime)

Upvotes: 3

Views: 979

Answers (1)

Reut Sharabani

Reputation: 31349

You're not actually blocking the progress of the main thread. Each worker calls q.task_done() right after q.get(), *before* it sleeps, so with 20 threads every task is marked done almost instantly, q.join() returns right away, and the main thread prints the elapsed time and exits (killing the daemon workers).

The "proper"(*) way is to make sure all threads are done, by joining all of them:

def main() :
    # keep reference to threads
    threads = [threading.Thread(target=run) for _ in range(20)]
    # start all threads
    for t in threads:
        t.start()

    #20 jobs are put in the queue   
    for x in range(1,21) :
        q.put(x)
    #waits until queue is empty and then continues
    q.join()
    # join all threads
    for t in threads:
        t.join()

(*) But this won't work here, because your threads are stuck in an infinite loop even after all the tasks are done, so joining them never returns.

Another fix is to do the waiting (the simulated work) before you report the task as done:

def run() :
    while True :
        task = q.get()  
        # simulate processing time *before* actual reporting
        time.sleep(2)
        print('Just finished task number',task)  
        q.task_done()

Still, the threads themselves remain blocked in their infinite loop, so they can never be joined. What you could have is a message telling the threads to quit. Something like:

def run() :
    while True :
        task = q.get()
        if task == 'stop':
            # mark the sentinel as done too, so q.join() in main() can return
            q.task_done()
            break
        # simulate processing time *before* actual reporting
        time.sleep(2)
        print('Just finished task number', task)
        q.task_done()

and now simply have the main thread put enough stop messages for all the threads to finally quit their infinite loop:

def main() :
    # keep reference to threads
    threads = [threading.Thread(target=run) for _ in range(20)]
    # start all threads
    for t in threads:
        t.start()

    #20 jobs are put in the queue   
    for x in range(1,21):
        q.put(x)

    for x in range(20):
        # stop all threads after tasks are done
        q.put('stop')

    # waits until queue is empty and then continues
    q.join()

    # join all threads
    for t in threads:
        t.join()
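
With this version, timing the script the same way the question does should report roughly 2 seconds rather than 0.01, since all 20 workers sleep through their tasks in parallel (this assumes the reworked run() above, which also marks the 'stop' sentinel as done):

if __name__ == '__main__':
    startTime = time.time()
    main()
    # expect roughly 2 seconds: 20 tasks, 20 workers, a 2-second sleep each
    print('Time taken was', time.time() - startTime)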

Tip: You shouldn't use "magic numbers" such as 20. Define a module-level constant named THREADS_COUNT so you only have to change one place when you want to test different configurations.
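
For illustration, a minimal sketch of how main() might use such constants (TASKS_COUNT is an extra name introduced here only for the example; it reuses the module-level q and run() from above):

THREADS_COUNT = 20   # number of worker threads
TASKS_COUNT = 20     # number of jobs to enqueue

def main():
    # keep reference to threads
    threads = [threading.Thread(target=run) for _ in range(THREADS_COUNT)]
    # start all threads
    for t in threads:
        t.start()

    # put the jobs in the queue
    for x in range(1, TASKS_COUNT + 1):
        q.put(x)

    # one stop message per worker thread
    for _ in range(THREADS_COUNT):
        q.put('stop')

    # wait until the queue is empty, then join all threads
    q.join()
    for t in threads:
        t.join()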

Upvotes: 4
