cpicke1

Reputation: 29

multiprocessing with large numpy array as argument

I want to use multiprocessing where one of the arguments is a very large numpy array. I've researched some other posts that appear to deal with similar issues:

Large numpy arrays in shared memory for multiprocessing: Is sth wrong with this approach?

Share Large, Read-Only Numpy Array Between Multiprocessing Processes

but being rather new to Python, I've been having trouble adapting those solutions to this template. Could you help me understand my options for passing X to the functions in a read-only manner? My simplified snippet of code is here:

import multiprocessing as mp
import numpy as np

def funcA(X):
    # do something with X
    print 'funcA OK'

def funcB(X):
    # do something else with X
    print 'funcB OK'

if __name__=='__main__':
    X=np.random.rand(int(5.00e8),)
    funcA(X) # OK
    funcB(X) # OK
    X=np.random.rand(int(2.65e8),)
    P=[]
    P.append(mp.Process(target=funcA,args=(X,))) # OK
    P.append(mp.Process(target=funcB,args=(X,))) # OK
    for p in P:
        p.start()

    for p in P:
        p.join()

    X=np.random.rand(int(2.70e8),)
    P=[]
    P.append(mp.Process(target=funcA,args=(X,))) # FAIL
    P.append(mp.Process(target=funcB,args=(X,))) # FAIL
    for p in P:
        p.start()

    for p in P:
        p.join()

funcA and funcB accept very large numpy arrays when invoked sequentially. However, when they are run as separate processes, there appears to be an upper limit on the size of the numpy array that can be passed to the function. How could I best get around this?

Note:

0) I do not wish to modify X; only to read from it;

1) I’m running on 64-bit Windows 7 Professional

Upvotes: 2

Views: 3463

Answers (1)

Roberto Trani

Reputation: 1227

The problem is probably in the data transfer to the child processes. When objects only need to be read, I prefer to exploit the copy-on-write mechanism that the underlying OS uses to manage the memory of the child processes. However, I don't know whether Windows 7 uses this mechanism. When copy-on-write is available, a child process can access memory areas of the parent process without copying them. This trick works only if the access is read-only and the object is created before the processes are started.

Summing up, a possible solution (at least for Linux machines) is this:

import multiprocessing as mp
import numpy as np

def funcA():
    print "A {}".format(X.shape)
    # do something with the global variable X
    print 'funcA OK'

def funcB():
    print "B {}".format(X.shape)
    # do something else with the global variable X
    print 'funcB OK'

if __name__=='__main__':
    X=np.random.rand(int(5.00e8),)
    funcA() # OK
    funcB() # OK

    X=np.random.rand(int(2.65e8),)
    P=[mp.Process(target=funcA), mp.Process(target=funcB)]
    for p in P:
        p.start()

    for p in P:
        p.join()

    X=np.random.rand(int(2.70e8),)
    P=[mp.Process(target=funcA), mp.Process(target=funcB)]
    for p in P:
        p.start()

    for p in P:
        p.join()
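
Note that this version relies on the fork start method: on Linux the children inherit the parent's address space, so the global X is visible to them via copy-on-write. On Windows, multiprocessing starts fresh interpreter processes that re-import the module, so X (assigned inside the __main__ guard) would not exist in the children and funcA/funcB would fail with a NameError.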

UPDATE: after various comments about compatibility problems with Windows, I sketched a new solution based solely on native memory maps. In this solution I create a numpy memory map backed by a file; the map is shared through the file descriptor, so the whole array doesn't have to be copied into the child processes. I found this solution much faster than using multiprocessing.Array!

UPDATE 2: the code below has been updated to avoid memory issues while filling the memory map with random values.

import multiprocessing as mp
import numpy as np
import tempfile

def funcA(X):
    print "A {}".format(X.shape)
    # do something with X
    print 'funcA OK'

def funcB(X):
    print "B {}".format(X.shape)
    # do something else with X
    print 'funcB OK'

if __name__=='__main__':
    dim = int(2.75e8)
    with tempfile.NamedTemporaryFile(delete=False) as tmpfile:  # the default temp dir also works on Windows
        X = np.memmap(tmpfile, shape=dim, dtype=np.float32)  # create the memory map
        # init the map by chunks of size 1e8
        max_chunk_size = int(1e8)
        for start_pos in range(0, dim, max_chunk_size):
            chunk_size = min(dim-start_pos, max_chunk_size)
            X[start_pos:start_pos+chunk_size] = np.random.rand(chunk_size,)
        P=[mp.Process(target=funcA, args=(X,)), mp.Process(target=funcB, args=(X,))]
        for p in P:
            p.start()

        for p in P:
            p.join()
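
Two small notes on this version. First, because the temporary file is created with delete=False, it survives the run; a one-line cleanup (my addition, not part of the original script) can be appended at the end, after the with block:

import os
os.unlink(tmpfile.name)  # remove the backing file once the children have finished

Second, for comparison with the multiprocessing.Array approach mentioned above, a minimal sketch of that alternative could look like this (the names are mine; it uses the lock-free mp.RawArray and rebuilds a numpy view over the shared buffer in each process, so the data itself is never copied):

import multiprocessing as mp
import numpy as np

def funcA(shared, dim):
    # rebuild a numpy view over the shared ctypes buffer; no copy is made
    X = np.frombuffer(shared, dtype=np.float64, count=dim)
    print 'funcA OK', X.shape

if __name__=='__main__':
    dim = int(1e6)  # kept small here; the pattern is the same for larger arrays
    shared = mp.RawArray('d', dim)  # shared, unsynchronised memory block
    X = np.frombuffer(shared, dtype=np.float64, count=dim)
    X[:] = np.random.rand(dim)  # fill the shared block through the numpy view
    p = mp.Process(target=funcA, args=(shared, dim))
    p.start()
    p.join()

This works on both Linux and Windows, but as said above, I found the memory-map variant faster in practice.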

Upvotes: 1
