Faheem Mitha

Reputation: 6326

Sharing state across multiple processes using the Python multiprocessing module

I have a function that does a calculation and saves the state of the calculation in the result dictionary (a mutable default argument). I run it once, then start several processes using the multiprocessing module. The function needs to run again in each of those parallel processes, but once it has run, the cached state must be returned rather than recalculated. This requirement doesn't make much sense in my toy example, but I can't think of a simple realistic example that would motivate the restriction. Using a dict as a mutable default argument works within a single process, but it doesn't work with the multiprocessing module. What approach can I use to get the same effect?

Note that the state value is something (a dictionary containing class instances) that cannot, as far as I know, be passed to the multiple processes as an argument.

The SO question Python multiprocessing: How do I share a dict among multiple processes? seems to cover similar ground. Perhaps I could use a Manager to do what I need, but it is not obvious how. Alternatively, one could save the value to a global object, as in https://stackoverflow.com/a/4534956/350713, but that doesn't seem very elegant. A rough sketch of the Manager idea appears below, after my example.

def foo(result={}):
    # result is a mutable default argument, so it persists between calls
    # within a single process and acts as the cache
    if result:
        print "returning cached result"
        return result
    result[1] = 2
    return result

def parafn():
    from multiprocessing import Pool
    pool = Pool(processes=2)
    arglist = []
    foo()    # populate the cache in the parent process
    for i in range(4):
        arglist.append({})
    results = []
    r = pool.map_async(foo, arglist, callback=results.append)
    r.get()
    r.wait()
    pool.close()
    pool.join()
    return results

print parafn()
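For concreteness, here is a minimal sketch of the Manager idea mentioned above, assuming the cached values can be pickled (which my real state may not allow). The names compute and para_manager are made up for illustration; compute just stands in for the real calculation.

from multiprocessing import Pool, Manager

def compute(args):
    key, shared_cache = args
    if key in shared_cache:
        return shared_cache[key]   # cached result, no recalculation
    value = key * 2                # stand-in for the real calculation
    shared_cache[key] = value
    return value

def para_manager():
    manager = Manager()
    shared_cache = manager.dict()  # proxy dict visible to all workers
    pool = Pool(processes=2)
    results = pool.map(compute, [(k, shared_cache) for k in range(4)])
    pool.close()
    pool.join()
    return results

Each worker sees the same manager-backed dict through the proxy, so a value computed (or pre-seeded) in one process is visible to the others without being recalculated.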

UPDATE: Thanks for the comments. I've got a working example now, posted below.

Upvotes: 1

Views: 4029

Answers (2)

Faheem Mitha

Reputation: 6326

This code would not win any beauty prizes, but it works for me. This example is similar to the one in the question, with some minor changes. The add_to_d construct is a bit awkward, but I don't see a better way to do it.

Brief summary: I copy the state of foo's d (its mutable default argument) back into foo, but into the copy of foo in each of the new process spaces created by the pool. Once this is done, foo in the new process spaces will not recalculate the cached values. This seems to be what the pool initializer is for, though the documentation is not very explicit about it.

class bar(object):
    def __init__(self, x):
        self.x = x
    def __repr__(self):
        return "<bar "+ str(self.x) +">"

def foo(x=None, add_to_d=None, d={}):
    # d is a mutable default argument, so it persists between calls
    # within a single process and acts as the cache
    if add_to_d:
        d.update(add_to_d)
    if x is None:
        return
    if x in d:
        print "returning cached result, d is %s, x is %s"%(d, x)
        return d[x]
    d[x] = bar(x)
    return d[x]

def finit(cacheval):
    # Pool initializer: runs once in each worker process and seeds
    # foo's default dict with the cache built in the parent
    foo(x=None, add_to_d=cacheval)

def parafn():
    from multiprocessing import Pool
    foo(1)    # populate the cache in the parent process
    # foo.func_defaults[2] is the default value of d, i.e. the cache
    pool = Pool(processes=2, initializer=finit, initargs=[foo.func_defaults[2]])
    arglist = range(4)
    results = []
    r = pool.map_async(foo, iterable=arglist, callback=results.append)
    r.get()
    r.wait()
    pool.close()
    pool.join()
    return results

print parafn()

Upvotes: 1

Netwave

Reputation: 42678

I think the safest way of exchanging data between processes is with a Queue. The multiprocessing module gives you two types of them, Queue and JoinableQueue; see the documentation:

http://docs.python.org/library/multiprocessing.html#exchanging-objects-between-processes
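For example, a minimal sketch along those lines, using Process workers directly and a Queue to collect results (the worker function here is only a stand-in):

from multiprocessing import Process, Queue

def worker(x, out_queue):
    out_queue.put((x, x * 2))    # send (input, result) back to the parent

def run_with_queue():
    out_queue = Queue()
    procs = [Process(target=worker, args=(i, out_queue)) for i in range(4)]
    for p in procs:
        p.start()
    # drain the queue before joining, so the workers are not blocked on a full pipe
    results = [out_queue.get() for _ in procs]
    for p in procs:
        p.join()
    return results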

Upvotes: 1
