Reputation: 3462
I would like to use multiprocessing to take advantage of multiple cores when running a procedure that does a pairwise comparison of the elements of a large list:
data = [...] # when loaded this is > 100MB

for i in xrange(len(data)-1):
    parent = data[i]
    for j in xrange(i, len(data)):
        child = data[j]
        # do something with parent and child
So if I set up a process queue:
def worker(queue):
    while True:
        args = queue.get()
        if args == 'EOF':
            break
        f(*args)

def f(data, x, start):
    for i in xrange(start, len(data)):
        pass  # do stuff
if __name__ == '__main__':
    from multiprocessing import Process, Queue, cpu_count
    import psycopg2

    cur = psycopg2.connect(...).cursor()
    cur.execute('SELECT * from table')
    data = cur.fetchall()  # when loaded into memory data is > 100MB
    other_f_arg = 'some object'

    queue = Queue()
    # spawn 1 child per core:
    workers = [Process(target=worker, args=(queue,)) for cpu in xrange(cpu_count())]
    for w in workers:
        w.start()

    for i in xrange(len(data)-1):
        queue.put((data, other_f_arg, i))
    for w in workers:
        queue.put('EOF')  # one sentinel per worker
    for w in workers:
        w.join()
When this runs, queue.put pushes data into the queue on every iteration, even though data should only need to be read once and then re-referenced per process. Thus, all the advantages of multiprocessing are negated by the repeated data passing. How do I get each process to pick up a copy of data and other_f_arg just once, and then only pass the dynamic variable i as workers are freed?
UPDATE 1:
I decided to use Pool as per Tim Peters's suggestion below, but instead of using map I am using apply_async with a callback, since I want the parent process to do some postprocessing of the return value of f serially rather than waiting for all submissions to finish (f will return something large in memory too):
def worker_init(xdata):
    global data
    data = xdata

def callback(result, x):
    pass  # do something with the result of f(i), and x

def f(i):
    # do something with data[i]
    return result

if __name__ == '__main__':
    ...
    data = psycopg2_cursor.fetchall()
    NUM_CPU = None
    from multiprocessing import Pool
    from functools import partial

    pool = Pool(processes=NUM_CPU,
                initializer=worker_init,
                initargs=(data,))
    x = 'some extra param I want to pass to callback'
    shim_callback = partial(callback, x=x)

    for i in xrange(len(data)-1):
        pool.apply_async(f,
                         args=(i,),
                         callback=shim_callback)
    pool.close()
    pool.join()
Is there any way to redirect uncaught exceptions in the children to the console (like an exception raised in a single-threaded process)? I ask because uncaught exceptions in f seem to just break the loop that calls apply_async, and I get no errors printed to the console at all.
Upvotes: 1
Views: 1009
Reputation: 70705
Easiest: on a Linux-y system (an OS that supports fork()), define data at module level. Then all worker processes will magically see (a copy of) data, due to magical fork() semantics.
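A minimal sketch of that fork()-based layout; the stand-in data and the loop body are illustrative, not the asker's real code:

# Load the big data at module level, before the workers are started.
# On fork(), children inherit a copy-on-write view of it, so only the
# index travels through the queue.
from multiprocessing import Process, Queue, cpu_count

data = range(1000)  # stand-in for the >100MB query result

def worker(queue):
    while True:
        start = queue.get()
        if start == 'EOF':
            break
        for i in xrange(start, len(data)):   # `data` is the inherited global
            pass  # compare data[start] with data[i]

if __name__ == '__main__':
    queue = Queue()
    workers = [Process(target=worker, args=(queue,)) for _ in xrange(cpu_count())]
    for w in workers:
        w.start()
    for i in xrange(len(data) - 1):
        queue.put(i)          # only the index is pickled, not data
    for w in workers:
        queue.put('EOF')      # one sentinel per worker
    for w in workers:
        w.join()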
More portable: use a multiprocessing.Pool() instead. When you create the Pool, you can specify an initialization function to run, and arguments to pass to that function. Then you could pass data just once per process, to some function that, e.g., binds it to a module global name. Other functions can then just refer to that module global. Pool() also supports several methods for passing out work (and retrieving results) that don't require you to manage queues explicitly. Don't know enough details here to suggest whether that would be better or worse for your specific problem.
Fleshing out the "portable" way
Here's one way to do it:
NUM_CPU = None  # defaults to all available CPUs

def worker_init(xdata, xother_f_arg):
    global data, other_f_arg
    data = xdata
    other_f_arg = xother_f_arg

def f(start):
    for i in xrange(start, len(data)):
        pass  # do stuff

if __name__ == '__main__':
    from multiprocessing import Pool
    import psycopg2

    cur = psycopg2.connect(...).cursor()
    cur.execute('SELECT * from table')
    data = cur.fetchall()
    other_f_arg = 'some object'

    pool = Pool(processes=NUM_CPU,
                initializer=worker_init,
                initargs=(data, other_f_arg))
    pool.map(f, xrange(len(data) - 1))
    pool.close()
    pool.join()
Note that it's substantially less code than slinging your own queues, too. While I can't run your code to be sure, I expect you'd be better off not passing the gigantic data through the multiprocessing machinery at all, and instead having each worker load its own copy from the database. Along the lines of:
def worker_init(xother_f_arg):
    import psycopg2
    global data, other_f_arg
    other_f_arg = xother_f_arg
    cur = psycopg2.connect(...).cursor()
    cur.execute('SELECT * from table')
    data = cur.fetchall()
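A rough sketch of the driver to go with that initializer, assuming the parent only needs the number of rows; the COUNT(*) query is illustrative, and the connection details are elided as in the original:

if __name__ == '__main__':
    from multiprocessing import Pool
    import psycopg2

    other_f_arg = 'some object'
    cur = psycopg2.connect(...).cursor()
    cur.execute('SELECT count(*) from table')   # cheap: the parent only needs the row count
    n_rows = cur.fetchone()[0]

    pool = Pool(processes=None,                 # None = use all available CPUs
                initializer=worker_init,
                initargs=(other_f_arg,))        # only the small object crosses processes
    pool.map(f, xrange(n_rows - 1))
    pool.close()
    pool.join()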
EDIT - dealing with errors
It's hard for parallel gimmicks to raise exceptions in child processes (or threads), because they occur in contexts that - in general - have nothing to do with what the main program happens to be doing at the time. The easiest way to deal with this is to keep references to the AsyncResult objects you're creating and explicitly .get() results from them (lose the callback! that's just useless complication here). Replace your:
for i in xrange(len(data)-1):
    pool.apply_async(f,
                     args=(i,),
                     callback=shim_callback)
with, e.g.,
# queue up all the work
futures = [pool.apply_async(f, args=(i,))
           for i in xrange(len(data) - 1)]
# retrieve results
for fut in futures:
    try:
        result = fut.get()
    except NameExceptionsYouWantToCatchHere as e:
        pass  # do whatever you want with the exception
    else:
        pass  # process result
From the docs (current Python 2):
get([timeout])
Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().
In Python 3, there's also a map_async() method, and an optional error_callback argument on many of the Pool() methods.
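For instance, a rough Python 3 sketch of the error_callback route; the f and log_error below are illustrative stand-ins, not the asker's functions:

# Python 3: route exceptions raised in workers to a callback in the parent
# instead of letting them vanish.
from multiprocessing import Pool

def f(i):
    if i == 3:
        raise ValueError('boom at %d' % i)
    return i * i

def log_error(exc):
    # called in the parent with the exception instance raised in the child
    print('worker failed:', exc)

if __name__ == '__main__':
    pool = Pool()
    for i in range(6):
        pool.apply_async(f, args=(i,), error_callback=log_error)
    pool.close()
    pool.join()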
Note: if len(data) is very large, the multiprocessing machinery can consume a correspondingly large amount of RAM to queue up all the work items - apply_async() never blocks, and the loop queues up work items as fast as possible. In that case, another layer of buffering may be needed.
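One possible buffering layer, assuming the pool, f and data from the snippet above: submit the work in fixed-size chunks and drain each chunk before queuing the next (the chunk size is illustrative):

# Only CHUNK AsyncResult objects are ever pending at once.
CHUNK = 1000  # illustrative buffer size
n = len(data) - 1
for lo in xrange(0, n, CHUNK):
    futures = [pool.apply_async(f, args=(i,))
               for i in xrange(lo, min(lo + CHUNK, n))]
    for fut in futures:
        result = fut.get()   # drain this chunk before queuing the next
        # process result here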
Upvotes: 2
Reputation:
The problem is that passing data to your workers (each one a separate process) causes the data to be copied. As it's quite a big dataset, you won't see any speed improvement (you can check to confirm this). Depending on the kind of data you have, you may want to look at multiprocessing.Array: http://docs.python.org/2/library/multiprocessing.html#multiprocessing.Array . It is probably safer than a global.
The kind of code you could use is:
from multiprocessing import Array, Process, Queue, cpu_count
import psycopg2

cur = psycopg2.connect(...).cursor()
cur.execute('SELECT * from table')
data = cur.fetchall()  # when loaded into memory data is > 100MB
shared_array = Array('your_data_type', data)

def worker(queue):
    while True:
        args = queue.get()
        if args == 'EOF':
            break
        f(*args)

def f(x, start):
    for i in xrange(start, len(shared_array)):
        pass  # do stuff with shared_array[i]

if __name__ == '__main__':
    other_f_arg = 'some object'
    queue = Queue()
    # spawn 1 child per core:
    workers = [Process(target=worker, args=(queue,)) for cpu in xrange(cpu_count())]
    for w in workers:
        w.start()

    for i in xrange(len(data)-1):
        queue.put((other_f_arg, i))  # only the small arg and the index, not data
    for w in workers:
        queue.put('EOF')  # one sentinel per worker
    for w in workers:
        w.join()
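Note that multiprocessing.Array holds flat ctypes data, so this only fits if your rows can be flattened to a single typecode. A minimal sketch of that pattern, with an illustrative 'd' (double) typecode and stand-in values:

# Share a flat array of doubles with worker processes.  Workers started
# after the Array is created can read it without copying (on fork()), and
# lock=False is fine for read-only access.
from multiprocessing import Array, Process, cpu_count

flat_data = [0.5 * i for i in xrange(10)]         # stand-in for the real values
shared_array = Array('d', flat_data, lock=False)   # 'd' = C double

def worker(start):
    for i in xrange(start, len(shared_array)):
        pass  # compare shared_array[start] with shared_array[i]

if __name__ == '__main__':
    procs = [Process(target=worker, args=(i,)) for i in xrange(cpu_count())]
    for p in procs:
        p.start()
    for p in procs:
        p.join()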
Upvotes: 1