cowbert

Reputation: 3462

how to efficiently have multiprocessing Process read immutable big data

I would like to use multiprocessing to use multiple cores to run a procedure that does a pairwise comparison of elements in a large list:

data = [...] #when loaded this is > 100MB
for i in xrange(len(data)-1):
    parent = data[i]
    for j in xrange(i,len(data)):
        child = data[j]
        #do something with parent and child

so if I set up a process queue:

def worker(queue):
    while True:
        args = queue.get()
        if args == 'EOF':
            break
        f(*args)

def f(data, x, start):
    for i in xrange(start,len(data)):
       #do stuff

if __name__ == '__main__':
    from multiprocessing import Process, Queue, cpu_count
    import psycopg2

    cur = psycopg2.connect(...).cursor()
    cur.execute('SELECT * from table')
    data = cur.fetchall()
    #when loaded into memory data is > 100MB

    other_f_arg = 'some object'

    queue = Queue()
    #spawn 1 child per core:
    workers = [Process(target=worker, args=(queue,)) for cpu in xrange(cpu_count())]
    for w in workers:
        w.start()

    for i in xrange(len(data)-1):
        queue.put((data, other_f_arg, i))

    for w in workers:
        queue.put('EOF')  # one sentinel per worker so every child exits
    for w in workers:
        w.join()

When this runs, queue.put pushes data into the queue on every iteration, even though data only needs to be read once and then re-referenced by each process. As a result, all the advantages of multiprocessing are negated by the repeated data passing. How do I get each process to pick up a copy of data and other_f_arg just once, and then only pass the dynamic variable i as workers become free?

UPDATE 1:

I decided to use Pool as per Tim Peters's suggestion below, but instead of map I am using apply_async with a callback, since I want the parent process to postprocess the return value of f serially rather than waiting for all submissions to finish (f also returns something large in memory):

def worker_init(xdata):
    global data
    data = xdata

def callback(result, x):
    #do something with result of f(i), and x

def f(i):
    #do something with data[i]
    return result

if __name__ == '__main__':
    ...
    data = psycopg2_cursor.fetchall()

    NUM_CPU = None
    from multiprocessing import Pool
    from functools import partial


    pool = Pool(processes=NUM_CPU,
            initializer=worker_init,
            initargs=(data,))

    x = 'some extra param I want to pass to callback'
    shim_callback = partial(callback, x=x)

    for i in xrange(len(data)-1):
        pool.apply_async(f,
                         args=(i,),
                         callback=shim_callback)

    pool.close()
    pool.join()

Is there any way to redirect uncaught exceptions in the children to the console (like an exception raised in a single-threaded process)? I ask because uncaught exceptions in f seem to just break the loop that calls apply_async, and I get no errors printed to the console at all.

Upvotes: 1

Views: 1009

Answers (2)

Tim Peters

Reputation: 70705

Easiest: on a Linux-y system (an OS that supports fork()), define data at module level. Then all worker processes will magically see (a copy of) data, due to magical fork() semantics.
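
A minimal sketch of that fork()-based approach, assuming the default "fork" start method on Linux; load_data() and do_work() are hypothetical stand-ins for the psycopg2 query and the pairwise comparison in your question:

from multiprocessing import Process, Queue, cpu_count

# Loaded once in the parent, at module level; because the child processes are
# created by fork(), each one inherits a copy of this name for free.
data = load_data()   # hypothetical loader standing in for the database query

def worker(queue):
    while True:
        i = queue.get()
        if i == 'EOF':
            break
        do_work(data, i)   # hypothetical; only the small index i ever
                           # travels through the queue, never the 100MB data

if __name__ == '__main__':
    queue = Queue()
    workers = [Process(target=worker, args=(queue,)) for _ in xrange(cpu_count())]
    for w in workers:
        w.start()
    for i in xrange(len(data) - 1):
        queue.put(i)
    for w in workers:
        queue.put('EOF')   # one sentinel per worker
    for w in workers:
        w.join()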

More portable: use a multiprocessing.Pool() instead. When you create the Pool, you can specify an initialization function to run, and arguments to pass to that function. Then you could pass data just once per process, to some function that, e.g., binds it to a module global name. Other functions can then just refer to that module global. Pool() also supports several methods for passing out work (and retrieving results) that don't require you to manage queues explicitly. Don't know enough details here to suggest whether that would be better or worse for your specific problem.

Fleshing out the "portable" way

Here's one way to do it:

NUM_CPU = None  # defaults to all available CPUs

def worker_init(xdata, xother_f_arg):
    global data, other_f_arg
    data = xdata
    other_f_arg = xother_f_arg

def f(start):
    for i in xrange(start, len(data)):
       #do stuff

if __name__ == '__main__':
    from multiprocessing import Pool
    import psycopg2

    cur = psycopg2.connect(...).cursor()
    cur.execute('SELECT * from table')
    data = cur.fetchall()
    other_f_arg = 'some object'

    pool = Pool(processes=NUM_CPU,
                initializer=worker_init,
                initargs=(data, other_f_arg))
    pool.map(f, xrange(len(data) - 1))
    pool.close()
    pool.join()

Note that it's substantially less code than slinging your own queues too.

While I can't run your code to be sure, I expect you'd be better off not passing the gigantic data using multiprocessing machinery, instead having each worker load its own copy from the database. Along the lines of:

def worker_init(xother_f_arg):
    import psycopg2
    global data, other_f_arg
    other_f_arg = xother_f_arg
    cur = psycopg2.connect(...).cursor()
    cur.execute('SELECT * from table')
    data = cur.fetchall()
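
The main process then no longer ships data through initargs at all. Roughly, assuming the parent can get the number of rows cheaply (get_row_count() here is a hypothetical helper, e.g. a SELECT count(*) query):

from multiprocessing import Pool

NUM_CPU = None  # all available CPUs, as above

if __name__ == '__main__':
    other_f_arg = 'some object'
    n_rows = get_row_count()   # hypothetical helper; the parent only needs the row count

    pool = Pool(processes=NUM_CPU,
                initializer=worker_init,
                initargs=(other_f_arg,))   # note: no data in initargs any more
    pool.map(f, xrange(n_rows - 1))
    pool.close()
    pool.join()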

EDIT - dealing with errors

It's hard for parallel gimmicks to raise exceptions in child processes (or threads), because they occur in contexts that - in general - have nothing to do with what the main program happens to be doing at the time. The easiest way to deal with this is to keep references to the AsyncResult objects you're creating and explicitly .get() results from them (lose the callback! that's just useless complication here). Replace your:

for i in xrange(len(data)-1):
    pool.apply_async(f,
                     args=(i,),
                     callback=shim_callback)

with, e.g.,

# queue up all the work
futures = [pool.apply_async(f, args=(i,))
           for i in xrange(len(data) - 1)]
# retrieve results
for fut in futures:
    try:
        result = fut.get()
    except NameExceptionsYouWantToCatchHere as e:
        # do whatever you want with the exception
    else:
        # process result

From the docs (current Python 2):

get([timeout])

Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().

In Python 3, there's also a map_async() method, and an optional error_callback argument on many of the Pool() methods.
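
In Python 3 that could look roughly like this, as a drop-in change to the apply_async() loop from your update; handle_error is an illustrative name, and the pool calls it in the parent process with the exception instance whenever a task raises:

# Python 3 sketch: surface worker exceptions via error_callback.
def handle_error(exc):
    # runs in the main process with the exception raised inside f(i)
    print("worker task failed:", exc)

for i in range(len(data) - 1):
    pool.apply_async(f,
                     args=(i,),
                     callback=shim_callback,
                     error_callback=handle_error)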

Note: if len(data) is very large, the multiprocessing machinery can consume a correspondingly large amount of RAM to queue up all the work items - apply_async() never blocks, and the loop queues up work items as fast as possible. In that case, another layer of buffering may be needed.
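
One possible shape for that extra layer, sticking with the apply_async() loop from your update: cap the number of in-flight tasks with a semaphore that the parent acquires before each submission and releases in the callback. MAX_PENDING and throttled_callback are illustrative names, not anything from the original code:

import threading
from functools import partial

MAX_PENDING = 1000                 # arbitrary cap on submitted-but-unfinished tasks
pending = threading.BoundedSemaphore(MAX_PENDING)

def throttled_callback(result, x):
    pending.release()              # a slot frees up when a result comes back
    callback(result, x)            # the original callback from the update

for i in xrange(len(data) - 1):
    pending.acquire()              # blocks once MAX_PENDING tasks are in flight
    pool.apply_async(f,
                     args=(i,),
                     callback=partial(throttled_callback, x=x))
# Caveat: if f raises, its callback never runs and a slot leaks, so combine
# this with the error handling above.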

Upvotes: 2

user2841651

Reputation:

The problem is that passing your data to your worker (which is a separate process) causes the data to be copied. Since it is quite a big dataset, you won't see any speed improvement (you can check to confirm).

Depending on the kind of data you have, you should look at multiprocessing.Array (http://docs.python.org/2/library/multiprocessing.html#multiprocessing.Array). It is probably safer than a global.

The kind of code you could use is:

from multiprocessing import Process, Queue, Array, cpu_count
import psycopg2

cur = psycopg2.connect(...).cursor()
cur.execute('SELECT * from table')
data = cur.fetchall()
#when loaded into memory data is > 100MB
shared_array = Array('your_data_type', data)  #typecode is a placeholder; Array wants a flat ctypes-compatible sequence

def worker(queue):
    while True:
        args = queue.get()
        if args == 'EOF':
            break
        f(*args)

def f(x, start):
    for i in xrange(start, len(shared_array)):
        shared_array[i] #do stuff with the shared data and x

if __name__ == '__main__':
    other_f_arg = 'some object'

    queue = Queue()
    #spawn 1 child per core:
    workers = [Process(target=worker, args=(queue,)) for cpu in xrange(cpu_count())]
    for w in workers:
        w.start()

    for i in xrange(len(shared_array)-1):
        queue.put((other_f_arg, i))

    for w in workers:
        queue.put('EOF')
    for w in workers:
        w.join()

Upvotes: 1
