Reputation: 31
I am debugging an issue at the moment using multiprocessing.
I have the following child:
class Target(multiprocessing.Process):
    def __init__(self, work_queue, result_queue, suite_name, test_settings, html_log_dir, output_file, ip_address, fit_dir):
        multiprocessing.Process.__init__(self)
        # initialize other variables

    def run(self):
        print multiprocessing.current_process()
        suite_start_time = time.clock()
        while not self.kill_received:
            # get a task
            try:
                job = self.work_queue.get(True, 2)
            except Queue.Empty:
                self._log('Work queue empty, creating XML result file')
                self.create_xml_result_file(suite_start_time)
                break
            # the actual processing, run the test.
        fitnesse_common.log_output("\n(PID " + str(self.pid) + "): End of process")

    def create_xml_result_file(self, start_time):
        # generate result
The parent process basically just launches several (12) targets and waits for them all to join.
The issue is that the child processes run to the end of the run function (I see the "End of process" prints), but then do not terminate, which prevents the parent process from continuing.
EDIT - Not all the spawned processes hang, only a couple of them. Of the 12 spawned processes, usually only 2-4 of them hang after completing their run function.
I considered calling terminate at the end of the run function, but the Python documentation indicates that it is a bad idea.
I have looked at several different articles on Stack Overflow regarding Python multiprocessing, and most of them relate to issues with the parent process.
Any thoughts or help would be much appreciated.
UPDATE: Here is a script that readily reproduced the problem:
import multiprocessing, Queue
import subprocess
import time
import sys

class Target(multiprocessing.Process):
    def __init__(self, work_queue, results_queue, delay_length):
        # base class initialization
        multiprocessing.Process.__init__(self)
        # job management stuff
        self.work_queue = work_queue
        self.results_queue = results_queue
        self.delay_length = delay_length
        self.kill_received = False

    def run(self):
        while not self.kill_received:
            # get a task
            try:
                job = self.work_queue.get(True, 2)
            except Queue.Empty:
                self._log('Work queue empty, prepare to terminate')
                break
            time.sleep(self.delay_length)
            self._log("Sleeping done")
            results = self._run_an_application(job)
            self.results_queue.put(results)
            self._log("Have put results on queue " + str(job) + "-" + results)
        self._log("\n(PID " + str(self.pid) + "): End of process")

    def _log(self, text):
        print ('PID ' + str(self.pid) + ' => ' + text)
        sys.stdout.flush()

    def _run_an_application(self, app):
        try:
            test_output = subprocess.check_output(app)
        except subprocess.CalledProcessError, e:
            self._log('### Process check_output threw exception CalledProcessError')
            test_output = e.output
        return test_output
if __name__ == "__main__":
    test_jobs = []
    started_targets = []

    # run
    # load up work queue
    for i in range(500):
        test_jobs.append('spewage')
    work_queue = multiprocessing.Queue()
    for job in test_jobs:
        work_queue.put(job)

    # create a queue to pass to targets to store the results
    result_queue = multiprocessing.Queue()

    # spawn targets
    for i in range(12):
        started_targets.append(Target(work_queue, result_queue, i))

    # start all targets
    for i in range(len(started_targets)):
        started_targets[i].start()
        print "starting process no %s with id: %s" % (i, started_targets[i].pid)

    print 'Waiting for all processes to join'

    # wait for all targets to finish
    for i in range(len(started_targets)):
        started_targets[i].join()
    print 'All processes have joined'

    # collect the results off the queue
    while not result_queue.empty():
        target_result = result_queue.get()
        print "Test job - " + target_result

    print ('All Tests completed')
Here is the source code (it is C++) of the "spewage" application.
#include <iostream>
#include <windows.h>

using namespace std;

int main()
{
    for (int i = 0; i < 500; i++)
    {
        cout << "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" << endl;
        Sleep(20);
    }
    return 0;
}
Since it appears to be related to the amount pushed to stdout, the C++ program could easily be replaced by another script that prints a lot of stuff.
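For example, a minimal Python stand-in might look like this (the name spew.py and the 20 ms delay are just placeholders, not part of my original setup):

    # spew.py - hypothetical replacement for the C++ "spewage" program.
    # It just writes a large amount of text to stdout, which appears to be
    # what triggers the hang in the child processes.
    import sys
    import time

    for i in range(500):
        print "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        sys.stdout.flush()
        time.sleep(0.02)

The jobs pushed onto the work queue would then need to invoke the interpreter instead of the executable, e.g. test_jobs.append(['python', 'spew.py']), since subprocess.check_output also accepts an argument list.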
Upvotes: 2
Views: 1330
Reputation: 31
I managed to figure out the issue. It seems to have been related to the amount of output produced by the subprocesses. At the end of the process's run() function, I needed to call self.results_queue.cancel_join_thread().
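Roughly, the end of run() from the reproduction script looks like this with the fix applied (everything except the cancel_join_thread() call is unchanged from the script above):

    def run(self):
        while not self.kill_received:
            # get a task
            try:
                job = self.work_queue.get(True, 2)
            except Queue.Empty:
                self._log('Work queue empty, prepare to terminate')
                break
            time.sleep(self.delay_length)
            self._log("Sleeping done")
            results = self._run_an_application(job)
            self.results_queue.put(results)
            self._log("Have put results on queue " + str(job) + "-" + results)
        # Tell the result queue's feeder thread not to block process exit
        # while it still has buffered data waiting to be flushed to the pipe.
        self.results_queue.cancel_join_thread()
        self._log("\n(PID " + str(self.pid) + "): End of process")

One caveat from the documentation: cancel_join_thread() means the process will not wait for buffered queue data to be flushed on exit, so results can be lost if the other end has not read them yet.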
I am still curious as to why it works when there isn't much stdout output, but the processes hang when there is a lot. According to the Python documentation, the way I was using result_queue should have locked up consistently, even though it didn't.
Upvotes: 1