Reputation: 1355
I'm trying to deliver work when a page is appended to the pages list, but my code output returns a NotImplementedError. Here is the code with what I'm trying to do:
Code:
from multiprocessing import Pool, current_process
import time
import random
import copy_reg
import types
import threading
class PageControler(object):
def __init__(self):
self.nProcess = 3
self.pages = [1,2,3,4,5,6,7,8,9,10]
self.manageWork()
def manageWork(self):
self.pool = Pool(processes=self.nProcess)
time.sleep(2)
work_queue = threading.Thread(target=self.modifyQueue)
work_queue.start()
#pool.close()
#pool.join()
def deliverWork(self):
if self.pages != []:
pag = self.pages.pop()
self.pool.apply_async(self.myFun)
def modifyQueue(self):
t = time.time()
while (time.time()-t) < 10:
time.sleep(1)
self.pages.append(99)
print self.pages
self.deliverWork()
def myFun(self):
time.sleep(2)
if __name__ == '__main__':
def _pickle_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _pickle_method)
PageControler()
Output:
NotImplementedError: pool objects cannot be passed between processes or pickled
It's any way to pass the pool object between the processes ?
Edit:
I'm using Python 2.6
Upvotes: 25
Views: 17485
Reputation: 329
Dano's answer is a good approach if you must pass the entire object to the process. In your case, the function you are passing to pool has no requirement for a reference to the class instance. So an alternative may be to make the function a static method using the @staticmethod
decorator. If the function does require reference to one or two class member variables these could be passed in as arguments for read only variables, and updated in a callback if a write is required also (of course, you will need to do this if you want to update the local class instance in any case).
For example:
Class A(object):
def __init__(self):
self._pool = multiprocessing.Pool(1)
self.member_variable = 1
@staticmethod
def MyFunc(variable):
variable += 1
return variable
def Callback(self, return_val):
self.member_variable = return_val
def CallFuncAsync(self):
pool.apply_async(self.MyFunc, (self.member_variable,), callback=self.Callback)
Upvotes: 2
Reputation: 94881
In order to pickle the instance method you're trying to pass to the Pool
, Python needs to pickle the entire PageControler
object, including its instance variables. One of those instance variables is the Pool
object itself, and Pool
objects can't be pickled, hence the error. You can work around this by implementing __getstate__
on the object, and using that to remove the pool
object from the instance prior to pickling:
class PageControler(object):
def __init__(self):
self.nProcess = 3
self.pages = [1,2,3,4,5,6,7,8,9,10]
self.manageWork()
def manageWork(self):
self.pool = Pool(processes=self.nProcess)
time.sleep(2)
work_queue = threading.Thread(target=self.modifyQueue)
work_queue.start()
#pool.close()
#pool.join()
def deliverWork(self):
if self.pages != []:
pag = self.pages.pop()
self.pool.apply_async(self.myFun)
def modifyQueue(self):
t = time.time()
while (time.time()-t) < 10:
time.sleep(1)
self.pages.append(99)
print self.pages
self.deliverWork()
def myFun(self):
time.sleep(2)
def __getstate__(self):
self_dict = self.__dict__.copy()
del self_dict['pool']
return self_dict
def __setstate__(self, state):
self.__dict__.update(state)
__getstate__
is always called prior to pickling an object, and allow you to specify exactly which pieces of the object's state should actually be pickled. Then upon unpickling, __setstate__(state)
will be called if its implemented (it is in our case), or if it's not, the dict
returned by __getstate__
will be used as the __dict__
for the unpickled instance. In the above example, we're explicitly setting __dict__
to the dict
we returned in __getstate__
, but we could have just not implemented __setstate__
and gotten the same effect.
Upvotes: 43