Reputation: 184
I have a program similar to the following:
import time
from multiprocessing import Pool
class a_system():
    def __init__(self, N):
        self.N = N
        self.L = [0 for _ in range(self.N)]

    def comp(self, n):
        self.L[n] = 1
        return self.L

    def reset(self):
        self.L = [0 for _ in range(self.N)]
def individual_sim(iter):
    global B, L, sys
    sys.reset()
    L[iter] = sys.comp(iter)
    B += sum(L[iter])
    time.sleep(1)
    return L, B
def simulate(N_mc):
    global B, L, sys
    L = [[] for _ in range(N_mc)]
    B = 0
    sys = a_system(N_mc)
    [*map(individual_sim, range(N_mc))]
    # with Pool() as P:
    #     P.map(individual_sim, range(N_mc))
    return L, B
if __name__ == "__main__":
    start = time.time()
    L, B = simulate(N_mc=5)
    print(L)
    print(B)
    print("Time elapsed: ", time.time() - start)
Here I would like to parallelise the line [*map(individual_sim, range(N_mc))] with multiprocessing. However, replacing this line with

with Pool() as P:
    P.map(individual_sim, range(N_mc))

returns an empty list of lists. If I instead use P.map_async, P.imap, or P.imap_unordered, I don't get an error, but the list and B are left empty.
How can I parallelise this code?
P.S. I have tried ThreadPool from multiprocessing.pool, but I would like to avoid it, because the class a_system, which is a bit more complicated than the one shown here, needs to have a different copy for each worker (I get exit code 139 (interrupted by signal 11: SIGSEGV)).
P.S.2 I might try to use sharedctypes or Managers (?), but I'm not sure how they work, nor which one I should use (or a combination?).
P.S.3 I have also tried modifying individual_sim as
def individual_sim(iter, B, L, sys):
    sys.reset()
    L[iter] = sys.comp(iter)
    B += sum(L[iter])
    time.sleep(1)
    return L, B
and to use the following in simulate:
from functools import partial

part_individual_sim = partial(individual_sim, B=B, L=L, sys=sys)
with Pool() as P:
    P.map(part_individual_sim, range(N_mc))
But I still get empty lists.
Upvotes: 1
Views: 383
Reputation: 16213
The multiprocessing module works by forking the master process (or, especially under Windows, by spawning fresh copies of the Python interpreter). Therefore you'll see the global variables, but they won't be shared between processes, unless you go to special measures such as explicitly sharing memory. You're better off passing the required state in as function parameters (or via the Pool's initializer and initargs) and passing results back via the return value.
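As a minimal sketch of that approach applied to your code (assuming the a_system class from your question is in scope; init_worker and worker_sys are names I've made up), each worker builds its own private copy of the system in the initializer, and L and B are reassembled in the parent from the returned values:

from multiprocessing import Pool

def init_worker(n):
    # each worker process gets its own private a_system;
    # nothing here is shared with the parent
    global worker_sys
    worker_sys = a_system(n)

def individual_sim(i):
    worker_sys.reset()
    row = worker_sys.comp(i)
    return row, sum(row)

def simulate(n_mc):
    with Pool(initializer=init_worker, initargs=(n_mc,)) as pool:
        results = pool.map(individual_sim, range(n_mc))
    # rebuild the aggregates in the parent from the return values
    L = [row for row, _ in results]
    B = sum(b for _, b in results)
    return L, B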
This tends to limit your design choices a bit, especially if you need to pass a lot of state around (e.g. data you want to fit). multiprocessing is a very lightweight wrapper around pretty low-level primitives, hence it's not as featureful as things like Dask, but performance tends to be better if you can live with the constraints.
Editing to include some demo code, which assumes the N_mc variable in your question relates to you doing some Monte Carlo/randomised approximation. I start by pulling in some libraries:
from multiprocessing import Pool
from PIL import Image
import numpy as np
and define a worker function and code for its initialisation:
def initfn(path):
    # make sure worker processes don't share RNG state, see:
    # https://github.com/numpy/numpy/issues/9650
    np.random.seed()
    global image
    with Image.open(path) as img:
        image = np.asarray(img.convert('L'))

def worker(i, nsamps):
    height, width = image.shape
    subset = image[
        np.random.randint(height, size=nsamps),
        np.random.randint(width, size=nsamps),
    ]
    return np.mean(subset)

def mc_mean(path, nsamples, niter):
    with Pool(initializer=initfn, initargs=(path,)) as pool:
        params = [(i, nsamples) for i in range(niter)]
        return pool.starmap(worker, params)
i.e. initfn reads a JPEG/PNG file into a numpy array, then worker just calculates the average value (i.e. brightness) of some random subset of pixels. Note that colour images are loaded as 3d matrices indexed by [row, col, channel] (channels are conventionally 0=red, 1=green, 2=blue). We also explicitly call np.random.seed to make sure that our worker jobs don't get the same sequence of random values.
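On newer NumPy versions, the same goal can be reached with the Generator API instead of reseeding the legacy global state; a sketch of that variant (initfn_rng is my naming, not part of the code above):

import numpy as np

def initfn_rng():
    # give each worker an independently seeded Generator;
    # default_rng() with no seed draws fresh OS entropy per process
    global rng
    rng = np.random.default_rng()

worker would then call rng.integers(height, size=nsamps) in place of np.random.randint(height, size=nsamps).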
We can then run this and plot the output to make sure everything looks OK:
import scipy.stats as sps
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style='ticks')
filename = 'an_image.jpeg'
result = mc_mean(filename, 1234, 56789)
# Histogram of our results
plt.hist(result, 201, density=True, alpha=0.5, edgecolor='none')
# also calculate/display expected distribution
with Image.open(filename) as img:
    arr = np.asarray(img.convert('L'))
# approximate distribution of monte-carlo error
mcdist = sps.norm(np.mean(arr), np.std(arr) / np.sqrt(1234))
mn,mx = plt.xlim()
plt.xlim(mn, mx)
x = np.linspace(mn, mx, 201)
plt.plot(x, mcdist.pdf(x), '--', color='C1')
sns.despine()
which should give us something like:

[figure: histogram of the Monte Carlo estimates, with the approximate error distribution (dashed line) overlaid]

Obviously this will depend on the image used; this one is from this JPEG.
Upvotes: 1
Reputation: 21694
It's not really clear to me what your business logic is here, but you cannot modify globals in your parent from within your child processes. Separate processes don't share their address space.
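A minimal demonstration of why your lists come back empty (a sketch, not your code):

from multiprocessing import Pool

counter = 0

def bump(_):
    global counter
    counter += 1          # mutates the child's copy only
    return counter

if __name__ == "__main__":
    with Pool(2) as p:
        print(p.map(bump, range(4)))  # each child counts on its own
    print(counter)                    # still 0 in the parent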
You could make L a Manager.list and B a Manager.Value to modify them from your worker processes, though. Manager objects live in a separate server process, and you modify them through proxy objects. Further, you would need to use a Manager.Lock while modifying these shared objects to prevent data corruption.
Here is a stripped-down example which should get you started:
import time
from multiprocessing import Pool, Manager

def individual_sim(mlist, mvalue, mlock, idx):
    # in your real computation, make sure to not hold the lock longer than
    # really needed (e.g. do calculations without holding the lock)
    with mlock:
        mlist[idx] += 10
        mvalue.value += sum(mlist)

def simulate(n_workers, n):
    with Manager() as m:
        mlist = m.list([i for i in range(n)])
        print(mlist)
        mvalue = m.Value('i', 0)
        mlock = m.Lock()
        iterable = [(mlist, mvalue, mlock, i) for i in range(n)]
        with Pool(processes=n_workers) as pool:
            pool.starmap(individual_sim, iterable)
        # convert to non-shared objects before terminating manager
        mlist = list(mlist)
        mvalue = mvalue.value
        return mlist, mvalue
if __name__ == "__main__":
    N_WORKERS = 4
    N = 20

    start = time.perf_counter()
    L, B = simulate(N_WORKERS, N)
    print(L)
    print(B)
    print("Time elapsed: ", time.perf_counter() - start)
Example Output:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
5900
Time elapsed: 0.14064819699706277
Process finished with exit code 0
It would also be possible to use Pool's initializer parameter to pass the proxies upon worker initialization and register them as globals, instead of sending them as regular arguments with the starmap call.
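A minimal sketch of that variant (init_worker is my naming; it reuses Pool and Manager from the example above):

def init_worker(the_list, the_value, the_lock):
    # register the manager proxies as per-process globals
    global mlist, mvalue, mlock
    mlist, mvalue, mlock = the_list, the_value, the_lock

def individual_sim(idx):
    with mlock:
        mlist[idx] += 10
        mvalue.value += sum(mlist)

def simulate(n_workers, n):
    with Manager() as m:
        mlist = m.list(range(n))
        mvalue = m.Value('i', 0)
        mlock = m.Lock()
        with Pool(processes=n_workers, initializer=init_worker,
                  initargs=(mlist, mvalue, mlock)) as pool:
            pool.map(individual_sim, range(n))
        return list(mlist), mvalue.value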
I've written up a bit more about Manager usage (relevant: nested proxies) here.
Upvotes: 3