user3534080

Reputation: 1465

Python multiprocessing slow with numpy/scipy

I have a very processor-intensive task that takes a ballpark of 13-20 hours to complete, depending on the machine. Seemed like an obvious choice for parallelization via the multiprocessing library. Problem is... the more processes I spawn, the slower the same code gets.

Time per iteration (i.e. the time it takes to run sparse.linalg.cg):

183s with 1 process
245s with 2 processes
312s with 3 processes
383s with 4 processes

Granted, while 2 processes take a little over 30% more time per iteration, they're doing 2 iterations at the same time, so it's still marginally faster overall. But I would not expect the actual math operations themselves to be slower! These timers don't start until after whatever overhead multiprocessing adds.

Here's a stripped-down version of my code. The problem line is the sparse.linalg.cg one. (I've tried things like using MKL vs. OpenBLAS, and forcing them to run in a single thread. I also tried manually spawning Processes instead of using a pool. No luck.)

def do_the_thing_partial(iteration: int, iter_size: float, outQ: multiprocessing.Queue, L: int, D: int, qP: int, elec_ind: np.ndarray, Ic: int, ubi2: int,
                         K: csc_matrix, t: np.ndarray, dip_ind_t: np.ndarray, conds: np.ndarray, hx: float, dstr: np.ndarray):
    range_start = ceil(iteration * iter_size)
    range_end = ceil((iteration + 1) * iter_size)

    for rr in range(range_start, range_end):
        # do some things (like generate F from rr)
        Vfull = sparse.linalg.cg(K, F, tol=1e-11, maxiter=1200)[0]  # Solve the system
        # do more things
        outQ.put((rr, Vfull))


def do_the_thing(L: int, D: int, qP: int, elec_ind: np.ndarray, Ic: int, ubi2: int,
                 K: csc_matrix, t: np.ndarray, dip_ind_t: np.ndarray, conds: np.ndarray, hx: float, dstr: np.ndarray):
    num_cores = cpu_count()
    iterations_per_process = (L - 1) / num_cores  # 257 / 8 ?

    outQ = multiprocessing.Queue()

    pool = multiprocessing.Pool(processes=num_cores)

    [pool.apply_async(do_the_thing_partial,
                      args=(i, iterations_per_process, outQ, L, D, qP, elec_ind, Ic, ubi2, K, t, dip_ind_t, conds, hx, dstr),
                      callback=None)
     for i in range(num_cores)]

    pool.close()
    pool.join()

    while not outQ.empty():
        res = outQ.get()
        # combine results and return here
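For reference, forcing MKL/OpenBLAS to a single thread was done roughly along these lines (a sketch; the exact environment variables depend on the BLAS build, and they have to be set before numpy is imported):

import os

# Limit the BLAS/LAPACK thread pools so each process uses a single core.
# These must be set before numpy/scipy are imported.
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"

import numpy as np
import scipy.sparse.linalg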

Am I doing something wrong, or is it impossible to parallelize sparse.linalg.cg because of its own optimizations?

Thanks!

Upvotes: 5

Views: 3100

Answers (1)

Robert Nishihara

Reputation: 3372

Here's an example of how to get a speedup using Ray (a library for parallel and distributed Python). You can run the code below after doing pip install ray (on Linux or macOS).

Running the serial version of the computation below (i.e., calling scipy.sparse.linalg.cg(K, F, tol=1e-11, maxiter=100) 20 times) takes 33 seconds on my laptop. Launching the 20 tasks and getting the results with the code below takes 8.7 seconds. My laptop has 4 physical cores, so this is almost a 4x speedup.

I changed your code a lot, but I think I preserved the essence of it.

import numpy as np
import ray
import scipy.sparse
import scipy.sparse.linalg

# Consider passing in 'num_cpus=psutil.cpu_count(logical=True)'.
ray.init()

num_elements = 10**7
dim = 10**4

data = np.random.normal(size=num_elements)
row_indices = np.random.randint(0, dim, size=num_elements)
col_indices = np.random.randint(0, dim, size=num_elements)

K = scipy.sparse.csc_matrix((data, (row_indices, col_indices)))

@ray.remote
def solve_system(K, F):
    # Solve the system.
    return scipy.sparse.linalg.cg(K, F, tol=1e-11, maxiter=100)[0]

# Store the array in shared memory first. This is optional. That is, you could
# directly pass in K, however, this should speed it up because this way it only
# needs to serialize K once. On the other hand, if you use a different value of
# "K" for each call to "solve_system", then this doesn't help.
K_id = ray.put(K)

# Time the code below!

result_ids = []
for _ in range(20):
    F = np.random.normal(size=dim)
    result_ids.append(solve_system.remote(K_id, F))

# Run a bunch of tasks in parallel. Ray will schedule one per core.
results = ray.get(result_ids)
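For reference, the serial baseline mentioned above is essentially the following loop, reusing K and dim from the snippet above (a sketch):

import time

start = time.time()
serial_results = []
for _ in range(20):
    F = np.random.normal(size=dim)
    # One solve at a time in a single process, same settings as the Ray version.
    serial_results.append(scipy.sparse.linalg.cg(K, F, tol=1e-11, maxiter=100)[0])
print("serial time:", time.time() - start)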

The call to ray.init() starts the Ray worker processes. The call to solve_system.remote submits the tasks to the workers. Ray will schedule one per core by default, though you can specify that a particular task requires more resources (or fewer resources) via @ray.remote(num_cpus=2). You can also specify GPU resources and other custom resources.
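For example, a task that should occupy two CPU slots could be declared like this (a sketch; heavy_solve is just an illustrative name):

@ray.remote(num_cpus=2)
def heavy_solve(K, F):
    # Ray will only schedule this task when two CPU slots are free.
    return scipy.sparse.linalg.cg(K, F, tol=1e-11, maxiter=100)[0]

# GPU resources work the same way, e.g. @ray.remote(num_gpus=1).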

The call to solve_system.remote immediately returns an ID representing the eventual output of the computation, and the call to ray.get takes the IDs and retrieves the actual results of the computation (so ray.get will wait until the tasks finish executing).
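If you'd rather process results as they finish instead of blocking on all of them at once, you can use ray.wait (a sketch, reusing result_ids from above):

remaining_ids = list(result_ids)
while remaining_ids:
    # Block until at least one task has finished, then fetch just that result.
    ready_ids, remaining_ids = ray.wait(remaining_ids, num_returns=1)
    partial_result = ray.get(ready_ids[0])
    # ... combine partial_result here ...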

Some notes

  • On my laptop, scipy.sparse.linalg.cg seems to limit itself to a single core, but if it doesn't, then you should consider pinning each worker to a specific core to avoid contention between worker processes (you can do this on Linux with psutil.Process().cpu_affinity([i]), where i is the index of the core to bind to); see the sketch after this list.
  • If the tasks all take variable amounts of time, make sure that you aren't just waiting for one really slow task. You can check this by running ray timeline from the command line and visualizing the result in chrome://tracing (in the Chrome web browser).
  • Ray uses a shared memory object store to avoid having to serialize and deserialize the K matrix once per worker. This is an important performance optimization (though it doesn't matter if the tasks take a really long time). This helps primarily with objects that contain large numpy arrays. It doesn't help with arbitrary Python objects. This is enabled by using the Apache Arrow data layout. You can read more in this blog post.
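Here's a rough sketch of the core-pinning idea from the first bullet (Linux only; solve_system_pinned is a hypothetical variant of the task above):

import psutil

@ray.remote
def solve_system_pinned(K, F, core_index):
    # Restrict this worker process to a single core to avoid contention
    # between workers (Linux only).
    psutil.Process().cpu_affinity([core_index])
    return scipy.sparse.linalg.cg(K, F, tol=1e-11, maxiter=100)[0]

num_physical_cores = psutil.cpu_count(logical=False)
result_ids = [solve_system_pinned.remote(K_id, np.random.normal(size=dim), i % num_physical_cores)
              for i in range(20)]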

You can see more in the Ray documentation. Note that I'm one of the Ray developers.

Upvotes: 4
