Reputation: 1465
I have a very processor-intensive task that takes a ballpark of 13-20 hours to complete, depending on the machine. Seemed like an obvious choice for parallelization via the multiprocessing library. Problem is... the more processes I spawn, the slower the same code gets.
Time per iteration (i.e. the time it takes to run `sparse.linalg.cg`):

```
183s  1 process
245s  2 processes
312s  3 processes
383s  4 processes
```
Granted, while 2 processes takes a little over 30% more time for each iteration, it's doing 2 at the same time, so it's still marginally faster. But I would not expect the actual math operations themselves to be slower! These timers don't start until after whatever overhead multiprocessing adds.
Here's a stripped down version of my code. The problem line is the sparse.linalg.cg one. (I've tried things like using MKL vs OpenBLAS, and forcing them to run in a single thread. Also tried manually spawning Processes instead of using a pool. No luck.)
```python
def do_the_thing_partial(iteration: int, iter_size: float, outQ: multiprocessing.Queue,
                         L: int, D: int, qP: int, elec_ind: np.ndarray, Ic: int, ubi2: int,
                         K: csc_matrix, t: np.ndarray, dip_ind_t: np.ndarray,
                         conds: np.ndarray, hx: float, dstr: np.ndarray):
    range_start = ceil(iteration * iter_size)
    range_end = ceil((iteration + 1) * iter_size)
    for rr in range(range_start, range_end):
        # do some things (like generate F from rr)
        Vfull = sparse.linalg.cg(K, F, tol=1e-11, maxiter=1200)[0]  # Solve the system
        # do more things
        outQ.put((rr, Vfull))
```
```python
def do_the_thing(L: int, D: int, qP: int, elec_ind: np.ndarray, Ic: int, ubi2: int,
                 K: csc_matrix, t: np.ndarray, dip_ind_t: np.ndarray,
                 conds: np.ndarray, hx: float, dstr: np.ndarray):
    num_cores = cpu_count()
    iterations_per_process = (L - 1) / num_cores  # 257 / 8 ?
    outQ = multiprocessing.Queue()
    pool = multiprocessing.Pool(processes=num_cores)
    [pool.apply_async(do_the_thing_partial,
                      args=(i, iterations_per_process, outQ, L, D, qP, elec_ind, Ic,
                            ubi2, K, t, dip_ind_t, conds, hx, dstr),
                      callback=None)
     for i in range(num_cores)]
    pool.close()
    pool.join()
    for res in outQ:
        # combine results and return here
```
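For reference, here's roughly how I forced the BLAS backends into a single thread (I'm assuming the backends actually honor these environment variables, and that they have to be set before numpy/scipy are first imported):

```python
import os

# Limit common BLAS/LAPACK backends to one thread per process.
# Assumption: these must be set *before* importing numpy/scipy,
# since the backend's thread pool is created at import time.
for var in ("OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS",
            "MKL_NUM_THREADS", "NUMEXPR_NUM_THREADS"):
    os.environ[var] = "1"

# ... then import numpy / scipy and run the solver as usual.
```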
Am I doing something wrong, or is it impossible to parallelize sparse.linalg.cg because of its own optimizations?
Thanks!
Upvotes: 5
Views: 3100
Reputation: 3372
Here's an example of how to get a speedup using Ray (a library for parallel and distributed Python). You can run the code below after doing `pip install ray` (on Linux or MacOS).
Running the serial version of the computation below (e.g., calling `scipy.sparse.linalg.cg(K, F, tol=1e-11, maxiter=100)` 20 times) takes 33 seconds on my laptop. Timing the code below for launching the 20 tasks and getting the results takes 8.7 seconds. My laptop has 4 physical cores, so this is almost a 4x speedup.
I changed your code a lot, but I think I preserved the essence of it.
```python
import numpy as np
import ray
import scipy.sparse
import scipy.sparse.linalg

# Consider passing in 'num_cpus=psutil.cpu_count(logical=True)'.
ray.init()

num_elements = 10**7
dim = 10**4

data = np.random.normal(size=num_elements)
row_indices = np.random.randint(0, dim, size=num_elements)
col_indices = np.random.randint(0, dim, size=num_elements)
K = scipy.sparse.csc_matrix((data, (row_indices, col_indices)))

@ray.remote
def solve_system(K, F):
    # Solve the system.
    return scipy.sparse.linalg.cg(K, F, tol=1e-11, maxiter=100)[0]

# Store the array in shared memory first. This is optional. That is, you could
# directly pass in K, however, this should speed it up because this way it only
# needs to serialize K once. On the other hand, if you use a different value of
# "K" for each call to "solve_system", then this doesn't help.
K_id = ray.put(K)

# Time the code below!

result_ids = []
for _ in range(20):
    F = np.random.normal(size=dim)
    result_ids.append(solve_system.remote(K_id, F))

# Run a bunch of tasks in parallel. Ray will schedule one per core.
results = ray.get(result_ids)
```
The call to `ray.init()` starts the Ray worker processes. The call to `solve_system.remote` submits the tasks to the workers. Ray will schedule one per core by default, though you can specify that a particular task requires more resources (or fewer resources) via `@ray.remote(num_cpus=2)`. You can also specify GPU resources and other custom resources.

The call to `solve_system.remote` immediately returns an ID representing the eventual output of the computation, and the call to `ray.get` takes the IDs and retrieves the actual results of the computation (so `ray.get` will wait until the tasks finish executing).
Some notes

- `scipy.sparse.linalg.cg` seems to limit itself to a single core, but if it doesn't, then you should consider pinning each worker to a specific core to avoid contention between worker processes (you can do this on Linux by doing `psutil.Process().cpu_affinity([i])`, where `i` is the index of the core to bind to).
- You can see where the time is going by running `ray timeline` from the command line and visualizing the result in chrome://tracing (in the Chrome web browser).
- Using `ray.put` means Ray only needs to serialize and deserialize the `K` matrix once per worker. This is an important performance optimization (though it doesn't matter if the tasks take a really long time). This helps primarily with objects that contain large numpy arrays. It doesn't help with arbitrary Python objects. This is enabled by using the Apache Arrow data layout. You can read more in this blog post.

You can see more in the Ray documentation. Note that I'm one of the Ray developers.
Upvotes: 4