Allen Vandiver

Reputation: 31

Python multiprocessing children hang when run exits

I am debugging an issue at the moment using multiprocessing.
I have the following child:

class Target(multiprocessing.Process):
    def __init__(self, work_queue, result_queue, suite_name, test_settings, html_log_dir, output_file, ip_address, fit_dir):
        multiprocessing.Process.__init__(self)

        # initialize other variables

    def run(self):
        print multiprocessing.current_process()
        suite_start_time = time.clock()
        while not self.kill_received:
            # get a task
            try:
                job = self.work_queue.get(True, 2)
            except Queue.Empty:
                self._log('Work queue empty, creating XML result file')
                self.create_xml_result_file(suite_start_time)
                break

            # the actual processing, run the test.

        fitnesse_common.log_output("\n(PID " + str(self.pid) + "): End of process")

    def create_xml_result_file(self, start_time):
        # generate result 

The parent process basically just launches several (12) targets and waits for them all to join.
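For reference, the parent side is essentially the following (a trimmed sketch; target_args is a placeholder standing in for the suite/settings/log arguments listed in the constructor above):

# Trimmed sketch of the parent process; target_args is a placeholder tuple
# standing in for the real constructor arguments (suite name, settings,
# log directory, output file, and so on).
work_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
target_args = ('suite', 'settings', 'html_log_dir', 'output_file', 'ip', 'fit_dir')

processes = [Target(work_queue, result_queue, *target_args) for _ in range(12)]

for process in processes:
    process.start()

# The parent blocks here: a few children never return from join(),
# even though their run() methods have already printed "End of process".
for process in processes:
    process.join()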

The issue is that the child processes run to the end of the run function (I see the end-of-process prints) but then do not terminate, which prevents the parent process from continuing.

EDIT - Not all of the spawned processes hang; of the 12, usually only 2-4 hang after completing their run function.

I considered calling terminate at the end of the run function, but the Python documentation indicates that it is a bad idea.

I have looked at several Stack Overflow questions about Python multiprocessing, but most of them relate to issues with the parent process.

Any thoughts or help would be much appreciated.

UPDATE: Here is a script that readily reproduces the problem:

import multiprocessing, Queue
import subprocess
import time
import sys

class Target(multiprocessing.Process):
    def __init__(self, work_queue, results_queue, delay_length):
        # base class initialization
        multiprocessing.Process.__init__(self)

        # job management stuff
        self.work_queue = work_queue
        self.results_queue = results_queue
        self.delay_length = delay_length
        self.kill_received = False

    def run(self):
        while not self.kill_received:
            # get a task
            try:
                job = self.work_queue.get(True, 2)
            except Queue.Empty:
                self._log('Work queue empty, prepare to terminate')
                break

            time.sleep(self.delay_length)
            self._log("Sleeping done")

            results = self._run_an_application(job)

            self.results_queue.put(results)
            self._log("Have put results on queue " + str(job) + "-" + results)
        self._log("\n(PID " + str(self.pid) + "): End of process")
    def _log(self, text):
        print ('PID ' + str(self.pid) + ' => ' + text)
        sys.stdout.flush()

    def _run_an_application(self, app):
        try:
            test_output = subprocess.check_output(app)
        except subprocess.CalledProcessError, e:
            self._log('### Process check_output threw exception CalledProcessError')
            test_output = e.output

        return test_output

if __name__ == "__main__":

    test_jobs = []
    started_targets = []

    # run
    # load up work queue
    for i in range(500):
        test_jobs.append('spewage')
    work_queue = multiprocessing.Queue()

    for job in test_jobs:
        work_queue.put(job)

    # create a queue to pass to targets to store the results
    result_queue = multiprocessing.Queue()

    # spawn targets
    for i in range(12):
        started_targets.append(Target(work_queue, result_queue, i))

    # start all targets
    for i in range(len(started_targets)):
        started_targets[i].start()
        print "starting process no %s with id: %s" % (i, started_targets[i].pid)

    print 'Waiting for all processes to join'

    # wait for all targets to finish
    for i in range(len(started_targets)):
        started_targets[i].join()

    print 'All processes have joined'

    # collect the results off the queue
    while not result_queue.empty():
        target_result = result_queue.get()
        print "Test job - " + target_result
    print ('All Tests completed')

Here is the source code (C++) of the "spewage" application.

#include <iostream>
#include <windows.h>
using namespace std;

int main()
{
    for (int i = 0; i < 500; i++)
    {
        cout << "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" << endl;
        Sleep(20); 
    }
    return 0;
}

Since the problem appears to be related to the amount of output pushed to stdout, the C++ program could easily be replaced by any script that prints a lot of output.
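For example, a minimal Python 2 stand-in (a hypothetical spewage.py, not part of my original setup) that produces a similar volume of output:

# spewage.py - hypothetical stand-in for the C++ program above; prints a
# comparable amount of stdout so the hang can be reproduced without a compiler.
import time

for i in range(500):
    print "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    time.sleep(0.02)

The jobs would then need to be something like ['python', 'spewage.py'] instead of 'spewage' so that subprocess.check_output can launch the script.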

Upvotes: 2

Views: 1330

Answers (1)

Allen Vandiver

Reputation: 31

I managed to figure out the issue. It seems to have been related to the amount of output from the subprocesses. At the end of the process's run() function, I needed to call self.results_queue.cancel_join_thread().
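Applied to the reproduction script above, the end of run() becomes:

    def run(self):
        while not self.kill_received:
            try:
                job = self.work_queue.get(True, 2)
            except Queue.Empty:
                self._log('Work queue empty, prepare to terminate')
                break

            time.sleep(self.delay_length)
            results = self._run_an_application(job)
            self.results_queue.put(results)

        # Let this process exit even if the queue's feeder thread still has
        # buffered results to flush; without this, some children hang here.
        self.results_queue.cancel_join_thread()
        self._log("\n(PID " + str(self.pid) + "): End of process")

Per the multiprocessing docs, cancel_join_thread() means unflushed queue data can be lost, which is the trade-off for letting the process exit immediately.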

I am still curious why it works when there isn't much stdout, but the processes hang when there is a lot. According to the Python documentation, the way I was using the result_queue should have locked up consistently, yet it didn't.

Upvotes: 1
