Reputation: 451
I am trying to parallelize operations on a big array. I have summarized my approach in the code snippet below. Since the operations on the big array are costly, I want to run only 4 of the 100 processes (i.e. n_cpus) in parallel at each iteration. Once an iteration finishes, some garbage collection should be done and the next iteration should start. However, the main loop performs only the first iteration and then terminates. I would be glad if a parallel-processing expert could point out how to correct my code to achieve the desired task.
from multiprocessing import Process

def train_model(model, big_array, i):
    model = do_operations_on(big_array)

# edit: this part is within a class
n_processes = 100
n_cpus = 4
models = [None for _ in range(n_processes)]
n_iterations = n_processes / n_cpus
for it in range(n_iterations):
    procs = [Process(target=train_model,
                     args=(models[it*n_cpus + i], big_array, i))
             for i in range(n_cpus)]
    for p in procs: p.start()
    for p in procs: p.join()
Upvotes: 0
Views: 810
Reputation: 76297
Your idea seems basically OK, except for a few problems:

- As RaJa pointed out, you should probably pass results back via queues instead of relying on shared state.
- Your use of multiprocessing.Process is unnecessarily low level here; you should use multiprocessing.Pool, which is also more efficient, since you can reuse the processes instead of repeatedly setting them up and tearing them down.
- Your code has a mixup: train_model ignores its model and i arguments, and merely rebinds the local name model, so nothing is ever written back to the models list (see the queue-based sketch after this list).
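To illustrate the first and third points, here is a minimal sketch of returning results through a multiprocessing.Queue, reusing do_operations_on, big_array, and n_cpus from your question (the worker body is a stand-in; rebinding a parameter in the child never reaches the parent, so the result must be sent back explicitly):

from multiprocessing import Process, Queue

def worker(queue, big_array, i):
    model = do_operations_on(big_array)  # runs in the child process
    queue.put((i, model))                # explicitly send the result back

queue = Queue()
procs = [Process(target=worker, args=(queue, big_array, i))
         for i in range(n_cpus)]
for p in procs: p.start()
# Drain the queue before joining, so large results cannot block the children.
results = dict(queue.get() for _ in procs)
for p in procs: p.join()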
So, in the following code, I assume you have something like
def train_model(specs, big_array):
    return ...
that takes one element of specifics plus the data, and returns a model built for those specifics.
I also assume you have some array specifics containing all the specifics you want to try (and that its length is divisible by the number of CPUs, a restriction that is not hard to remove).
Finally, I assume the point is to build a list models
of all the models.
Your code becomes:
from functools import partial
from multiprocessing import Pool

n_cpus = 4
n_iterations = len(specifics) // n_cpus  # integer division
models = []
p = Pool(n_cpus)
# A lambda cannot be pickled and sent to worker processes,
# so bind big_array with functools.partial instead.
worker = partial(train_model, big_array=big_array)
for it in range(n_iterations):
    cur_specs = specifics[it * n_cpus: (it + 1) * n_cpus]
    cur_models = p.map(worker, cur_specs)
    models.extend(cur_models)
    # Cleanup here
p.close()
p.join()
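If the per-iteration cleanup turns out to be unnecessary, a shorter variant (a sketch under the same assumptions about train_model, specifics, and big_array) lets the pool do all the batching itself:

from functools import partial
from multiprocessing import Pool

with Pool(n_cpus) as p:
    # map blocks until every spec has been processed; the pool keeps
    # all n_cpus workers busy without manual slicing into iterations.
    models = p.map(partial(train_model, big_array=big_array), specifics)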
Upvotes: 1