Reputation: 21552
The following does not work using Python 2.7.9, but it also does not throw any error or exception. Is there a bug, or can multiprocessing not be used in a class?
```python
from multiprocessing import Pool

def testNonClass(arg):
    print "running %s" % arg
    return arg

def nonClassCallback(result):
    print "Got result %s" % result

class Foo:
    def __init__(self):
        po = Pool()
        for i in xrange(1, 3):
            po.apply_async(self.det, (i,), callback=self.cb)
        po.close()
        po.join()
        print "done with class"

        po = Pool()
        for i in xrange(1, 3):
            po.apply_async(testNonClass, (i,), callback=nonClassCallback)
        po.close()
        po.join()

    def cb(self, r):
        print "callback with %s" % r

    def det(self, M):
        print "method"
        return M+2

if __name__ == "__main__":
    Foo()
```
Running it prints this:
```
done with class
running 1
running 2
Got result 1
Got result 2
```
EDIT: This seems related, but it uses .map, while I specifically need to use apply_async, which seems to matter in terms of how multiprocessing works with class instances (e.g. I don't get a pickling error, unlike many other questions related to this): Python how to do multiprocessing inside of a class?
Upvotes: 1
Views: 1575
Reputation: 25813
Processes don't share state or memory by default, each process is an independent program. You need to either 1) use threading 2) use specific types capable of sharing state or 3) design your program to avoid shared state and rely on return values instead.
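A minimal sketch of option 3, written for Python 3 (the names `counter`, `square` and `run_squares` are made up for illustration): the worker increments a module-level global, but only its return value makes it back to the parent process.

```python
from multiprocessing import Pool

# Module-level state that a worker will try to change.
counter = 0

def square(n):
    # Runs in a worker process: this increments the worker's own copy of
    # the module global, which the parent process never sees.
    global counter
    counter += 1
    return n * n

def run_squares(values):
    pool = Pool(2)
    try:
        # Return values are pickled and sent back to the parent.
        results = [pool.apply_async(square, (v,)) for v in values]
        squares = [r.get(timeout=10) for r in results]
    finally:
        pool.close()
        pool.join()
    return squares

if __name__ == "__main__":
    print(run_squares([1, 2, 3]))  # [1, 4, 9]
    print(counter)                 # 0: the increments stayed in the workers
```

The `if __name__ == "__main__":` guard matters here: with the "spawn" start method the workers import this module, and the guard keeps them from starting pools of their own.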
Update
You have two issues in your code, and one is masking the other.
1) You don't do anything with the result of apply_async. I see that you're using callbacks, but you still need to keep the returned AsyncResult objects and check them. Because you're not doing this, you're not seeing the error caused by the second problem.
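2) In Python 2, bound methods of an object cannot be pickled, so they cannot be passed to other processes. I was really annoyed when I first discovered this, but there is an easy workaround: pass the object itself to a module-level function that calls the method. Try this: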
```python
from multiprocessing import Pool

def _remote_det(foo, m):
    # Module-level functions can be pickled, so call the method from here
    return foo.det(m)

class Foo:
    def __init__(self):
        po = Pool()
        results = []
        for i in xrange(1, 3):
            r = po.apply_async(_remote_det, (self, i,), callback=self.cb)
            results.append(r)
        po.close()
        for r in results:
            r.wait()
            if not r.successful():
                # Raises an error when not successful
                r.get()
        po.join()
        print "done with class"

    def cb(self, r):
        print "callback with %s" % r

    def det(self, M):
        print "method"
        return M+2

if __name__ == "__main__":
    Foo()
```
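Problem 1 can be reproduced without any class. In this Python 3 sketch the built-in `int` stands in for a worker function, and `describe` is a hypothetical helper: a worker exception only surfaces in the parent once the AsyncResult is inspected.

```python
from multiprocessing import Pool

def describe(pool, func, args):
    # Submit without a callback, then inspect the AsyncResult explicitly.
    result = pool.apply_async(func, args)
    result.wait(timeout=10)
    if result.successful():
        return "ok: %r" % (result.get(),)
    try:
        result.get()  # re-raises the worker's exception in the parent
    except Exception as exc:
        return "failed: %s: %s" % (type(exc).__name__, exc)

if __name__ == "__main__":
    pool = Pool(2)
    try:
        print(describe(pool, int, ("42",)))            # ok: 42
        print(describe(pool, int, ("not a number",)))  # failed: ValueError: ...
    finally:
        pool.close()
        pool.join()
```

Without the wait/successful/get step, the ValueError would simply vanish, which is the silent failure described in the question. On Python 3, apply_async also accepts an error_callback argument for this purpose, and bound methods of picklable instances can be pickled, so passing self.det directly works there.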
Upvotes: 1
Reputation: 608
I'm pretty sure it can be used in a class, but you need to protect the call to Foo inside of a clause like:
if __name__ == "__main__":
so that it only runs in the main process and not when child processes import the module. You may also have to alter the __init__ function of the class so that it accepts a pool as an argument instead of creating a pool.
I just tried this:
```python
from multiprocessing import Pool

#global counter
#counter = 0

class Foo:
    def __init__(self, po):
        for i in xrange(1, 300):
            po.apply_async(self.det, (i,), callback=self.cb)
        po.close()
        po.join()
        print( "foo" )
        #print counter

    def cb(self, r):
        #global counter
        #print counter, r
        #counter += 1
        pass

    def det(self, M):
        return M+2

if __name__ == "__main__":
    po = Pool()
    Foo(po)
```
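A hedged sketch of the pool-as-argument idea that should run on Python 3 and 2.7. It swaps `self.det` for a hypothetical module-level `det` function (bound methods cannot be pickled on Python 2), and it tracks results in instance attributes (`counter`, `results`, also illustrative) from the callback.

```python
from multiprocessing import Pool

def det(m):
    # Module-level worker function: picklable on both Python 2 and 3.
    return m + 2

class Foo(object):
    def __init__(self, pool):
        # The pool is created by the caller and passed in.
        self.counter = 0
        self.results = []
        pending = [pool.apply_async(det, (i,), callback=self.cb)
                   for i in range(1, 300)]
        for r in pending:
            r.get(timeout=30)  # surfaces worker errors instead of hiding them

    def cb(self, r):
        # Runs in the parent process (in the pool's result-handler thread),
        # so updates to self.counter are visible after the gets return.
        self.counter += 1
        self.results.append(r)

if __name__ == "__main__":
    pool = Pool()
    try:
        foo = Foo(pool)
    finally:
        pool.close()
        pool.join()
    print(foo.counter)  # 299
```

Callbacks are invoked in completion order, so `results` is not necessarily sorted.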
And I think I know what the problem is now. CPython's global interpreter lock prevents threads from running Python code in parallel, so multiprocessing uses multiple processes instead, and the sub-processes in the Pool don't have access to the standard output of the main process.
The subprocesses are also unable to modify the variable counter, because it exists in a different process (I tried running with the counter lines both commented out and uncommented). Now, I do recall seeing cases where global state variables get altered by processes in the pool, so I don't know all of the minutiae. I do know that it is, in general, a bad idea to have global state variables like that, if for no other reason than they can lead to race conditions and/or time wasted on locks and waiting for access to the global variable.
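If shared state really is needed, multiprocessing.Value provides a number in shared memory that comes with its own lock. This minimal Python 3 sketch (the function names are illustrative) shows the locking cost the paragraph warns about: every increment has to acquire that lock.

```python
from multiprocessing import Process, Value

def add_many(counter, n):
    # Each increment takes the Value's built-in lock; without it,
    # concurrent "+=" from several processes could lose updates.
    for _ in range(n):
        with counter.get_lock():
            counter.value += 1

def count_in_parallel(workers=4, n=1000):
    counter = Value("i", 0)  # a C int living in shared memory
    procs = [Process(target=add_many, args=(counter, n)) for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value

if __name__ == "__main__":
    print(count_in_parallel())  # 4000: 4 workers x 1000 increments
```

Returning values from workers, as in the apply_async examples, avoids this lock traffic entirely, which is why it is usually the simpler design.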
Upvotes: 0