Reputation: 8372
Using Python, I want to start multiple processes that run the same executable with different parameters in parallel. When all have finished, I want to check that there were no errors and then do some more processing.
I have this already:
def main(path_of_script):
    path_of_exe = make_path_to_exe(path_of_script)
    #
    lst_standin_params = [["1", "5"], ["2", "1"]]
    #
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        #
        future_standin_exe = {
            executor.submit(
                subprocess.Popen(
                    [path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]]
                )
            ): standin_arg_lst for standin_arg_lst in lst_standin_params
        }
        #
        for future in concurrent.futures.as_completed(future_standin_exe):
            tmp_rv_holder = future_standin_exe[future]
            #
            try:
                data = future.result()
            except Exception as exc:
                print('An exception occurred: %s' % (exc))
The processes run fine, but I'm clearly doing something wrong when checking that each process started by subprocess.Popen has completed successfully. I think I need a way to capture the return value from the call to subprocess.Popen, but I'm not sure how.
As it stands, the code throws an exception when the line data = future.result() is executed: can't pickle _thread.lock objects. I'm pretty sure that attempting to use the Future object is the wrong idea, but I can't work out how to access the results of the execution.
Upvotes: 0
Views: 432
Reputation: 143057
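You should create a function that starts the process with stdout=PIPE and stderr=PIPE and collects the output with p.communicate(), which waits for the process to finish and reads both pipes without the risk of blocking on a full pipe buffer:
def func(path_of_exe, task_id, task_delay):
    p = subprocess.Popen(
        [path_of_exe, task_id, task_delay],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() waits for the process to exit and returns (stdout, stderr)
    stdout, stderr = p.communicate()
    return [stdout, stderr]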
Then submit it to the executor:
future_standin_exe = {
    executor.submit(
        func, path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]
    ): standin_arg_lst for standin_arg_lst in lst_standin_params
}
Note that it has to be executor.submit(func, arg1, arg2, arg3), not executor.submit(func(arg1, arg2, arg3)): submit() takes the function itself, followed by its arguments, and calls it in a worker.
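The difference between the two call forms can be seen in a small sketch. It is not part of the tested code: add is a hypothetical stand-in function, and a ThreadPoolExecutor is used only to keep the example short (submit() has the same signature on both executor types). In the question, subprocess.Popen(...) was called immediately and the resulting Popen object was handed to submit(), which is the same mistake as the second call here.

```python
from concurrent.futures import ThreadPoolExecutor


def add(a, b):
    # Hypothetical stand-in for the real work function.
    return a + b


with ThreadPoolExecutor(max_workers=2) as executor:
    # Correct: pass the callable and its arguments separately.
    good = executor.submit(add, 1, 2)

    # Wrong: add(1, 2) runs right here, so the int 3 is submitted
    # as if it were a function.
    bad = executor.submit(add(1, 2))

    good_result = good.result()

    try:
        bad.result()
        bad_error = None
    except TypeError as exc:
        # The worker tried to call 3() and failed.
        bad_error = exc

print("good:", good_result)
print("bad:", repr(bad_error))
```

With a ProcessPoolExecutor the wrongly submitted object also has to be pickled to reach the worker process, which is presumably where the "can't pickle _thread.lock objects" message in the question comes from.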
And later you can display both outputs.
data = future.result()
for item in data:
    print(item)
or
stdout, stderr = future.result()
print('stdout:', stdout)
print('stderr:', stderr)
This is the minimal code I used for testing.
I didn't have a suitable program to run, so I used a directory-listing command (ls; the code below calls dir),
which produces some output, although the output itself is meaningless here.
import concurrent.futures
import subprocess

TASK_ID_IDX = 0
TASK_DELAY_IDX = 1  # second element of each parameter list

def func(path_of_exe, standin_arg_lst):
    # Runs in a worker: start the process, wait for it, and collect its output.
    p = subprocess.Popen(
        [path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = p.communicate()
    return [stdout, stderr]

def make_path_to_exe(path):
    return path

def main(path_of_script):
    path_of_exe = make_path_to_exe(path_of_script)
    #
    lst_standin_params = [["1", "5"], ["2", "1"]]
    #
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        #
        # Pass the function and its arguments separately; do not call func here.
        future_standin_exe = {
            executor.submit(
                func, path_of_exe, standin_arg_lst
            ): standin_arg_lst for standin_arg_lst in lst_standin_params
        }
        #
        for future in concurrent.futures.as_completed(future_standin_exe):
            tmp_rv_holder = future_standin_exe[future]
            #
            try:
                data = future.result()
                for item in data:
                    print(item)
            except Exception as exc:
                print('An exception occurred: %s' % (exc))

# The guard is needed on platforms that start worker processes with "spawn"
# (for example Windows), where the module is re-imported in every worker.
if __name__ == '__main__':
    main('dir')
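The question also asks how to check that every process finished without errors, which the code above does not do, since it only returns the captured output. Here is a minimal sketch of one way to get the exit codes as well, assuming Python 3.7 or later (for capture_output). The names run_task and run_all are hypothetical, and sys.executable with a -c snippet stands in for the real executable. It uses subprocess.run instead of Popen, and a ThreadPoolExecutor instead of the ProcessPoolExecutor above: the work already happens in the child processes, so threads should be enough to wait for them.

```python
import concurrent.futures
import subprocess
import sys


def run_task(cmd):
    """Run one command to completion; return (returncode, stdout, stderr)."""
    completed = subprocess.run(cmd, capture_output=True, text=True)
    return completed.returncode, completed.stdout, completed.stderr


def run_all(commands, max_workers=3):
    """Run all commands in parallel; return {index: (returncode, stdout, stderr)}."""
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(run_task, cmd): index
            for index, cmd in enumerate(commands)
        }
        for future in concurrent.futures.as_completed(futures):
            results[futures[future]] = future.result()
    return results


if __name__ == "__main__":
    # Stand-in commands (assumption): one succeeds, one exits with code 3.
    commands = [
        [sys.executable, "-c", "print('ok')"],
        [sys.executable, "-c", "import sys; sys.exit(3)"],
    ]
    results = run_all(commands)
    failed = [index for index, (rc, _, _) in results.items() if rc != 0]
    print("failed tasks:", sorted(failed))
```

A non-zero return code conventionally signals failure, so the follow-up processing can be skipped or adjusted whenever the list of failed tasks is not empty.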
Upvotes: 4