Reputation: 5751
I've created a program, which can be summed up as something like this:
from itertools import combinations
class Test(object):
    def __init__(self, t2):
        self.another_class_object = t2
    def function_1(self, n):
        a = 2
        while(a <= n):
            all_combs = combinations(range(n), a)
            for comb in all_combs:
                if(self.another_class_object.function_2(comb)):
                    return 1
            a += 1
        return -1
The function combinations is imported from itertools. function_2 returns True or False depending on the input and is a method of another class's object, e.g.:
class Test_2(object):
    def __init__(self, list):
        self.comb_list = list
    def function_2(self, c):
        return c in self.comb_list
Everything is working just fine. But now I want to change it a little bit and implement multiprocessing. I found this topic that shows an example of how to exit the script when one of the worker processes determines no more work needs to be done. So I made the following changes:
In the __init__ method, I added: self.pool = Pool(processes=8)
I created a callback function:
    all_results = []
    def callback_function(self, result):
        self.all_results.append(result)
        if(result):
            self.pool.terminate()
I changed function_1:
    def function_1(self, n):
        a = 2
        while(a <= n):
            all_combs = combinations(range(n), a)
            for comb in all_combs:
                self.pool.apply_async(self.another_class_object.function_2, args=comb, callback=self.callback_function)
            #self.pool.close()
            #self.pool.join()
            if(True in all_results):
                return 1
            a += 1
        return -1
Unfortunately, it does not work as I expected. Why? After debugging it looks like the callback function is never reached. I thought that it would be reached by every worker. Am I wrong? What can be the problem?
Upvotes: 1
Views: 331
Reputation: 15513
Question: ... not work as I expected. ... What can be the problem?
You always need to call get() on the results returned by pool.apply_async(...) to see the errors raised in the pool processes. Without get(), those errors are stored in the result objects and never shown.
Change to the following:
pp = []
for comb in all_combs:
    pp.append(pool.apply_async(func=self.another_class_object.function_2, args=comb, callback=self.callback_function))
pool.close()
for ar in pp:
    print('ar=%s' % ar.get())
And you will see this Error:
TypeError: function_2() takes 2 positional arguments but 3 were given
To fix this error, change args=comb to args=(comb,):
pp.append(pool.apply_async(func=self.another_class_object.function_2, args=(comb,), callback=self.callback_function))
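To see both effects side by side, here is a minimal sketch. It assumes multiprocessing.pool.ThreadPool in place of Pool, only so that it runs without a __main__ guard or any pickling; apply_async and get() behave the same way for this purpose. The helper run() and its wrap_args flag are hypothetical names introduced for the illustration.

```python
from itertools import combinations
from multiprocessing.pool import ThreadPool  # assumption: stands in for Pool


class Test_2(object):
    def __init__(self, comb_list):
        self.comb_list = comb_list

    def function_2(self, c):
        return c in self.comb_list


def run(wrap_args):
    """Submit every 2-combination of range(3); return (results, errors).

    wrap_args=True passes args=(comb,), wrap_args=False passes args=comb.
    """
    checker = Test_2([(0, 2)])
    results, errors = [], []
    pool = ThreadPool(processes=2)
    try:
        pending = []
        for comb in combinations(range(3), 2):
            args = (comb,) if wrap_args else comb
            pending.append(pool.apply_async(checker.function_2, args=args))
        for ar in pending:
            try:
                # get() re-raises any exception raised while running the task
                results.append(ar.get(timeout=10))
            except TypeError as exc:
                errors.append(str(exc))
    finally:
        pool.close()
        pool.join()
    return results, errors
```

With run(False) every task fails, because args=comb unpacks the tuple into separate positional arguments, and the failure only becomes visible at get(). With run(True) each tuple arrives as the single argument c.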
Tested with Python: 3.4.2
Upvotes: 1
Reputation: 12205
I did not try your code as such, but I tried your structure. Are you sure the problem is in the callback function and not the worker function? I did not manage to get apply_async to launch a single instance of the worker function if the function was a method of a class. It just did not do anything. apply_async completes without error but it does not run the worker.
As soon as I moved the worker function (in your case another_class_object.function_2) into a standalone global function outside the classes, it started working as expected and the callback was triggered normally. The callback function, in contrast, seems to work fine as a method of a class.
There seems to be discussion about this for example here: Why can I pass an instance method to multiprocessing.Process, but not a multiprocessing.Pool?
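The structure described above (standalone worker function, callback kept as a method) could look roughly like the following sketch. It is not your code: it assumes ThreadPool instead of Pool so it runs without a __main__ guard, it passes the list to the worker as a second argument, and it uses a threading.Event (the hypothetical attribute found) instead of calling terminate() inside the callback.

```python
from itertools import combinations
from multiprocessing.pool import ThreadPool  # assumption: stands in for Pool
import threading


def function_2(c, comb_list):
    """Standalone worker function, defined outside any class."""
    return c in comb_list


class Test(object):
    def __init__(self, comb_list):
        self.comb_list = comb_list
        self.found = threading.Event()  # set once any worker returns True

    def callback_function(self, result):
        # Runs in the parent, in the pool's result-handling thread.
        if result:
            self.found.set()

    def function_1(self, n):
        pool = ThreadPool(processes=4)
        try:
            for a in range(2, n + 1):
                pending = []
                for comb in combinations(range(n), a):
                    if self.found.is_set():
                        break  # stop submitting once a hit is known
                    pending.append(pool.apply_async(
                        function_2,
                        args=(comb, self.comb_list),
                        callback=self.callback_function))
                for ar in pending:
                    ar.get()  # wait for the task and surface worker errors
                if self.found.is_set():
                    return 1
            return -1
        finally:
            pool.terminate()
            pool.join()
```

For example, Test([(1, 3)]).function_1(4) returns 1. If you swap in multiprocessing.Pool, the worker and its arguments must be picklable, and on platforms that spawn new processes the pool has to be created under an if __name__ == "__main__": guard.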
Is this in any way useful?
Hannu
Upvotes: 1