shearichard

Reputation: 8372

Capturing return values from multiple processes

Summary

Using Python, I want to start multiple processes that run the same executable in parallel with different parameters. When all of them have finished, I want to check that there were no errors and then do some more processing.

What I've tried

I have this already:

def main(path_of_script):
    path_of_exe = make_path_to_exe(path_of_script)
    #
    lst_standin_params = [["1", "5"], ["2", "1"]]
    #
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        #
        future_standin_exe = { 
            executor.submit(
                subprocess.Popen(
                    [path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]]
                )
            ): standin_arg_lst for standin_arg_lst in lst_standin_params 
        }
        #
        for future in concurrent.futures.as_completed(future_standin_exe):
            tmp_rv_holder = future_standin_exe[future]
            #
            try:
                data = future.result()
            except Exception as exc:
                print('An exception occurred: %s' % (exc))

Question

The processes run fine, but I'm clearly doing something wrong when checking that each process started by subprocess.Popen has completed successfully. I think I need a way to capture the return value from the call to subprocess.Popen, but I'm not sure how to do that.

The code as it stands raises an exception when the line data = future.result() is executed: can't pickle _thread.lock objects. I'm pretty sure that attempting to use the Future object is the wrong idea, but I can't work out how to access the results of the execution.

Upvotes: 0

Views: 432

Answers (1)

furas

Reputation: 143057

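You should create a function that runs the process with stdout=PIPE and stderr=PIPE and returns the captured output:

def func(path_of_exe, task_id, task_delay):
    p = subprocess.Popen(
        [path_of_exe, task_id, task_delay],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() reads both pipes to the end and waits for the process to exit;
    # reading stdout and then stderr separately can block if the child fills the stderr pipe.
    stdout, stderr = p.communicate()
    return [stdout, stderr]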

Then submit it to the executor:

    future_standin_exe = { 
        executor.submit(
            func, path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]
        ): standin_arg_lst for standin_arg_lst in lst_standin_params 
    }

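Note that submit() takes the function and its arguments separately: it has to be func, arg1, arg2, arg3, not func(arg1, arg2, arg3). The second form calls the function immediately and hands its return value to submit(). In your code that value is a Popen object, which cannot be pickled, and that is where the can't pickle _thread.lock objects error comes from.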
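To illustrate the difference between the two call forms, here is a minimal sketch. It uses a thread pool and a hypothetical add function (neither is from the code above) so that the mistake surfaces as a plain TypeError instead of a pickling error:

```python
import concurrent.futures


def add(a, b):
    # Hypothetical stand-in for the real worker function.
    return a + b


def submit_correctly():
    # Pass the callable and its arguments separately;
    # the executor calls add(1, 2) in a worker.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(add, 1, 2)
        return future.result()


def submit_incorrectly():
    # add(1, 2) runs right here, so submit() receives the integer 3
    # and the worker later tries to call it like a function.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(add(1, 2))
        try:
            future.result()
        except TypeError as exc:
            return exc
    return None


if __name__ == "__main__":
    print(submit_correctly())
    print(repr(submit_incorrectly()))
```

With ProcessPoolExecutor the same mistake shows up earlier, because the object passed to submit() has to be pickled before it can be sent to a worker process.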

Later you can display both outputs:

            data = future.result()
            for item in data:
                print(item)

or

            stdout, stderr = future.result()
            print('stdout:', stdout)
            print('stderr:', stderr)
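The question also asks how to check that each process completed successfully, which means looking at the exit code as well as the output. The following is a minimal sketch under a few assumptions: it swaps in subprocess.run and a thread pool (each worker only waits on a child process, so threads are probably enough), and it uses the Python interpreter as a hypothetical stand-in for the real executable. The names run_task, run_all and demo_commands are illustrative and not part of the original code:

```python
import concurrent.futures
import subprocess
import sys


def run_task(cmd):
    """Run one command, wait for it, and return (returncode, stdout, stderr)."""
    completed = subprocess.run(cmd, capture_output=True, text=True)
    return completed.returncode, completed.stdout, completed.stderr


def run_all(commands, max_workers=3):
    """Run all commands in parallel and map each command to its result."""
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_cmd = {executor.submit(run_task, cmd): cmd for cmd in commands}
        for future in concurrent.futures.as_completed(future_to_cmd):
            # Lists are not hashable, so the command is stored as a tuple key.
            results[tuple(future_to_cmd[future])] = future.result()
    return results


def demo_commands():
    # Assumption: the interpreter stands in for the real executable;
    # the second command deliberately exits with a non-zero code.
    return [
        [sys.executable, "-c", "print('ok')"],
        [sys.executable, "-c", "import sys; sys.exit(3)"],
    ]


if __name__ == "__main__":
    for cmd, (returncode, stdout, stderr) in run_all(demo_commands()).items():
        status = "succeeded" if returncode == 0 else "failed"
        print(cmd[-1], status, "with return code", returncode)
```

A return code of 0 conventionally means success, so the further processing can be skipped or adjusted when any task reports a non-zero code.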

Minimal code that I used for testing:

I didn't have any program to run, so I used a directory-listing command (ls, or dir as in the code below), which produces some output, although the output itself is not useful.

import concurrent.futures
import subprocess

TASK_ID_IDX = 0
TASK_DELAY_IDX = 1  # the delay is the second item in each parameter list

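def func(path_of_exe, standin_arg_lst):
    p = subprocess.Popen(
        [path_of_exe, standin_arg_lst[TASK_ID_IDX], standin_arg_lst[TASK_DELAY_IDX]],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() reads both pipes to the end and waits for the process to exit;
    # reading stdout and then stderr separately can block if the child fills the stderr pipe.
    stdout, stderr = p.communicate()
    return [stdout, stderr]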

def make_path_to_exe(path):
    return path

def main(path_of_script):
    path_of_exe = make_path_to_exe(path_of_script)
    #
    lst_standin_params = [["1", "5"], ["2", "1"]]
    #
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        #
        future_standin_exe = { 
            executor.submit(
                func, path_of_exe, standin_arg_lst
            ): standin_arg_lst for standin_arg_lst in lst_standin_params 
        }
        #
        
        for future in concurrent.futures.as_completed(future_standin_exe):
            tmp_rv_holder = future_standin_exe[future]
            #
            try:
                data = future.result()
                for item in data:
                    print(item)
            except Exception as exc:
                print('An exception occurred: %s' % (exc))
                
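# The guard stops worker processes from re-running main() when they import this module
# (needed where processes are started with "spawn", e.g. on Windows).
if __name__ == '__main__':
    main('dir')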

Upvotes: 4
