Reputation: 71
I am facing an issue while implementing multithreading in Python, and it is specific to my use case. After going through numerous posts on the topic, I used the most widely suggested approach.
I start by defining my thread class as follows.
    class myThread(Thread):
        def __init__(self, graphobj, q):
            Thread.__init__(self)
            self.graphobj = graphobj
            self.q = q

        def run(self):
            improcess(self.graphobj, self.q)
After that, I define the function that does all the required processing.
    def improcess(graphobj, q):
        while not exitFlag:
            queueLock.acquire()
            if not q.empty():
                photo_id = q.get()
                queueLock.release()
                # Complete processing
            else:
                queueLock.release()
Now comes the part where I am stuck. I am able to run the code below as-is (without the function wrapper) without any issues. However, if I wrap it in a function like this, it breaks down.
    def train_control(graphobj, photo_ids):
        workQueue = Queue(len(photo_ids))
        for i in range(1,5):
            thread = myThread(graphobj=graphobj, q=workQueue)
            thread.start()
            threads.append(thread)
        queueLock.acquire()
        for photo_id in photo_ids:
            workQueue.put(photo_id)
        queueLock.release()
        while not workQueue.empty():
            pass
        exitFlag = 1
        for t in threads:
            t.join()
By breaking down, I mean that the threads complete their work but never stop waiting, i.e. exitFlag is never set to 1. I am unsure how to make this work.
Unfortunately, our system is designed such that this piece of code needs to be wrapped in a function that can be invoked by another module, so pulling it out is not really an option.
Looking forward to hearing from experts on this. Thanks in advance.
Edit: I forgot to mention this in the first draft. I initialize exitFlag globally and set its value to 0.
Below is the minimal, verifiable code snippet that I created to capture this problem:
    import threading
    import Queue

    globvar01 = 5
    globvar02 = 7
    exitFlag = 0
    globlist = []
    threads = []
    queueLock = threading.Lock()
    workQueue = Queue.Queue(16)

    class myThread(threading.Thread):
        def __init__(self, threadID, q):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.q = q

        def run(self):
            print "Starting thread " + str(self.threadID)
            myfunc(self.threadID, self.q)
            print "Exiting thread " + str(self.threadID)

    def myfunc(threadID, q):
        while not exitFlag:
            queueLock.acquire()
            if not workQueue.empty():
                thoughtnum = q.get()
                queueLock.release()
                print "Processing thread " + str(threadID)
                if (thoughtnum < globvar01):
                    globlist.append([1,2,3])
                elif (thoughtnum < globvar02):
                    globlist.append([2,3,4])
            else:
                queueLock.release()

    def controlfunc():
        for i in range(1,5):
            thread = myThread(i, workQueue)
            thread.start()
            threads.append(thread)
        queueLock.acquire()
        for i in range(1,11):
            workQueue.put(i)
        queueLock.release()
        # Wait for queue to empty
        while not workQueue.empty():
            pass
        exitFlag = 1
        # Wait for all threads to complete
        for t in threads:
            t.join()

    print "Starting main thread"
    controlfunc()
    print "Exiting Main Thread"
Upvotes: 2
Views: 181
Reputation: 177674
From your MCVE, the only thing missing is:
        while not workQueue.empty():
            pass
        global exitFlag  # Need this or `exitFlag` is a local variable only.
        exitFlag = 1
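To see why the `global` statement matters, here is a minimal sketch of the scoping rule in isolation, with no threads involved. The names `exit_flag`, `set_flag_without_global` and `set_flag_with_global` are made up for this illustration and are not from your code. Assigning to a name inside a function creates a local variable unless the name is declared `global`:

```python
exit_flag = 0  # module-level flag, playing the role of exitFlag in the question


def set_flag_without_global():
    # Assignment creates a new local name; the module-level flag is untouched.
    exit_flag = 1
    return exit_flag


def set_flag_with_global():
    # The global statement makes the assignment rebind the module-level name.
    global exit_flag
    exit_flag = 1


set_flag_without_global()
after_local = exit_flag   # the module-level flag is still 0 here

set_flag_with_global()
after_global = exit_flag  # the module-level flag is now 1

print(after_local, after_global)
```

This is the same situation as in `controlfunc()`: the worker threads read the module-level flag, while the function was setting a local one that nobody else could see.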
However, you could eliminate both the queueLock and the exitFlag by using a sentinel value in the Queue to shut down the worker threads. This also eliminates the spin-waiting: worker threads will sleep on q.get(), and the main thread won't have to spin-wait for an empty queue:
    #!python2
    from __future__ import print_function
    import threading
    import Queue

    debug = 1
    console = threading.Lock()

    def tprint(*args,**kwargs):
        if debug:
            name = threading.current_thread().getName()
            with console:
                print('{}: '.format(name),end='')
                print(*args,**kwargs)

    globvar01 = 5
    globvar02 = 7
    globlist = []
    threads = []
    workQueue = Queue.Queue(16)

    class myThread(threading.Thread):
        def __init__(self, threadID, q):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.q = q

        def run(self):
            tprint("Starting thread " + str(self.threadID))
            myfunc(self.threadID, self.q)
            tprint("Exiting thread " + str(self.threadID))

    def myfunc(threadID, q):
        while True:
            thoughtnum = q.get()
            tprint("Processing thread " + str(threadID))
            if thoughtnum is None:
                break
            elif thoughtnum < globvar01:
                globlist.append([1,2,3])
            elif thoughtnum < globvar02:
                globlist.append([2,3,4])

    def controlfunc():
        for i in range(1,5):
            thread = myThread(i, workQueue)
            thread.start()
            threads.append(thread)
        for i in range(1,11):
            workQueue.put(i)
        # Wait for all threads to complete
        for t in threads:
            workQueue.put(None)
        for t in threads:
            t.join()

    tprint("Starting main thread")
    controlfunc()
    tprint("Exiting Main Thread")
Output:
MainThread: Starting main thread
Thread-1: Starting thread 1
Thread-2: Starting thread 2
Thread-3: Starting thread 3
Thread-4: Starting thread 4
Thread-1: Processing thread 1
Thread-2: Processing thread 2
Thread-3: Processing thread 3
Thread-4: Processing thread 4
Thread-1: Processing thread 1
Thread-2: Processing thread 2
Thread-3: Processing thread 3
Thread-4: Processing thread 4
Thread-1: Processing thread 1
Thread-2: Processing thread 2
Thread-3: Processing thread 3
Thread-4: Processing thread 4
Thread-1: Processing thread 1
Thread-2: Processing thread 2
Thread-3: Exiting thread 3
Thread-4: Exiting thread 4
Thread-1: Exiting thread 1
Thread-2: Exiting thread 2
MainThread: Exiting Main Thread
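Since the question asks for something that can be wrapped in a function and called from another module, here is a rough sketch of the same sentinel idea with all state kept local to the call. It is written for Python 3 (where the module is named `queue`), and `worker`, `run_batch` and the doubling step are placeholders invented for this illustration, not part of the code above. It also assumes `None` never appears as a real work item, since it is used as the sentinel:

```python
import queue
import threading


def worker(q, results, results_lock):
    """Consume items until the None sentinel arrives."""
    while True:
        item = q.get()  # blocks (sleeps) until an item is available
        if item is None:
            break
        with results_lock:
            results.append(item * 2)  # placeholder for the real processing


def run_batch(items, num_workers=4):
    """Process items with a few threads; queue, lock and threads are local."""
    q = queue.Queue()
    results = []
    results_lock = threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(q, results, results_lock))
        for _ in range(num_workers)
    ]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    for _ in threads:
        q.put(None)  # one sentinel per worker so each one exits
    for t in threads:
        t.join()
    return results


print(sorted(run_batch(range(1, 11))))
```

Because nothing is global, there is no flag to declare or reset, and the function can be called repeatedly from other modules.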
Upvotes: 1
Reputation: 998
You need to make sure exitFlag is set to 0 (False) before spawning any threads; otherwise, in improcess() they won't do anything and the queue will remain non-empty.
This problem could happen if you have exitFlag as a global and it's not cleared from a previous run.
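One way to rule out a stale flag is to create the stop signal inside the function on every call instead of relying on a global. The sketch below swaps the integer flag for a `threading.Event` and uses `Queue.join()` in place of the spin-wait; it is written for Python 3, and `worker`, `run_once` and the 0.05 second timeout are assumptions made for this illustration rather than anything from the question:

```python
import queue
import threading


def worker(q, stop_event, processed):
    # Poll with a short timeout so the stop signal is noticed promptly.
    while not stop_event.is_set():
        try:
            item = q.get(timeout=0.05)
        except queue.Empty:
            continue
        processed.append(item)  # placeholder work; append is thread-safe in CPython
        q.task_done()


def run_once(items, num_workers=4):
    stop_event = threading.Event()  # created fresh, so it always starts cleared
    q = queue.Queue()
    processed = []
    threads = [
        threading.Thread(target=worker, args=(q, stop_event, processed))
        for _ in range(num_workers)
    ]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    q.join()          # returns once every item has been marked task_done()
    stop_event.set()  # now tell the workers to exit
    for t in threads:
        t.join()
    return processed


print(sorted(run_once([1, 2, 3])))
print(sorted(run_once([4, 5])))  # a second run is unaffected by the first
```

Each call gets its own event, so nothing needs to be cleared between runs.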
Upvotes: 0