Reputation: 103
After browsing through many discussions on the same or similar topics, I still can't solve my problem, so I am posting it below.
The following is a minimal working example (MWE) of what I would like to parallelize: solving a set of independent linear equations (n*I + mat) x = y, parametrized by n = 0, 1, 2, with fixed arrays mat and y. Note that the arrays are declared global in the hope that they can be accessed by the different processes/pools (see below). But I don't think it works, and this is the core of the question: how to share big numpy arrays between processes/pools to avoid communication overhead?
import numpy as np
import time
import os

N = 2000
num = 3

global mat
mat = np.random.rand(N, N)
global y
y = np.random.rand(N, 1)

# Function to be parallelized num times
def fun(n):
    print(f"{n}-th job is run on child {os.getpid()} of parent {os.getppid()}")
    newmat = n * np.eye(N) + mat
    return np.linalg.solve(newmat, y)

# Approach 1: no parallel
def main():
    start_time = time.time()
    res = []
    for i in range(num):
        res.append(fun(i))
    print(f"Approach 1: Time elapsed = {time.time()-start_time} sec")
    return res

main()
I tried the following three approaches to parallelize it: Pool, Process, and Process with Array and numpy.frombuffer. See below.
from multiprocessing import Process, set_start_method, Queue, Pool, cpu_count, Array, RawArray
set_start_method('fork')

# Approach 2: with pool
def main2():
    start_time = time.time()
    pool = Pool(cpu_count())
    res = pool.map(fun, range(num))
    print(f"Approach 2: Time elapsed = {time.time()-start_time} sec")
    pool.close()
    return res

main2()
# Approach 3: with process
def fun2(i, output):
    output.put(fun(i))

def main3():
    start_time = time.time()
    output = Queue()
    processes = [Process(target=fun2, args=(i, output)) for i in range(num)]
    # Run processes
    for p in processes:
        p.start()
    # Drain the queue before joining: a child cannot exit while its result
    # is still sitting in the queue's pipe buffer, so joining first can
    # deadlock once the results get large.
    res = [output.get() for _ in processes]
    for p in processes:
        p.join()
    print(f"Approach 3: Time elapsed = {time.time()-start_time} sec")
    return res

main3()
# Approach 4: with process, Array and numpy.frombuffer
def fun3(n, output, mat_arr, y_arr):
    print(f"{n}-th job is run on child {os.getpid()} of parent {os.getppid()}")
    # Wrap the shared buffers as numpy arrays without copying
    mat2 = np.frombuffer(mat_arr.get_obj()).reshape((N, N))
    y2 = np.frombuffer(y_arr.get_obj()).reshape((N, 1))
    newmat = n * np.eye(N) + mat2
    output.put(np.linalg.solve(newmat, y2))

def main4():
    mat2 = Array('d', mat.flatten())
    y2 = Array('d', y.flatten())
    start_time = time.time()
    output = Queue()
    processes = [Process(target=fun3, args=(i, output, mat2, y2)) for i in range(num)]
    # Run processes
    for p in processes:
        p.start()
    # Drain the queue before joining, as in Approach 3
    res = [output.get() for _ in processes]
    for p in processes:
        p.join()
    print(f"Approach 4: Time elapsed = {time.time()-start_time} sec")
    return res

main4()
None of these approaches works as hoped, and I got:
0-th job is run on child 8818 of parent 3421
1-th job is run on child 8818 of parent 3421
2-th job is run on child 8818 of parent 3421
Approach 1: Time elapsed = 0.2891273498535156 sec
0-th job is run on child 8819 of parent 8818
1-th job is run on child 8820 of parent 8818
2-th job is run on child 8821 of parent 8818
Approach 2: Time elapsed = 3.6278929710388184 sec
0-th job is run on child 8832 of parent 8818
1-th job is run on child 8833 of parent 8818
2-th job is run on child 8834 of parent 8818
Approach 3: Time elapsed = 4.243804931640625 sec
0-th job is run on child 8843 of parent 8818
1-th job is run on child 8844 of parent 8818
2-th job is run on child 8845 of parent 8818
Approach 4: Time elapsed = 4.745251893997192 sec
This summarizes all the approaches I have seen so far. I am aware that multiprocessing has a SharedMemory class, but it is not available in Python 3.7.2 (it was added in 3.8). If it could solve the problem, I would be very happy to see how it works.
Thanks to anyone who reads through the whole post; any help is appreciated. In case it matters, I am using a Mac with an Apple M1 chip, running macOS Monterey.
Update 1: per @AKX's point, I removed the print("n-th job ...") line and set N = 10000, and the results are
Approach 1: Time elapsed = 23.812573194503784 sec
Approach 2: Time elapsed = 126.91087889671326 sec
Approach 3 ran for around 5 minutes before I had to cut it off. So the time overhead is pretty large for large N.
Upvotes: 0
Views: 828
Reputation: 50931
np.linalg.solve should already be executed by a parallel function implemented in LAPACK. In fact, this is the case on my (Linux + Windows) machine. Indeed, it calls LAPACK functions like dtrsm and dlaswp, and the main computational function, dgemm, is implemented in BLAS libraries. This last function should take >90% of the time and is heavily optimized and parallelized, as long as you use a fast BLAS implementation. NumPy uses OpenBLAS by default on most systems, which is very good (and parallel). The Intel MKL is also a good alternative supporting LAPACK (certainly better on Intel hardware). If the computation is not parallel on your machine, that is a problem, and you should check your BLAS implementation, as it may be very inefficient.
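As a quick check, np.show_config() reports which BLAS/LAPACK NumPy was built against; the third-party threadpoolctl package (an extra install, not part of NumPy) can also inspect the active thread pools at runtime:

import numpy as np
np.show_config()  # prints the BLAS/LAPACK build information

# Optional runtime inspection (requires: pip install threadpoolctl)
from threadpoolctl import threadpool_info
for pool in threadpool_info():
    print(pool["internal_api"], pool["num_threads"])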
The thing is, parallelizing code that is already parallel makes it slower, because running more threads than there are available cores puts a lot of pressure on the OS scheduler, and the BLAS functions are not optimized for such a case. More specifically, profiling results show that parallelizing an already-parallel OpenBLAS function causes some synchronization functions to wait for a while because of work imbalance (certainly due to a static schedule of the computing kernels). This is the main source of the slowdown of approaches 2/3/4 compared to the first, sequential approach.
If you really want to parallelize the function yourself, you need to configure the BLAS to use a single thread (with OMP_NUM_THREADS=1 for OpenBLAS), but this is likely to be less efficient than letting the BLAS do the parallelization. Indeed, the BLAS makes use of optimized multi-threaded kernels (working in shared memory), while Python nearly prevents such a design from being fast because of the global interpreter lock (GIL) in multi-threaded code. Multiprocessing is also limited by the overhead of pickling or of forking. That being said, such overheads are small in approaches 2 and 3 (not in 4, certainly due to the queue). This is generally why Python is often not great for writing fast parallel applications (except for long-lasting embarrassingly parallel work with small data transfers).
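For reference, a minimal sketch of the sequential-BLAS configuration used below; the key point is that the environment variables must be set before NumPy is imported (the exact variable honored depends on the BLAS build, so several are set defensively):

import os
# Must be set before `import numpy`, otherwise the BLAS thread
# pool has already been created with its default size.
os.environ["OMP_NUM_THREADS"] = "1"       # OpenMP-based builds
os.environ["OPENBLAS_NUM_THREADS"] = "1"  # pthreads OpenBLAS
os.environ["MKL_NUM_THREADS"] = "1"       # Intel MKL

import numpy as np  # BLAS now runs sequentially; parallelize with Pool as above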
Here are average timings on my 6-core i5-9600KF machine with OpenBLAS:
Default OpenBLAS configuration:
- Approach 1: 0.129 seconds (stable)
- Approach 2: 1.623 seconds (highly-variable)
- Approach 3: 1.407 seconds (highly-variable)
- Approach 4: 1.532 seconds (highly-variable)
Sequential configuration of OpenBLAS:
- Approach 1: 0.338 seconds (stable) <--- Now sequential
- Approach 2: 0.152 seconds (quite stable)
- Approach 3: 0.151 seconds (stable)
- Approach 4: 0.177 seconds (stable) <--- >10% time in CPython overheads
- Parallel Numba: 0.144 seconds (stable) <--- Multi-threaded
Note that the best speed-up is pretty bad (i.e. ~2.6x) on a 6-core machine. I guess this might be because the computation is memory-bound (on my PC, the measured RAM throughput is about 20 GiB/s, while it can reach 32-35 GiB/s at most for such an access pattern).
Upvotes: 3
Reputation: 43553
To answer this question, we need to talk about how multiprocessing works.

On UNIX-like platforms, multiprocessing uses the fork system call. This creates an almost perfect copy of the parent process, copying mat and y for you. In this case, sharing them doesn't make much sense, because modern UNIX-like operating systems tend to use copy-on-write for memory pages where possible.
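To illustrate (a toy sketch, not code from the question): with the fork start method, a global array created before the pool exists is visible in the children without any pickling:

import numpy as np
from multiprocessing import get_context

big = np.random.rand(1000, 1000)  # exists before the fork

def child(_):
    # Inherited through fork's copy-on-write pages,
    # not sent through a pipe.
    return big.sum()

if __name__ == "__main__":
    with get_context("fork").Pool(2) as pool:
        print(pool.map(child, range(2)))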
On platforms like macOS and MS Windows, multiprocessing starts a new Python instance by default and imports your code into it (although you can still use fork on macOS). So there it will re-create mat and y in every instance.
If you want to share data in this case, multiprocessing has several mechanisms for that, like Array and Manager. The latter should be able to share numpy arrays with some glue. Keep in mind that there is some overhead associated with using them: their use case is geared toward modification of shared data, so they have synchronization mechanisms to deal with that, which you don't need in your case.
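For completeness, since the question asks about it: Python 3.8+ adds multiprocessing.shared_memory (newer than the asker's 3.7.2), which allows true zero-copy sharing. A minimal sketch:

import numpy as np
from multiprocessing import Process, shared_memory

N = 2000

def worker(shm_name):
    shm = shared_memory.SharedMemory(name=shm_name)
    # View the shared buffer as a numpy array; no copy is made
    mat = np.ndarray((N, N), dtype=np.float64, buffer=shm.buf)
    print(mat[0, 0])
    shm.close()

if __name__ == "__main__":
    src = np.random.rand(N, N)
    shm = shared_memory.SharedMemory(create=True, size=src.nbytes)
    np.ndarray(src.shape, dtype=src.dtype, buffer=shm.buf)[:] = src
    p = Process(target=worker, args=(shm.name,))
    p.start()
    p.join()
    shm.close()
    shm.unlink()  # free the shared segment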
Since you are using fork, I don't think sharing the data will be much faster. And if the data is larger than a page of memory, the operating system should use copy-on-write sharing, so it would not save you memory either.
As an alternative, you could write mat and y to a file in the parent process and read them in the worker processes. Or you could use a read-only mmap, but in that case you would still have to convert it to a numpy array.
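A rough sketch of that alternative (the .npy file names are just for illustration): np.save the arrays once in the parent, then np.load(..., mmap_mode='r') in each worker maps the files read-only, which also takes care of the conversion to a numpy array:

import numpy as np
from multiprocessing import Pool

N = 2000

def worker(n):
    # Memory-mapped, read-only views; the OS shares the pages
    # between workers instead of copying the data.
    mat = np.load("mat.npy", mmap_mode="r")
    y = np.load("y.npy", mmap_mode="r")
    newmat = n * np.eye(N) + mat  # this materializes a private copy
    return np.linalg.solve(newmat, y)

if __name__ == "__main__":
    np.save("mat.npy", np.random.rand(N, N))
    np.save("y.npy", np.random.rand(N, 1))
    with Pool() as pool:
        res = pool.map(worker, range(3))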
Upvotes: 0