Johnathon64

Reputation: 1320

Child process hanging multiprocessing

I am having a problem where child processes hang in my Python application: only 4 of 16 processes have finished. All of these processes add items to a multiprocessing queue. According to the Python docs (https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues):

Warning

As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.

I believe this may be my problem; however, I call get() on the queue before I join. I am not sure what other alternatives I have.
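
For reference, the last note in the quoted warning (a queue created using a manager does not have this issue) could look roughly like the sketch below. It is written for Python 3, and worker and run_with_manager_queue are hypothetical names, not part of the application above. Because the manager process owns the queue, the workers have no private buffer to flush, so joining before reading should be safe here.

```python
import multiprocessing as mp


def worker(chunk, output):
    # Hypothetical worker: puts one result per input item on the shared queue.
    for item in chunk:
        output.put(item * 2)


def run_with_manager_queue(chunks):
    # The manager's server process holds the queue, so each put() is complete
    # by the time it returns and a worker can exit without a pending flush.
    with mp.Manager() as manager:
        output = manager.Queue()
        procs = [mp.Process(target=worker, args=(c, output)) for c in chunks]
        for p in procs:
            p.start()
        for p in procs:
            p.join()  # joining before reading is fine with a manager queue
        total = sum(len(c) for c in chunks)
        results = [output.get() for _ in range(total)]
    return sorted(results)


if __name__ == "__main__":
    print(run_with_manager_queue([[1, 2], [3, 4]]))
```

The trade-off is speed: every put() and get() goes through the manager process, so this is usually slower than a plain multiprocessing.Queue.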

def RunInThread(dictionary):
    startedProcesses = list()
    resultList = list()
    output = Queue()
    scriptList = ThreadChunk(dictionary, 16) # last number determines how many threads

    for item in scriptList:
        if __name__ == '__main__':
            proc = Process(target=CreateScript, args=(item, output))
            startedProcesses.append(proc)
            proc.start()

    while not output.empty():
        resultList.append(output.get())

    #we must wait for the processes to finish before continuing
    for process in startedProcesses:
        process.join()
        print "finished"

#defines chunk of data each thread will process
def ThreadChunk(seq, num):
  avg = len(seq) / float(num)
  out = []
  last = 0.0

  while last < len(seq):
    out.append(seq[int(last):int(last + avg)])
    last += avg

  return out

def CreateScript(scriptsToGenerate, queue):
    start = time.clock()
    for script in scriptsToGenerate:
    ...
        queue.put([script['timeInterval'], script['script']])

    print time.clock() - start
    print "I have finished"

Upvotes: 1

Views: 2728

Answers (1)

301_Moved_Permanently

Reputation: 4186

The issue with your code is that while not output.empty() is not reliable (see the documentation for Queue.empty()). The interpreter may also reach while not output.empty() before the processes you created have finished initializing. In that case the queue really is empty, the loop exits immediately, and the later join() deadlocks because nothing consumes the items the children put afterwards.

Since you know exactly how many items will be put on the queue (i.e. len(dictionary)), you can read that number of items from the queue:

def RunInThread(dictionary):
    startedProcesses = list()
    output = Queue()
    scriptList = ThreadChunk(dictionary, 16) # last number determines how many threads

    for item in scriptList:
        proc = Process(target=CreateScript, args=(item, output))
        startedProcesses.append(proc)
        proc.start()

    resultList = [output.get() for _ in xrange(len(dictionary))]

    #we must wait for the processes to finish before continuing
    for process in startedProcesses:
        process.join()

    print "finished"
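
The same drain-then-join idea as a self-contained Python 3 sketch. The names create_script, collect and run_in_processes are hypothetical stand-ins, the worker body is a placeholder for the real script generation, and the striding split is a simplification that replaces ThreadChunk.

```python
import multiprocessing as mp


def create_script(scripts, output):
    # Hypothetical stand-in for CreateScript: exactly one put() per input item.
    for script in scripts:
        output.put((script, script.upper()))


def collect(output, expected):
    # Block on get() exactly `expected` times instead of polling output.empty().
    return [output.get() for _ in range(expected)]


def run_in_processes(items, workers=4):
    output = mp.Queue()
    chunks = [items[i::workers] for i in range(workers)]  # simple striding split
    procs = [mp.Process(target=create_script, args=(c, output)) for c in chunks]
    for p in procs:
        p.start()
    results = collect(output, len(items))  # drain the queue first ...
    for p in procs:
        p.join()                           # ... then join the workers
    return results


if __name__ == "__main__":
    print(sorted(run_in_processes(["a", "b", "c", "d", "e"])))
```

Results arrive in whatever order the workers produce them, which is why the usage line sorts them before printing.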

If at some point you modify your script and no longer know how many items will be produced, you can use Queue.get with a reasonable timeout:

def RunInThread(dictionary):
    startedProcesses = list()
    resultList = list()
    output = Queue()
    scriptList = ThreadChunk(dictionary, 16) # last number determines how many threads

    for item in scriptList:
        proc = Process(target=CreateScript, args=(item, output))
        startedProcesses.append(proc)
        proc.start()

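    # Requires "from Queue import Empty" (Python 2; "from queue import Empty" in Python 3).
    try:
        while True:
            resultList.append(output.get(True, 2)) # block for up to 2 seconds per item
    except Empty:
        pass # no item arrived within the timeout; assume the workers are done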

    #we must wait for the processes to finish before continuing
    for process in startedProcesses:
        process.join()

    print "finished"

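You might need to adjust the timeout depending on how long the computation in your CreateScript actually takes. If the timeout is shorter than the gap between two put() calls, the loop stops early and join() can deadlock again.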
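
If tuning a timeout feels fragile, a different technique (not the one shown above) is to have each worker put a sentinel value when it is done and to read until one sentinel per worker has arrived. A minimal Python 3 sketch, assuming None is never a legitimate result; DONE, create_script, collect_until_done and run are hypothetical names:

```python
import multiprocessing as mp

DONE = None  # sentinel; assumes None is never a real result


def create_script(scripts, output):
    # Hypothetical worker: the finally block sends the sentinel even on error.
    try:
        for script in scripts:
            output.put(script * 2)
    finally:
        output.put(DONE)


def collect_until_done(output, workers):
    # Read until every worker has announced that it is finished.
    results, finished = [], 0
    while finished < workers:
        item = output.get()
        if item is DONE:
            finished += 1
        else:
            results.append(item)
    return results


def run(chunks):
    output = mp.Queue()
    procs = [mp.Process(target=create_script, args=(c, output)) for c in chunks]
    for p in procs:
        p.start()
    results = collect_until_done(output, len(procs))  # drain before joining
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    print(sorted(run([[1, 2], [3], [4, 5]])))
```

This removes the need to know the item count or to guess a timeout, but a worker that is killed before it reaches the finally block never sends its sentinel, so the parent would still block in that case.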

Upvotes: 1
