Reputation: 33213
I have an embarrassingly parallel problem, but I have been wondering how to "design" the function so that it achieves the end result.
So, here is the sequential version:
def train_weights(Xtr, ztr, Xte, zte):
    regr = some_model()
    regr.fit(Xtr, ztr)
    error = np.mean((regr.predict(Xte) - zte) ** 2)
    return regr.coef_, error
rnge = range(z_train.shape[1])
weights = []
errors = []
for i in rnge:
    z_dim_tr = z_train[:, i]
    z_dim_te = z_test[:, i]
    weight, error = train_weights(X_train, z_dim_tr, X_test, z_dim_te)
    weights.append(weight)
    errors.append(error)
So, I am just slicing a column from the train and test matrices and passing it to the function. Note that the order of the output matters: the index of a weight in the weights list corresponds to a particular "i", and the same holds for the errors.
How do I parallelize this?
Upvotes: 0
Views: 784
Reputation: 3354
Here's one way to parallelize the code using Ray, a library for writing parallel and distributed Python.
import numpy as np
import ray

ray.init()

z_train = np.random.normal(size=(100, 30))
z_test = np.random.normal(size=(50, 30))

@ray.remote(num_return_vals=2)
def train_weights(ztr, zte):
    # Fit model.
    predictions = np.random.normal(size=zte.shape[0])
    error = np.mean((predictions - zte) ** 2)
    coef = np.random.normal()
    return coef, error

weight_ids = []
error_ids = []
for i in range(z_train.shape[1]):
    z_dim_tr = z_train[:, i]
    z_dim_te = z_test[:, i]
    weight_id, error_id = train_weights.remote(z_dim_tr, z_dim_te)
    weight_ids.append(weight_id)
    error_ids.append(error_id)

weights = ray.get(weight_ids)
errors = ray.get(error_ids)
You can read more in the Ray documentation. Note I'm one of the Ray developers.
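If you want to plug in the actual model from the question, here is a minimal sketch (assuming some_model, X_train, X_test, z_train and z_test exist as in the question; the ray.put calls are my addition so the feature matrices aren't re-serialized for every task):

import numpy as np
import ray

ray.init()

@ray.remote(num_return_vals=2)
def train_weights(Xtr, ztr, Xte, zte):
    regr = some_model()  # assumed: any scikit-learn style estimator, as in the question
    regr.fit(Xtr, ztr)
    error = np.mean((regr.predict(Xte) - zte) ** 2)
    return regr.coef_, error

# Put the (potentially large) feature matrices in the object store once
# so every task reuses them instead of shipping a copy per call.
Xtr_id = ray.put(X_train)
Xte_id = ray.put(X_test)

weight_ids = []
error_ids = []
for i in range(z_train.shape[1]):
    weight_id, error_id = train_weights.remote(Xtr_id, z_train[:, i], Xte_id, z_test[:, i])
    weight_ids.append(weight_id)
    error_ids.append(error_id)

weights = ray.get(weight_ids)  # results come back in submission order
errors = ray.get(error_ids)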
Upvotes: 1
Reputation: 26157
Since this is just a general parallel processing problem, you can use Pool from multiprocessing.dummy.
Since I don't have your data set, let's instead consider the following example.
import multiprocessing
from multiprocessing.dummy import Pool

def test(args):
    a, b = args
    return a

data = [
    (1, 2),
    (2, 3),
    (3, 4),
]

pool = Pool(multiprocessing.cpu_count())
results = pool.map(test, data)
pool.close()
pool.join()

for result in results:
    print(result)
Pool creates a pool of workers (in this case multiprocessing.cpu_count() of them). Each worker continuously executes jobs until all jobs have been executed; in other words, map() only returns once all jobs have finished. map() returns a list of results in the same order as the inputs were given, so in the end the above code prints 1, 2 and then 3.
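Applied to the question's loop, a minimal sketch (assuming train_weights, X_train, X_test, z_train and z_test are defined as in the question; train_one_column is a hypothetical helper) could look like this:

import multiprocessing
from multiprocessing.dummy import Pool

def train_one_column(i):
    # Slice out column i of the train/test targets and fit on it.
    return train_weights(X_train, z_train[:, i], X_test, z_test[:, i])

pool = Pool(multiprocessing.cpu_count())
# map() preserves input order, so results[i] corresponds to column i.
results = pool.map(train_one_column, range(z_train.shape[1]))
pool.close()
pool.join()

weights = [w for w, _ in results]
errors = [e for _, e in results]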
Upvotes: 2
Reputation: 1162
This can easily be achieved using the concurrent.futures library.
Here's the example code:
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 20

def train_weights(Xtr, ztr, Xte, zte):
    regr = some_model()
    regr.fit(Xtr, ztr)
    error = np.mean((regr.predict(Xte) - zte) ** 2)
    return regr.coef_, error

def work_done(future):
    weights.append(future.result())

rnge = range(z_train.shape[1])
weights = []
errors = []
with ThreadPoolExecutor(MAX_WORKERS) as executor:
    for i in rnge:
        z_dim_tr = z_train[:, i]
        z_dim_te = z_test[:, i]
        executor.submit(train_weights, X_train, z_dim_tr, X_test, z_dim_te).add_done_callback(work_done)
Here the executor returns a future for every task it submits. Keep in mind that if you use add_done_callback(), each finished task hands its result back via the callback (which would block your main thread); if you really want true parallelism, you should wait for the future objects separately. Here's the code snippet for that.
from concurrent.futures import wait

futures = []
with ThreadPoolExecutor(MAX_WORKERS) as executor:
    for i in rnge:
        z_dim_tr = z_train[:, i]
        z_dim_te = z_test[:, i]
        futures.append(executor.submit(train_weights, X_train, z_dim_tr, X_test, z_dim_te))
    wait(futures)

for future in futures:
    # Work with your result here.
    if future.exception() is None:
        weight, error = future.result()
        weights.append(weight)
        errors.append(error)
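If you also want the results to stay in column order without any bookkeeping, a small variant (my sketch, not part of the answer above; it assumes the same names from the question and uses executor.map, which yields results in input order) is:

from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 20

def train_one_column(i):
    # Hypothetical helper: slice column i and call the question's train_weights.
    return train_weights(X_train, z_train[:, i], X_test, z_test[:, i])

with ThreadPoolExecutor(MAX_WORKERS) as executor:
    # executor.map yields results in input order, so index i still
    # corresponds to column i of z_train / z_test.
    results = list(executor.map(train_one_column, range(z_train.shape[1])))

weights = [w for w, _ in results]
errors = [e for _, e in results]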
Upvotes: 1
Reputation: 3277
Check out joblib: https://pythonhosted.org/joblib/parallel.html
Joblib provides a simple helper class to write parallel for loops using multiprocessing. The core idea is to write the code to be executed as a generator expression, and convert it to parallel computing:
>>> from math import sqrt
>>> [sqrt(i ** 2) for i in range(10)]
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
can be spread over 2 CPUs using the following:
>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
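Applied to the question, a minimal sketch (assuming train_weights, X_train, X_test, z_train and z_test are defined as in the question) could be:

from joblib import Parallel, delayed

# n_jobs=-1 uses all available cores; results keep the input order,
# so index i still corresponds to column i of z_train / z_test.
results = Parallel(n_jobs=-1)(
    delayed(train_weights)(X_train, z_train[:, i], X_test, z_test[:, i])
    for i in range(z_train.shape[1])
)

weights = [w for w, _ in results]
errors = [e for _, e in results]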
Upvotes: 1