Reputation: 4483
I have a function called run_3_processes, which spawns 3 processes (duh) using multiprocessing.pool.apply, waits for their results, processes those results, and returns a single result.
I have another function called run_3_processes_3_times, which should run run_3_processes 3 times in parallel, wait for all of them to return, and then process all their results.
things I tried:
- a nested multiprocessing pool for run_3_processes_3_times - turns out this is complicated because of "Python Process Pool non-daemonic?"
- ThreadPool.apply for run_3_processes_3_times - for some reason this makes it run serially, not in parallel - is it because the apply in run_3_processes blocks the GIL?

I'm sure there's a one-liner solution I'm missing... thanks!
Upvotes: 1
Views: 1774
Reputation: 7978
Since you're using a combination of true threads and sub-processes, you will "sort of" run into the GIL, but the way it's structured makes it seem unlikely that it will be a problem. The ThreadPool
will be subject to context switches to give concurrency between the threads, but since its only purpose is to spawn the child processes, it isn't doing anything CPU-intensive. I'm not sure why it's even necessary to use multiple threads; I would probably just have a single-threaded parent process spawn and wait on the child processes directly.
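That single-threaded-parent approach might look roughly like this (a minimal sketch; `work` and `run_children` are hypothetical stand-ins, not names from the question):

```python
import multiprocessing as mp

def work(i, queue):
    # hypothetical CPU-bound stand-in task; hand the result back via a queue
    queue.put((i, i * i))

def run_children(n=3):
    # a single-threaded parent spawns the children and waits on them directly
    queue = mp.Queue()
    procs = [mp.Process(target=work, args=(i, queue)) for i in range(n)]
    for p in procs:
        p.start()
    results = dict(queue.get() for _ in procs)  # collect one result per child
    for p in procs:
        p.join()
    return results

if __name__ == "__main__":
    print(run_children())  # {0: 0, 1: 1, 2: 4} (dict order may vary)
```

No threads are needed at the top level: the parent blocks on the queue and joins, which is exactly the "wait for all of them" behavior being asked for.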
In both functions, it's probably more idiomatic to use the map() method instead of apply_async(), although both will work. Usually that would look a bit like this:
import multiprocessing as mp

process_count = 3

def pre_process(input_data):
    # build separate lists; [[]] * process_count would give
    # process_count references to the same list
    input_subsets = [[] for _ in range(process_count)]
    for idx, data_point in enumerate(input_data):
        # <do any input validation on data_point>
        input_subsets[idx % process_count].append(data_point)
    return input_subsets

def process_data(input_data):
    return_val = []
    for val in input_data:
        # <do some processing work>
        return_val.append(<result of processing>)
    return return_val

data_subsets = pre_process(raw_data)
pool = mp.Pool(process_count)
result_list = pool.map(process_data, data_subsets)
# <check result_list>
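As a concrete, runnable instance of that pattern (squaring is a hypothetical stand-in for the real processing work):

```python
import multiprocessing as mp

process_count = 3

def pre_process(input_data):
    # deal the data points round-robin into one subset per worker
    input_subsets = [[] for _ in range(process_count)]
    for idx, data_point in enumerate(input_data):
        input_subsets[idx % process_count].append(data_point)
    return input_subsets

def process_data(input_data):
    # squaring stands in for the real per-subset processing
    return [val * val for val in input_data]

if __name__ == "__main__":
    data_subsets = pre_process(range(10))
    with mp.Pool(process_count) as pool:
        result_list = pool.map(process_data, data_subsets)
    print(result_list)  # [[0, 9, 36, 81], [1, 16, 49], [4, 25, 64]]
```

map() keeps the results in the same order as the subsets, so no bookkeeping of AsyncResult objects is needed.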
Upvotes: 1
Reputation: 4483
ok, found a hacky answer, would love to hear if there's something better:
from multiprocessing.pool import ThreadPool
import multiprocessing as mp

def run_3_processes_3_times():
    pool = ThreadPool(3)
    # note args=(c,) - without the trailing comma, (c) is not a tuple
    candidates = [pool.apply_async(run_3_processes,
                                   args=(c,)) for c in range(3)]
    candidates = [x.get() for x in candidates]
    pool.close()

def run_3_processes(c):
    pool = mp.Pool(3)
    solutions = [pool.apply_async(do_it,
                                  args=(i,)) for i in range(3)]
    solutions = [x.get() for x in solutions]
    pool.close()
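A self-contained version of the same two-level pattern, with a hypothetical do_it leaf task and return values added so results actually propagate back up:

```python
import multiprocessing as mp
from multiprocessing.pool import ThreadPool

def do_it(i):
    # hypothetical leaf task
    return i + 1

def run_3_processes(c):
    # inner level: real processes do the work; the calling thread just waits
    with mp.Pool(3) as pool:
        parts = [pool.apply_async(do_it, args=(i,)) for i in range(3)]
        return sum(p.get() for p in parts) + c

def run_3_processes_3_times():
    # outer level: threads, which sidestep the "daemonic processes are not
    # allowed to have children" restriction that nested process pools hit
    with ThreadPool(3) as pool:
        candidates = [pool.apply_async(run_3_processes, args=(c,)) for c in range(3)]
        return [x.get() for x in candidates]

if __name__ == "__main__":
    print(run_3_processes_3_times())  # [6, 7, 8]
```

The outer calls do run concurrently: each ThreadPool worker spends its time blocked in get(), so the GIL is released while the three inner process pools work in parallel.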
Upvotes: 1