Reputation: 123
I am trying to set up 3 threads and execute 5 tasks from a queue. The idea is that the threads will first run the first 3 tasks at the same time, then 2 of the threads finish the remaining 2. But the program seems to freeze. I couldn't detect anything wrong with it.
from multiprocessing import Manager
import threading
import time

global exitFlag
exitFlag = 0


class myThread(threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)


def process_data(threadName, q):
    global exitFlag
    while not exitFlag:
        if not workQueue.empty():
            data = q.get()
            print("%s processing %s" % (threadName, data))
        else:
            pass
        time.sleep(1)
    print('Nothing to Process')


threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Manager().Queue(10)
threads = []
threadID = 1

# create threads
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1

# fill up the queue
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# wait for the queue to clear
while not workQueue.empty():
    pass

# notify threads to exit
exitFlag = 1

# wait for all threads to finish
for t in threads:
    t.join()

print("Exiting Main Thread")
I don't know what happened exactly, but after I removed the join() part, the program was able to run just fine. What I don't understand is that exitFlag is supposed to send out the signal once the queue is emptied, so it seems the signal was somehow not detected by process_data().
Upvotes: 4
Views: 2555
Reputation: 21654
There are multiple issues with your code. First off, threads in CPython don't run Python code "at the same time" because of the global interpreter lock (GIL). A thread must hold the GIL to execute Python bytecode. By default a thread holds the GIL for up to 5 ms (Python 3.2+), if it doesn't drop it earlier because it does blocking I/O. For parallel execution of Python code you would have to use multiprocessing.
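For illustration, here is a minimal sketch (not part of your code) of moving CPU-bound work onto separate processes with multiprocessing.Pool; the square function and the input list are made up for the example:

from multiprocessing import Pool


def square(n):
    # stand-in for a CPU-bound task; it runs in a separate process,
    # so it is not limited by the GIL of the main process
    return n * n


if __name__ == '__main__':
    with Pool(processes=3) as pool:
        # distributes the inputs over 3 worker processes
        print(pool.map(square, [1, 2, 3, 4, 5]))  # [1, 4, 9, 16, 25]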
You also needlessly use a Manager.Queue instead of a queue.Queue. A Manager.Queue is a queue.Queue on a separate manager-process. You introduced a detour with IPC and memory copying for no benefit here.
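For comparison, a minimal sketch of the two constructions (not from your code):

import queue
from multiprocessing import Manager

# in-process, thread-safe queue -- sufficient for threads within one process
work_queue = queue.Queue(maxsize=10)

# proxy to a queue living in a separate manager process;
# every put/get goes through IPC and pickling
managed_queue = Manager().Queue(10)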
The cause of your deadlock is that you have a race condition here:
if not workQueue.empty():
    data = q.get()
This is not an atomic operation. A thread can check workQueue.empty(), then drop the GIL, letting another thread drain the queue, and then proceed with data = q.get(), which will block forever if you don't put something on the queue again. Queue.empty() checks are a general anti-pattern and there is no need to use them. Use poison pills (sentinel values) to break a get-loop instead and to let the workers know they should exit. You need as many sentinel values as you have workers. Find more about iter(callable, sentinel) here.
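As a quick standalone illustration of the iter(callable, sentinel) form (a made-up sketch, separate from the worker example below): iter keeps calling the callable and yields its results until a return value equals the sentinel.

import queue

q = queue.Queue()
for item in ['a', 'b', 'STOP']:
    q.put(item)

# q.get is called repeatedly; the loop ends when it returns 'STOP'
for item in iter(q.get, 'STOP'):
    print(item)  # prints 'a', then 'b'

The complete example below applies the same pattern to the worker threads, with one sentinel per worker.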
import time
from queue import Queue
from datetime import datetime
from threading import Thread, current_thread

SENTINEL = 'SENTINEL'


class myThread(Thread):

    def __init__(self, func, inqueue):
        super().__init__()
        self.func = func
        self._inqueue = inqueue

    def run(self):
        print(f"{datetime.now()} {current_thread().name} starting")
        self.func(self._inqueue)
        print(f"{datetime.now()} {current_thread().name} exiting")


def process_data(_inqueue):
    for data in iter(_inqueue.get, SENTINEL):
        print(f"{datetime.now()} {current_thread().name} "
              f"processing {data}")
        time.sleep(1)


if __name__ == '__main__':

    N_WORKERS = 3

    inqueue = Queue()
    input_data = ["One", "Two", "Three", "Four", "Five"]
    sentinels = [SENTINEL] * N_WORKERS  # one sentinel value per worker

    # enqueue input and sentinels
    for word in input_data + sentinels:
        inqueue.put(word)

    threads = [myThread(process_data, inqueue) for _ in range(N_WORKERS)]

    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(f"{datetime.now()} {current_thread().name} exiting")
Example Output:
2019-02-14 17:58:18.265208 Thread-1 starting
2019-02-14 17:58:18.265277 Thread-1 processing One
2019-02-14 17:58:18.265472 Thread-2 starting
2019-02-14 17:58:18.265542 Thread-2 processing Two
2019-02-14 17:58:18.265691 Thread-3 starting
2019-02-14 17:58:18.265793 Thread-3 processing Three
2019-02-14 17:58:19.266417 Thread-1 processing Four
2019-02-14 17:58:19.266632 Thread-2 processing Five
2019-02-14 17:58:19.266767 Thread-3 exiting
2019-02-14 17:58:20.267588 Thread-1 exiting
2019-02-14 17:58:20.267861 Thread-2 exiting
2019-02-14 17:58:20.267994 MainThread exiting
Process finished with exit code 0
If you don't insist on subclassing Thread, you could also just use multiprocessing.pool.ThreadPool a.k.a. multiprocessing.dummy.Pool, which does the plumbing for you in the background.
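A minimal sketch of that variant, reusing the input data from above (the worker function here is a made-up stand-in, not the one from your code):

import time
from datetime import datetime
from multiprocessing.pool import ThreadPool
from threading import current_thread


def process_data(data):
    # stand-in worker; ThreadPool feeds it one item at a time
    print(f"{datetime.now()} {current_thread().name} processing {data}")
    time.sleep(1)


if __name__ == '__main__':
    input_data = ["One", "Two", "Three", "Four", "Five"]
    # 3 worker threads share the 5 tasks; no sentinels or manual joins needed
    with ThreadPool(3) as pool:
        pool.map(process_data, input_data)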
Upvotes: 2