ihadanny

Reputation: 4483

python - I want multiple threads to spawn multiple processes, all should run in parallel

I have a function called run_3_processes, which (duh) spawns 3 processes using multiprocessing.pool.apply, waits for their results, processes them, and returns a single result.

I have another function called run_3_processes_3_times, which should run run_3_processes 3 times in parallel, wait for all of them to return, and then process all their results.

things I tried:

  1. using a process pool for run_3_processes_3_times - turns out this is complicated, since a pool's worker processes are daemonic and can't spawn children of their own (see "Python Process Pool non-daemonic?")
  2. rewriting the entire applicative code to spawn 9 processes with the same pool - this really convoluted my code and breaks encapsulation
  3. using a threadpool.apply for run_3_processes_3_times - for some reason this makes it run serially, not in parallel (see the sketch below) - is it because the apply in run_3_processes blocks the GIL?

I'm sure there's a one-liner solution I'm missing... thanks!
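For reference, here's roughly what attempt 3 looked like (a minimal sketch; run_3_processes is the function described above):

from multiprocessing.pool import ThreadPool

tpool = ThreadPool(3)
# this ended up running serially, not in parallel
results = [tpool.apply(run_3_processes, args=(c,)) for c in range(3)]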

Upvotes: 1

Views: 1774

Answers (2)

Z4-tier

Reputation: 7978

Since you're using a combination of true threads and sub-processes, you will "sort of" run into the GIL, but the way it's structured makes it seem unlikely that it will be a problem. The ThreadPool will be subject to context switches to give concurrency between the threads, but since its only purpose is to spawn the child processes, it's not doing anything CPU intensive. I'm not sure why it's even necessary to use multiple threads; I would probably just have a single-threaded parent process spawn and wait on the child processes directly.
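A minimal sketch of that single-threaded alternative, assuming a hypothetical leaf worker do_it(c, i) (one pool of 9 workers, no threads at all):

import multiprocessing as mp

def run_all_9():
    # one single-threaded parent owns one pool; no nested pools needed
    with mp.Pool(9) as pool:
        async_results = [pool.apply_async(do_it, args=(c, i))
                         for c in range(3) for i in range(3)]
        flat = [r.get() for r in async_results]
    # regroup into 3 groups of 3 for per-group post-processing
    return [flat[g * 3:(g + 1) * 3] for g in range(3)]

The trade-off is exactly the one mentioned in the question: the grouping logic leaks into the parent, at the cost of some encapsulation.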

In both functions, it's probably more idiomatic to use the map() method instead of apply_async(), although both will work. Usually that would look a bit like this:

import multiprocessing as mp

process_count = 3

def pre_process(input_data):
    # note: [[]] * process_count would alias the same list 3 times,
    # so build independent sublists instead
    input_subsets = [[] for _ in range(process_count)]
    for idx, data_point in enumerate(input_data):
        # do any input validation on data_point here
        input_subsets[idx % process_count].append(data_point)
    return input_subsets

def process_data(input_data):
    return_val = []
    for val in input_data:
        # do some processing work on val here
        return_val.append(val)  # append the result of that processing
    return return_val

if __name__ == '__main__':
    data_subsets = pre_process(raw_data)  # raw_data: your input iterable
    pool = mp.Pool(process_count)
    result_list = pool.map(process_data, data_subsets)
    # check result_list here
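Applied to the 3-threads-by-3-processes structure from the question, the same map() idea could look something like this (a sketch; do_it stands in for the real per-process worker):

import multiprocessing as mp
from multiprocessing.pool import ThreadPool

def run_3_processes(c):
    # each thread owns its own pool of 3 worker processes
    with mp.Pool(3) as pool:
        return pool.map(do_it, range(3))

def run_3_processes_3_times():
    # the thread pool fans out the 3 groups; map() blocks until
    # all of them have returned
    with ThreadPool(3) as tpool:
        return tpool.map(run_3_processes, range(3))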

Upvotes: 1

ihadanny

Reputation: 4483

ok, found a hacky answer, would love to hear if there's something better:


import multiprocessing as mp
from multiprocessing.pool import ThreadPool

def run_3_processes_3_times():
    pool = ThreadPool(3)
    # apply_async returns immediately, so all 3 threads start in parallel
    candidates = [pool.apply_async(run_3_processes, args=(c,))
                  for c in range(3)]
    candidates = [x.get() for x in candidates]
    pool.close()
    return candidates

def run_3_processes(c):
    pool = mp.Pool(3)
    solutions = [pool.apply_async(do_it, args=(i,))
                 for i in range(3)]
    solutions = [x.get() for x in solutions]
    pool.close()
    return solutions
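For what it's worth, I believe this works because the "daemonic processes are not allowed to have children" restriction applies to a Pool's worker processes, not to threads - each ThreadPool thread still runs inside the (non-daemonic) parent process, so it's free to create its own multiprocessing.Pool.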

Upvotes: 1
