frazman

Reputation: 33213

how to parallelize a function in python

I have an embarrassingly parallel problem but have been wondering how to "design" the function so that it achieves the end result.

So, here is the sequential version

def train_weights(Xtr, ztr, Xte, zte):
    regr = some_model()  # placeholder for the actual estimator
    regr.fit(Xtr, ztr)
    error = np.mean((regr.predict(Xte) - zte) ** 2)
    return regr.coef_, error

rnge = range(z_train.shape[1])  # iterate over columns
weights = []
errors = []
for i in rnge:
    z_dim_tr = z_train[:, i]
    z_dim_te = z_test[:, i]
    weight, error = train_weights(X_train, z_dim_tr, X_test, z_dim_te)
    weights.append(weight)
    errors.append(error)

So I am just slicing a column from each matrix (train and test) and passing it to the function. Note that the order of the output matters: the index of a weight in the weights list corresponds to a particular i, and the same holds for the errors.

How do I parallelize this?

Upvotes: 0

Views: 784

Answers (4)

Robert Nishihara

Reputation: 3354

Here's one way to parallelize the code using Ray. Some advantages of using Ray:

  • Large data will be stored in shared memory and can be accessed by multiple workers (read-only), so the workers don't need to create their own copies of the data.
  • The same code will run on one machine or on multiple machines.

Ray is a library for writing parallel and distributed Python.

import numpy as np
import ray

ray.init()

z_train = np.random.normal(size=(100, 30))
z_test = np.random.normal(size=(50, 30))


@ray.remote(num_return_vals=2)
def train_weights(ztr, zte):
    # Fit model (mocked here: random predictions and a random coefficient
    # stand in for the real model, to demonstrate the parallel structure).
    predictions = np.random.normal(size=zte.shape[0])
    error = np.mean((predictions - zte) ** 2)
    coef = np.random.normal()
    return coef, error


weight_ids = []
error_ids = []
for i in range(z_train.shape[1]):
    z_dim_tr = z_train[:, i]
    z_dim_te = z_test[:, i]
    weight_id, error_id = train_weights.remote(z_dim_tr, z_dim_te)
    weight_ids.append(weight_id)
    error_ids.append(error_id)

weights = ray.get(weight_ids)
errors = ray.get(error_ids)
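To make the first bullet point concrete, here's a minimal sketch of the shared-memory idea (the train_weights_full variant and the matrix shapes are hypothetical, following the mocked style of the example above): ray.put() stores the large feature matrices in the object store once, and passing the resulting IDs lets every task read the same copy.

# Hypothetical variant of train_weights that also takes the feature
# matrices, as in the question's sequential version (still mocked).
@ray.remote(num_return_vals=2)
def train_weights_full(Xtr, ztr, Xte, zte):
    predictions = np.random.normal(size=zte.shape[0])
    error = np.mean((predictions - zte) ** 2)
    coef = np.random.normal()
    return coef, error

# Store the large matrices in the shared object store once. Object IDs
# passed as task arguments are dereferenced inside the worker, so each
# task reads the shared arrays instead of receiving its own copy.
X_train = np.random.normal(size=(100, 10))
X_test = np.random.normal(size=(50, 10))
X_train_id = ray.put(X_train)
X_test_id = ray.put(X_test)

weight_id, error_id = train_weights_full.remote(
    X_train_id, z_train[:, 0], X_test_id, z_test[:, 0])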

You can read more in the Ray documentation. Note I'm one of the Ray developers.

Upvotes: 1

vallentin

Reputation: 26157

Since this is just a general parallel-processing problem, you can use Pool from multiprocessing.dummy (which mirrors the multiprocessing API but is backed by threads).

Since I don't have your data set, let's consider the following example instead.

import multiprocessing
from multiprocessing.dummy import Pool

def test(args):
    a, b = args  # each job receives one (a, b) tuple from `data`
    return a

data = [
    (1, 2),
    (2, 3),
    (3, 4),
]

pool = Pool(multiprocessing.cpu_count())

results = pool.map(test, data)

pool.close()
pool.join()

for result in results:
    print(result)

Pool creates a number of workers (here multiprocessing.cpu_count(); since this is multiprocessing.dummy, they are threads rather than processes). Each worker then continuously executes jobs until all jobs have been executed. In other words, map() only returns once every job has finished.

All in all, map() returns a list of results in the same order as the inputs were given, so the code above prints 1, 2, then 3.
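Applied to the question, a minimal sketch might look like the following (assuming X_train, z_train, X_test, z_test and train_weights from the sequential version). Since map() preserves input order, weights[i] and errors[i] correspond to column i, as the question requires.

from multiprocessing.dummy import Pool

def train_column(i):
    # Train on column i of the targets, as in the sequential loop.
    return train_weights(X_train, z_train[:, i], X_test, z_test[:, i])

with Pool() as pool:  # defaults to cpu_count() workers
    results = pool.map(train_column, range(z_train.shape[1]))

# Unzip the (coef, error) pairs into two ordered lists.
weights, errors = map(list, zip(*results))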

Upvotes: 2

Asav Patel

Reputation: 1162

This can also easily be achieved using the concurrent.futures library.

Here's the example code:

from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 20

def train_weights(Xtr, ztr, Xte, zte):
    regr = some_model()
    regr.fit(Xtr, ztr)
    error = np.mean((regr.predict(Xte) - zte) ** 2)
    return regr.coef_, error

def work_done(future):
    weight, error = future.result()
    weights.append(weight)
    errors.append(error)

rnge = range(z_train.shape[1])
weights = []
errors = []
with ThreadPoolExecutor(MAX_WORKERS) as executor:
    for i in rnge:
        z_dim_tr = z_train[:, i]
        z_dim_te = z_test[:, i]
        executor.submit(train_weights, X_train, z_dim_tr, X_test, z_dim_te).add_done_callback(work_done)

Here the executor returns a Future for every task it submits. Keep in mind that callbacks registered with add_done_callback() run as soon as each task finishes, in no particular order, so the lists built in the callback won't necessarily match the column order. If the order matters, as it does in the question, collect the Future objects and read their results in submission order. Here's the code snippet for that:

from concurrent.futures import ThreadPoolExecutor, wait

futures = []
weights = []
errors = []
with ThreadPoolExecutor(MAX_WORKERS) as executor:
    for i in rnge:
        z_dim_tr = z_train[:, i]
        z_dim_te = z_test[:, i]
        futures.append(executor.submit(train_weights, X_train, z_dim_tr, X_test, z_dim_te))

wait(futures)

# `futures` is in submission order, so weights[i] and errors[i]
# correspond to column i, as the question requires.
for future in futures:
    weight, error = future.result()
    weights.append(weight)
    errors.append(error)

Upvotes: 1

litepresence

Reputation: 3277

Check out joblib:

https://pythonhosted.org/joblib/parallel.html

Joblib provides a simple helper class to write parallel for loops using multiprocessing. The core idea is to write the code to be executed as a generator expression, and convert it to parallel computing:

>>> from math import sqrt
>>> [sqrt(i ** 2) for i in range(10)]
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

can be spread over 2 CPUs using the following:

>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
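
Applied to the question, a sketch might look like this (assuming train_weights and the matrices from the sequential version). Parallel returns results in input order, which is exactly the ordering the question needs.

from joblib import Parallel, delayed

# n_jobs=-1 uses all available CPU cores.
results = Parallel(n_jobs=-1)(
    delayed(train_weights)(X_train, z_train[:, i], X_test, z_test[:, i])
    for i in range(z_train.shape[1])
)

# Unzip the (coef, error) pairs into two ordered lists.
weights, errors = map(list, zip(*results))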

Upvotes: 1
