user36047

Reputation: 451

Python: parallelizing operations on a big array iteratively

I am trying to parallelize operations on a big array; my approach is summarized in the code snippet below. Since the operations on the big array are costly, I want to run only 4 of the 100 processes (i.e. n_cpus) in parallel at each iteration. After an iteration finishes, some garbage collection should be done and the next iteration should start. However, the main loop runs only the first iteration and then terminates. I would be glad if a parallel-processing expert could point out how to correct my code to achieve the desired task.

from multiprocessing import Process

def train_model(model, big_array, i):
    model = do_operations_on(big_array)

# edit: this part is within a class
n_processes = 100
n_cpus = 4
models = [None for _ in range(n_processes)]
n_iterations = n_processes // n_cpus  # integer division so range() gets an int
for it in range(n_iterations):
    procs = [Process(target=train_model,
                     args=(models[it*n_cpus + i], big_array, i))
             for i in range(n_cpus)]

    for p in procs: p.start()
    for p in procs: p.join()

Upvotes: 0

Views: 810

Answers (1)

Ami Tavory

Reputation: 76297

Your idea seems basically OK, except for a few problems:

  • As RaJa pointed out, you should probably pass stuff between processes using queues rather than shared state (see the sketch after this list)

  • I think your use of multiprocessing.Process is unnecessarily low level here; you should be using multiprocessing.Pool, which would also be more efficient, as you can reuse the processes instead of repeatedly setting them up and tearing them down.

  • Your code has a mixup: train_model ignores its model and i arguments, and simply rebinds the local name model, so the result never makes it back to the parent process.
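
For the first point, here is a minimal sketch of what passing results back through a queue could look like (train_model_worker is a hypothetical rename; do_operations_on, big_array, models and n_cpus are taken from your snippet):

from multiprocessing import Process, Queue

def train_model_worker(queue, big_array, i):
    # send the result back to the parent instead of assigning
    # to a name the parent process never sees
    model = do_operations_on(big_array)
    queue.put((i, model))

queue = Queue()
procs = [Process(target=train_model_worker, args=(queue, big_array, i))
         for i in range(n_cpus)]
for p in procs: p.start()
# drain the queue before joining: a child doesn't exit until
# the data it put on the queue has been consumed
results = dict(queue.get() for _ in procs)
for p in procs: p.join()
for i, model in results.items():
    models[i] = model

That said, the Pool version below avoids this bookkeeping entirely.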

So, in the following code, I assume you have something like

def train_model(specs, big_array):
    return ...

that takes some specifics and the data, and returns a model built for those specifics.

I also assume in the following that you have some array specifics containing all the specifics you want to try (and that its length is divisible by the number of CPUs, an assumption that is not difficult to get rid of).

Finally, I assume the point is to build a list models of all the models.

Your code becomes:

from functools import partial
from multiprocessing import Pool

n_cpus = 4
n_iterations = len(specifics) // n_cpus
models = []
p = Pool(n_cpus)
for it in range(n_iterations):
    cur_specs = specifics[it * n_cpus: (it + 1) * n_cpus]
    # a lambda can't be pickled and sent to the workers,
    # so bind big_array with functools.partial instead
    cur_models = p.map(partial(train_model, big_array=big_array), cur_specs)
    models.extend(cur_models)
    # Cleanup here

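As an aside, when you are done with the pool, release its workers explicitly (or create it with a with statement, which does the same thing on exit):

p.close()  # no more tasks will be submitted to the pool
p.join()   # wait for the worker processes to finish
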
Upvotes: 1
