Reputation: 1283
I'm trying to use map_async on a class method and I get this error:
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
My code:
def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class MyClass(object):
    def Submit(self,cmd):
        subprocess.call(cmd, shell=True)
    def RunTest(self):
        cmds = []
        for i in range(50):
            cmd = CreateCmd(self)
            cmds.append(cmd)
        self.pool.map_async(self.Submit, cmds)
    def Main(self):
        self.pool = mp.pool
        while True:
            RunTest(self)

if __name__ == "__main__":
    MyClass()
When Submit is outside the class it works, but like this I get the error.
Also, MyClass has some more methods and attributes that I didn't write. One of them is a logger; could this be the problem?
Upvotes: 2
Views: 8589
Reputation: 35247
So I built your code with some alternate imports, notably dill instead of pickle. I also used a fork of multiprocessing called pathos.multiprocessing that uses dill. I can pickle your class methods and bound methods. I ignored the whole part where you teach copy_reg how to pickle methods, because dill can do that already.
I had to make some modifications to your code because it didn't work as posted. I also had to write a CreateCmd function, because you didn't provide one. Also, this code as-is will launch multiprocessing jobs, but you never get results because you don't ask for them. What are you really trying to do?
Anyway, here's some code that is like yours, but works. It still doesn't give you any results that are worth anything, except to show that it pickles and the code runs. Please post code that can run, and will throw the error you are reporting.
>>> import dill as pickle
>>> import subprocess
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> def CreateCmd(cmd):
...     return 'sleep {0}'.format(cmd)
>>>
>>> class MyClass(object):
...     def Submit(self, cmd):
...         subprocess.call(cmd, shell=True)
...     def RunTest(self):
...         cmds = []
...         for i in range(50):
...             cmd = CreateCmd(i)
...             cmds.append(cmd)
...         self.pool.amap(self.Submit, cmds) # equivalent to map_async
...     def Main(self):
...         self.pool = Pool()
...         self.RunTest()
...
>>> pickle.loads(pickle.dumps(MyClass))
<class '__main__.MyClass'>
>>> pickle.loads(pickle.dumps(MyClass.RunTest))
<unbound method MyClass.RunTest>
>>> x = MyClass()
>>> pickle.loads(pickle.dumps(x.RunTest))
<bound method MyClass.RunTest of <__main__.MyClass object at 0x10d015b10>>
>>> x.Main()
>>> x.Submit('sleep 1')
>>> # use get to get the result… so 'sleep' is felt by the script
>>> res = x.pool.amap(x.Submit, (CreateCmd(i) for i in range(10)))
>>> res.get()
[None, None, None, None, None, None, None, None, None, None]
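The point about asking for results holds for the standard library too: map_async returns an AsyncResult right away, and nothing comes back until you call get() on it. Here is a minimal sketch for Python 3. The names submit and run_all are hypothetical, and I'm assuming a thread-backed pool (multiprocessing.pool.ThreadPool) so the sketch runs without any pickling; it exposes the same map_async/get API as the process-based Pool.

```python
import subprocess
import sys
from multiprocessing.pool import ThreadPool


def submit(cmd):
    """Run one command and return its exit code, so there is a result to collect."""
    return subprocess.call(cmd)


def run_all(cmds, workers=4):
    """Submit every command asynchronously, then wait for all exit codes."""
    pool = ThreadPool(workers)  # assumption: threads, to keep the sketch portable
    try:
        async_result = pool.map_async(submit, cmds)  # returns immediately
        return async_result.get(timeout=60)          # blocks until all are done
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    # A do-nothing command that works wherever Python itself is installed.
    noop = [sys.executable, "-c", "pass"]
    print(run_all([noop] * 3))
```

Since Submit in the question returns nothing, get() there yields a list of None, as in the session above. Returning the exit code from subprocess.call gives you something to check.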
Anyway, if you want dill or pathos, you can get them at: https://github.com/uqfoundation
By the way, if you wanted to pickle a thread lock, you can do that too.
>>> import dill as pickle
>>> import threading
>>> lock = threading.Lock()
>>>
>>> pickle.loads(pickle.dumps(lock))
<thread.lock object at 0x10c534650>
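On the logger question: that is a plausible source of the thread.lock in the error, since logging handlers hold a lock internally, and pickling a bound method also pickles the instance it is bound to. If you would rather stay with the standard pickle, a common workaround is to drop the unpicklable attribute in __getstate__ and rebuild it in __setstate__. Here is a minimal sketch for Python 3. Worker, PicklableWorker and can_pickle are hypothetical names, and a bare threading.Lock stands in for whatever the logger carries.

```python
import pickle
import threading


class Worker(object):
    """Hypothetical stand-in for MyClass: it holds a lock, as a logger's handlers do."""

    def __init__(self):
        self.name = "worker"
        self.lock = threading.Lock()  # the standard pickle module cannot serialize this

    def submit(self, cmd):
        return cmd


class PicklableWorker(Worker):
    """Same class, but the lock is left out of the pickle and recreated on load."""

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["lock"]  # drop the unpicklable attribute
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.lock = threading.Lock()  # a fresh lock, not the original one


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError):
        return False


if __name__ == "__main__":
    print(can_pickle(Worker().submit))
    print(can_pickle(PicklableWorker().submit))
```

Note that the restored object gets a new lock, so this only makes sense when the worker process does not need to share the original lock with the parent.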
Upvotes: 4