Reputation: 192
My question is hopefully particular enough that it doesn't duplicate any of the others I've read. I want to use subprocess and multiprocessing to spawn a bunch of jobs one after another and get each job's return code back. The problem is that I don't want to wait(), so that I can spawn the jobs all at once, but I do want to know when each job finishes so I can get its return code. I'm having a weird problem where if I poll() the process, it won't run. It just sits in the Activity Monitor without running (I'm on a Mac). I thought I could use a watcher thread, but I'm hanging on the out_q.get() call, which leads me to believe that I may be filling up a buffer and deadlocking. I'm not sure how to get around this. This is basically what my code looks like. If anyone has a better idea of how to do this, I would be happy to change my approach completely.
def watchJob(p1,out_q):
    while p1.poll() == None:
        pass
    print "Job is done"
    out_q.put(p1.returncode)

def runJob(out_q):
    LOGFILE = open('job_to_run.log','w')
    p1 = Popen(['../../bin/jobexe','job_to_run'], stdout = LOGFILE)
    t = threading.Thread(target=watchJob, args=(p1,out_q))
    t.start()

out_q= Queue()
outlst=[]
for i in range(len(nprocs)):
    proc = Process(target=runJob, args=(out_q,))
    proc.start()
    outlst.append(out_q.get()) # This hangs indefinitely
    proc.join()
Upvotes: 1
Views: 2628
Reputation: 414315
You need neither multiprocessing nor threading here. You can run multiple child processes in parallel and collect their statuses, all in a single thread:
#!/usr/bin/env python3
from subprocess import Popen

def run(cmd, log_filename):
    with open(log_filename, 'wb', 0) as logfile:
        return Popen(cmd, stdout=logfile)

# start several subprocesses
processes = {run(['echo', c], 'subprocess.%s.log' % c) for c in 'abc'}
# now they all run in parallel
# report as soon as a child process exits
while processes:
    for p in processes:
        if p.poll() is not None:
            processes.remove(p)
            print('{} done, status {}'.format(p.args, p.returncode))
            break
p.args stores cmd in Python 3.3+; keep track of cmd yourself on earlier Python versions.
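One way to keep track of cmd yourself is to store each Popen object as a dictionary key with its command as the value. The following is only a sketch under assumptions: the helper name wait_all, the poll_interval parameter, and the demo commands are made up for illustration, and the short sleep between polling passes is an addition that keeps the loop from spinning at full CPU.

```python
import sys
import time
from subprocess import Popen

def wait_all(commands, poll_interval=0.05):
    """Start every command, then return [(cmd, returncode), ...] in completion order."""
    # Map each Popen object to its command so we don't depend on p.args.
    running = {Popen(cmd): cmd for cmd in commands}
    finished = []
    while running:
        for p in list(running):  # iterate over a copy so entries can be removed
            if p.poll() is not None:
                finished.append((running.pop(p), p.returncode))
        time.sleep(poll_interval)  # hypothetical interval; avoids a tight busy loop
    return finished

# Hypothetical child commands: each one exits with the given status.
demo_commands = [[sys.executable, '-c', 'import sys; sys.exit(%d)' % n]
                 for n in range(3)]
for cmd, status in wait_all(demo_commands):
    print('{} done, status {}'.format(cmd, status))
```

The order of the printed lines depends on which child exits first, so it can differ from run to run.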
To limit the number of parallel jobs, a thread pool can be used:
#!/usr/bin/env python3
from multiprocessing.dummy import Pool # use threads
from subprocess import Popen

def run_until_done(args):
    cmd, log_filename = args
    try:
        with open(log_filename, 'wb', 0) as logfile:
            p = Popen(cmd, stdout=logfile)
        return cmd, p.wait(), None
    except Exception as e:
        return cmd, None, str(e)

commands = ((('echo', str(d)), 'subprocess.%03d.log' % d) for d in range(500))
pool = Pool(128) # 128 concurrent commands at a time
for cmd, status, error in pool.imap_unordered(run_until_done, commands):
    if error is None:
        fmt = '{cmd} done, status {status}'
    else:
        fmt = 'failed to run {cmd}, reason: {error}'
    print(fmt.format_map(vars())) # or fmt.format(**vars()) on older versions
The thread pool in the example has exactly 128 threads, so it can't execute more than 128 jobs concurrently. As soon as a thread finishes a job, it takes the next one. The total number of jobs executing concurrently is therefore limited by the number of threads. A new job doesn't wait for all 128 previous jobs to finish; it starts as soon as any of the running jobs is done.
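The same bounded-concurrency behaviour can also be sketched with concurrent.futures from the standard library (Python 3.2+). This is an alternative to the multiprocessing.dummy pool above, not the code from this answer; the helper name run_limited and the demo commands are assumptions made for illustration.

```python
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from subprocess import call

def run_limited(commands, max_workers=4):
    """Run commands with at most max_workers children at once; return {index: status}."""
    statuses = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # call() blocks its worker thread until the child exits, so the number
        # of worker threads caps the number of children running at once.
        futures = {executor.submit(call, cmd): i for i, cmd in enumerate(commands)}
        for future in as_completed(futures):
            statuses[futures[future]] = future.result()
    return statuses

# Hypothetical child commands: even-numbered jobs exit 0, odd-numbered exit 1.
demo = [[sys.executable, '-c', 'import sys; sys.exit(%d)' % (n % 2)]
        for n in range(10)]
print(run_limited(demo, max_workers=3))
```

All ten jobs are submitted up front, but only three run at any moment; each worker thread picks up the next queued job as soon as its current child exits.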
Upvotes: 2
Reputation: 94891
If you're going to run watchJob in a thread, there's no reason to busy-loop with p1.poll(); just call p1.wait() to block until the process finishes. Using the busy loop requires the GIL to constantly be released/re-acquired, which slows down the main thread, and also pegs the CPU, which hurts performance even more.
Also, if you're not using the stdout of the child process, you shouldn't send it to PIPE, because that could cause a deadlock if the process writes enough data to the stdout buffer to fill it up (which may actually be what's happening in your case). There's also no need to use multiprocessing here; just call Popen in the main thread, and then have the watchJob thread wait on the process to finish.
import threading
from subprocess import Popen
from Queue import Queue

def watchJob(p1, out_q):
    p1.wait()
    out_q.put(p1.returncode)

out_q = Queue()
outlst=[]
p1 = Popen(['../../bin/jobexe','job_to_run'])
t = threading.Thread(target=watchJob, args=(p1,out_q))
t.start()
outlst.append(out_q.get())
t.join()
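On the PIPE point above, here is a minimal Python 3 sketch of two ways to keep a chatty child from blocking on a full pipe. The noisy command is a made-up stand-in for a job that prints a lot, and subprocess.DEVNULL requires Python 3.3+.

```python
import subprocess
import sys

# Hypothetical noisy child: writes about 1 MB to stdout, which is more than
# a typical pipe buffer holds.
noisy = [sys.executable, '-c', "import sys; sys.stdout.write('x' * 1000000)"]

# Output not needed: discard it instead of sending it to a pipe.
p = subprocess.Popen(noisy, stdout=subprocess.DEVNULL)
status_discarded = p.wait()

# Output needed: communicate() reads the pipe while waiting for the child,
# so the child never stalls on a full buffer.
p = subprocess.Popen(noisy, stdout=subprocess.PIPE)
output, _ = p.communicate()
status_captured = p.returncode

print(status_discarded, status_captured, len(output))
```

Calling p.wait() with stdout=PIPE and nothing reading the pipe is the combination that can hang; redirecting to a log file, as in the question, avoids it as well.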
Edit:
Here's how to run multiple jobs concurrently this way:
out_q = Queue()
outlst = []
threads = []
num_jobs = 3
for _ in range(num_jobs):
    p = Popen(['../../bin/jobexe','job_to_run'])
    # Each watcher thread gets its own process.
    t = threading.Thread(target=watchJob, args=(p, out_q))
    t.start()
    threads.append(t)
    # Don't consume from the queue yet.

# All jobs are running, so now we can start
# consuming results from the queue.
for _ in range(num_jobs):
    outlst.append(out_q.get())

# Every result has been collected, so join all watcher threads.
for t in threads:
    t.join()
Upvotes: 1