Reputation: 1320
I am having a problem where child processes hang in my Python application: only 4 of 16 processes have finished, and all of these processes are adding to a multiprocessing queue. According to the Python docs (https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues):
Warning
As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.
This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
Note that a queue created using a manager does not have this issue. See Programming guidelines.
I believe this may be my problem; however, I do call get() on the queue before I join. I am not sure what other alternatives I can take.
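For reference, here is a minimal sketch of the deadlock the docs describe (a contrived example I put together to check my understanding, not my real code):

from multiprocessing import Process, Queue

def worker(q):
    q.put('x' * 1000000) # large enough that the feeder thread cannot flush it all into the pipe

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    p.join() # hangs: the child cannot exit until the buffered item is drained
    print q.get()

My actual code is below.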
from multiprocessing import Process, Queue
import time

def RunInThread(dictionary):
    startedProcesses = list()
    resultList = list()
    output = Queue()
    scriptList = ThreadChunk(dictionary, 16) # last number determines how many threads

    for item in scriptList:
        if __name__ == '__main__':
            proc = Process(target=CreateScript, args=(item, output))
            startedProcesses.append(proc)
            proc.start()

    while not output.empty():
        resultList.append(output.get())

    # we must wait for the processes to finish before continuing
    for process in startedProcesses:
        process.join()

    print "finished"
# defines chunk of data each thread will process
def ThreadChunk(seq, num):
    avg = len(seq) / float(num)
    out = []
    last = 0.0
    while last < len(seq):
        out.append(seq[int(last):int(last + avg)])
        last += avg
    return out
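For reference, ThreadChunk(range(10), 3) returns [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]; the float rounding means the last chunk can come out slightly larger than the others.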
def CreateScript(scriptsToGenerate, queue):
    start = time.clock()
    for script in scriptsToGenerate:
        ...
        queue.put([script['timeInterval'], script['script']])

    print time.clock() - start
    print "I have finished"
Upvotes: 1
Views: 2728
Reputation: 4186
The issue with your code is that while not output.empty() is not reliable (see the documentation for empty()). You might also hit the scenario where the interpreter reaches while not output.empty() before the processes you created have finished initializing (so the Queue really is empty at that moment).
Since you know exactly how many items will be put on the queue (i.e. len(dictionary)), you can read that number of items from the queue:
def RunInThread(dictionary):
    startedProcesses = list()
    output = Queue()
    scriptList = ThreadChunk(dictionary, 16) # last number determines how many threads

    for item in scriptList:
        proc = Process(target=CreateScript, args=(item, output))
        startedProcesses.append(proc)
        proc.start()

    # one item is put per entry in dictionary, so read exactly that many
    resultList = [output.get() for _ in xrange(len(dictionary))]

    # we must wait for the processes to finish before continuing
    for process in startedProcesses:
        process.join()

    print "finished"
If at some point you modify your script and no longer know how many items will be produced, you can use Queue.get with a reasonable timeout:
from Queue import Empty # Python 2; in Python 3 this is "from queue import Empty"

def RunInThread(dictionary):
    startedProcesses = list()
    resultList = list()
    output = Queue()
    scriptList = ThreadChunk(dictionary, 16) # last number determines how many threads

    for item in scriptList:
        proc = Process(target=CreateScript, args=(item, output))
        startedProcesses.append(proc)
        proc.start()

    try:
        while True:
            resultList.append(output.get(True, 2)) # block with a 2 second timeout, just in case
    except Empty:
        pass # no more items produced

    # we must wait for the processes to finish before continuing
    for process in startedProcesses:
        process.join()

    print "finished"
You might need to adjust the timeout depending on the actual computation time in your CreateScript.
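If you would rather avoid a magic timeout altogether, another common pattern is to have each worker put a sentinel on the queue when it finishes, and count sentinels instead of guessing. A sketch, under the assumption that None never appears as a real result (worker and collect below are hypothetical stand-ins for your CreateScript and RunInThread):

from multiprocessing import Process, Queue

SENTINEL = None # assumes None is never a legitimate result

def worker(items, queue):
    for item in items:
        queue.put(item)
    queue.put(SENTINEL) # signal: this producer is done

def collect(chunks):
    output = Queue()
    procs = [Process(target=worker, args=(chunk, output)) for chunk in chunks]
    for p in procs:
        p.start()

    results = []
    finished = 0
    while finished < len(procs):
        item = output.get() # blocks only while at least one producer is still working
        if item is SENTINEL:
            finished += 1
        else:
            results.append(item)

    # safe to join now: the queue has been fully drained
    for p in procs:
        p.join()
    return results

if __name__ == '__main__':
    print collect([[1, 2], [3, 4], [5, 6]])

Because each worker puts its sentinel last, receiving all the sentinels guarantees every real item has already been read, so the final join cannot deadlock.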
Upvotes: 1