semola

Reputation: 184

Multiprocessing: How to mp.map a function storing elements in a list?

I have a program similar to the following:

import time
from multiprocessing import Pool

class a_system():
    def __init__(self,N):
        self.N = N
        self.L = [0 for _ in range(self.N)]
    def comp(self,n):
        self.L[n] = 1
        return self.L
    def reset(self):
        self.L = [0 for _ in range(self.N)]

def individual_sim(iter):
    global B, L, sys
    sys.reset()
    L[iter] = sys.comp(iter)
    B += sum(L[iter])
    time.sleep(1)
    return L, B

def simulate(N_mc):
    global B, L, sys
    L = [[] for _ in range(N_mc)]
    B = 0
    sys = a_system(N_mc)
    [*map(individual_sim, range(N_mc))]
    # with Pool() as P:
    #     P.map(individual_sim,range(N_mc))
    return L, B

if __name__=="__main__":
    start = time.time()
    L, B = simulate(N_mc=5)
    print(L)
    print(B)
    print("Time elapsed: ",time.time()-start)

Here I would like to parallelise the line [*map(individual_sim, range(N_mc))] with multiprocessing. However, replacing this line with

with Pool() as P:
    P.map(individual_sim,range(N_mc))

returns an empty list of lists.

If instead I use P.map_async, P.imap, or P.imap_unordered, I don't get an error, but the list and B are left empty.

How can I parallelise this code?

P.S. I have tried ThreadPool from multiprocessing.pool, but I would like to avoid it, because the class a_system, which is a bit more complicated than the one shown here, needs a separate copy for each worker (I get exit code 139 (interrupted by signal 11: SIGSEGV)).

P.S.2 I might try to use sharedctypes or Managers (?), but I'm not sure how they work, nor which one I should use (or a combination?).

P.S.3 I have also tried modifying individual_sim as

def individual_sim(iter,B,L,sys):
    sys.reset()
    L[iter] = sys.comp(iter)
    B += sum(L[iter])
    time.sleep(1)
    return L, B

and to use the following in simulate:

from functools import partial
part_individual_sim = partial(individual_sim, B=B, L=L, sys=sys)
with Pool() as P:
    P.map(part_individual_sim,range(N_mc))

But I still get empty lists.

Upvotes: 1

Views: 383

Answers (2)

Sam Mason

Reputation: 16213

the multiprocessing module works by forking the master process (or by launching fresh copies of the Python interpreter, e.g. under Windows).

therefore, you'll see the global variables, but they won't be shared between processes, unless you take special measures such as explicitly sharing memory. you're better off passing the required state in as function parameters (or via the Pool's initializer and initargs) and getting results back via the return value.
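as a rough sketch of what that looks like for your code (simplified, with the reset/comp logic collapsed into the worker), each worker builds its own row locally and the parent assembles L and B from the return values:

import time
from multiprocessing import Pool

def individual_sim(n, N_mc):
    # build this iteration's row locally; no shared state needed
    row = [0] * N_mc
    row[n] = 1
    time.sleep(1)
    return row, sum(row)

def simulate(N_mc):
    args = [(n, N_mc) for n in range(N_mc)]
    with Pool() as pool:
        results = pool.starmap(individual_sim, args)
    # assemble L and B in the parent from the returned values
    L = [row for row, _ in results]
    B = sum(b for _, b in results)
    return L, B

the workers never touch each other's data, so no locking or shared memory is needed.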

this approach tends to limit your design choices a bit, especially if you need to pass a lot of state around (e.g. the data you want to fit)

it's a very lightweight wrapper around pretty low-level primitives, hence it's not as featureful as something like Dask, but performance tends to be better if you can live with the constraints

editing to include some demo code, which assumes the N_mc variable in your question means you're doing some Monte Carlo/randomised approximation. I start by pulling in some libraries:

from multiprocessing import Pool

from PIL import Image
import numpy as np

and define a worker function and code for its initialisation:

def initfn(path):
    # make sure worker processes don't share RNG state, see:
    #   https://github.com/numpy/numpy/issues/9650
    np.random.seed()

    global image
    with Image.open(path) as img:
        image = np.asarray(img.convert('L'))

def worker(i, nsamps):
    height, width = image.shape
    subset = image[
        np.random.randint(height, size=nsamps),
        np.random.randint(width, size=nsamps),
    ]
    return np.mean(subset)

def mc_mean(path, nsamples, niter):
    with Pool(initializer=initfn, initargs=(path,)) as pool:
        params = [(i, nsamples) for i in range(niter)]
        return pool.starmap(worker, params)

i.e. initfn reads a JPEG/PNG file into a numpy array, then worker just calculates the average value (i.e. brightness) of some random subset of pixels. Note that color images would load as 3d matrices, indexed by [row, col, channel] (channels conventionally being 0=red, 1=green, 2=blue), hence the convert('L') to get a 2d greyscale array. We also explicitly call np.random.seed to make sure our worker jobs don't get the same sequence of random values.
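as a quick demo of why the reseeding matters (this assumes the fork start method, the default on Linux; under spawn each worker reseeds on import anyway):

from multiprocessing import Pool
import numpy as np

def draw(_):
    # without an explicit np.random.seed() call, forked workers all
    # inherit the parent's RNG state and draw the same values
    return np.random.randint(1000)

if __name__ == "__main__":
    with Pool(4) as pool:
        print(pool.map(draw, range(4)))  # likely four identical numbers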

We can then run this and plot the output to make sure everything looks OK:

import scipy.stats as sps
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style='ticks')

filename = 'an_image.jpeg'
result = mc_mean(filename, 1234, 56789)

# Histogram of our results
plt.hist(result, 201, density=True, alpha=0.5, edgecolor='none')

# also calculate/display expected distribution
with Image.open(filename) as img:
    arr = np.asarray(img.convert('L'))
    # approximate distribution of monte-carlo error 
    mcdist = sps.norm(np.mean(arr), np.std(arr) / np.sqrt(1234))

mn,mx = plt.xlim()
plt.xlim(mn, mx)

x = np.linspace(mn, mx, 201)
plt.plot(x, mcdist.pdf(x), '--', color='C1')
sns.despine()

which should give us something like:

[image: MC distribution]

obviously this will depend on the image used; the above is from this JPEG.

Upvotes: 1

Darkonaut

Reputation: 21694

It's not really clear to me what your business logic is here, but you cannot modify globals in your parent from within your child processes. Separate processes don't share their address space.
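A minimal demonstration (hypothetical snippet, not from your code) makes this visible; the append happens in the child's copy of the list only:

from multiprocessing import Process

GLOBAL_LIST = []

def child():
    # this modifies the child's copy of the module-level list
    GLOBAL_LIST.append(1)

if __name__ == "__main__":
    p = Process(target=child)
    p.start()
    p.join()
    print(GLOBAL_LIST)  # still [] in the parent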

You could make L a Manager.list and B a Manager.Value to modify them from your worker processes, though. Manager objects live in a separate server process, and you modify them through proxy objects. Further, you would need to use a Manager.Lock while modifying these shared objects to prevent data corruption.

Here is a stripped-down example which should get you started:

import time
from multiprocessing import Pool, Manager


def individual_sim(mlist, mvalue, mlock, idx):
    # in your real computation, make sure to not hold the lock longer than
    # really needed (e.g. calculations without holding lock)
    with mlock:
        mlist[idx] += 10
        mvalue.value += sum(mlist)


def simulate(n_workers, n):

    with Manager() as m:
        mlist = m.list([i for i in range(n)])
        print(mlist)
        mvalue = m.Value('i', 0)
        mlock = m.Lock()

        iterable = [(mlist, mvalue, mlock, i) for i in range(n)]

        with Pool(processes=n_workers) as pool:
            pool.starmap(individual_sim, iterable)

        # convert to non-shared objects before terminating manager
        mlist = list(mlist)
        mvalue = mvalue.value

    return mlist, mvalue


if __name__=="__main__":

    N_WORKERS = 4
    N = 20

    start = time.perf_counter()
    L, B = simulate(N_WORKERS, N)
    print(L)
    print(B)
    print("Time elapsed: ",time.perf_counter() - start)

Example Output:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
5900
Time elapsed:  0.14064819699706277

Process finished with exit code 0

It would also be possible to use Pool's initializer parameter to pass the proxies upon worker initialization and register them as globals, instead of sending them as regular arguments with the starmap call.
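A sketch of that variant, reusing the names from the example above (only the changed parts shown):

def init_worker(the_list, the_value, the_lock):
    # runs once per worker process: register the proxies as globals
    global mlist, mvalue, mlock
    mlist, mvalue, mlock = the_list, the_value, the_lock

def individual_sim(idx):
    with mlock:
        mlist[idx] += 10
        mvalue.value += sum(mlist)

# ...inside simulate(), replacing the Pool block:
# with Pool(processes=n_workers, initializer=init_worker,
#           initargs=(mlist, mvalue, mlock)) as pool:
#     pool.map(individual_sim, range(n))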

I've written up a bit more about Manager usage (relevant: nested proxies) here.

Upvotes: 3
