Qiu

Reputation: 5751

Terminating multiprocess pool when one of the workers found proper solution

I've created a program that can be summed up as something like this:

from itertools import combinations

class Test(object):
    def __init__(self, t2):
        self.another_class_object = t2

    def function_1(self, n):
        # Try combinations of size 2, 3, ..., n until function_2 accepts one
        a = 2
        while a <= n:
            all_combs = combinations(range(n), a)
            for comb in all_combs:
                if self.another_class_object.function_2(comb):
                    return 1
            a += 1
        return -1

combinations is imported from itertools. function_2 returns True or False depending on its input and is a method of another class, e.g.:

class Test_2(object):

    def __init__(self, comb_list):
        self.comb_list = comb_list

    def function_2(self, c):
        return c in self.comb_list

Everything works fine. But now I want to change it a little and use multiprocessing. I found this topic, which shows an example of how to exit the script when one of the worker processes determines that no more work needs to be done. So I made the following changes:

  1. added a pool definition to the __init__ method: self.pool = Pool(processes=8)
  2. created a callback function:

    all_results = []
    def callback_function(self, result):
        self.all_results.append(result)
        if(result):
            self.pool.terminate()
    
  3. changed function_1:

    def function_1(self, n):
        a = 2
        while a <= n:
            all_combs = combinations(range(n), a)
            for comb in all_combs:
                self.pool.apply_async(self.another_class_object.function_2, args=comb, callback=self.callback_function)
            #self.pool.close()
            #self.pool.join()
            if True in self.all_results:
                return 1
            a += 1
        return -1
    

Unfortunately, it does not work as I expected. Why? After debugging, it looks like the callback function is never reached. I thought it would be called for every worker task. Am I wrong? What could the problem be?

Upvotes: 1

Views: 331

Answers (2)

stovfl

Reputation: 15513

Question: ... not work as I expected. ... What can be the problem?

You always need to call get() on the results returned by pool.apply_async(...) to see errors raised in the pool processes.

Change to the following:

pp = []
for comb in all_combs:
    pp.append(pool.apply_async(func=self.another_class_object.function_2, args=comb, callback=self.callback_function))

pool.close()

for ar in pp:
    print('ar=%s' % ar.get())

You will then see this error:

TypeError: function_2() takes 2 positional arguments but 3 were given
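Since Python 3.2, apply_async also accepts an error_callback. When the task raises, that callable receives the exception and callback is never called, which matches the "callback is never reached" symptom in the question. A minimal sketch reusing Test_2 from the question; the run helper and the (1, 3) target are illustrative:

```python
from multiprocessing import Pool


class Test_2(object):
    def __init__(self, comb_list):
        self.comb_list = comb_list

    def function_2(self, c):
        return c in self.comb_list


def run(args):
    """Submit function_2 once with the given args; return (successes, errors)."""
    successes, errors = [], []
    checker = Test_2([(1, 3)])
    with Pool(processes=2) as pool:
        res = pool.apply_async(checker.function_2, args=args,
                               callback=successes.append,     # only on success
                               error_callback=errors.append)  # only on failure
        res.wait()  # whichever callback applies has run once wait() returns
    return successes, errors


if __name__ == "__main__":
    comb = (1, 3)
    print(run(comb))     # callback never runs; errors holds the TypeError
    print(run((comb,)))  # prints ([True], [])
```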

To fix this error, change args=comb to args=(comb,):

pp.append(pool.apply_async(func=self.another_class_object.function_2, args=(comb,), callback=self.callback_function))
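Why the original call passes three arguments: the pool calls func(*args), so a tuple comb is unpacked into separate positional arguments, while (comb,) delivers the tuple as a single argument. A plain-Python sketch of the same unpacking, with no pool involved (checker and comb are illustrative values):

```python
class Test_2(object):
    def __init__(self, comb_list):
        self.comb_list = comb_list

    def function_2(self, c):
        return c in self.comb_list


checker = Test_2([(1, 3)])
comb = (1, 3)

# args=comb -> func(*comb) -> function_2(1, 3): self plus two arguments
try:
    checker.function_2(*comb)
except TypeError as exc:
    print("args=comb:", exc)

# args=(comb,) -> func(*(comb,)) -> function_2((1, 3)): self plus one argument
print("args=(comb,):", checker.function_2(*(comb,)))  # prints "args=(comb,): True"
```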

Tested with Python: 3.4.2
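Putting the args=(comb,) fix together with the question's goal of stopping early, a minimal sketch assuming Python 3.4+, where (as in the test run above) a bound method such as another_class_object.function_2 can be passed to apply_async. One deliberate change from the question: the callback only sets a threading.Event (found is an illustrative attribute name), and the main thread terminates the pool, rather than calling terminate() inside the callback, which runs on the pool's own result-handler thread. The pool is also created per call, so a terminated pool is never reused.

```python
import threading
from itertools import combinations
from multiprocessing import Pool


class Test_2(object):
    def __init__(self, comb_list):
        self.comb_list = comb_list

    def function_2(self, c):
        return c in self.comb_list


class Test(object):
    def __init__(self, t2, processes=4):
        self.another_class_object = t2
        self.processes = processes
        # Set by the callback when any worker reports a hit (illustrative name)
        self.found = threading.Event()

    def callback_function(self, result):
        # Runs in the parent's result-handler thread: just record the hit and
        # leave terminating the pool to the main thread.
        if result:
            self.found.set()

    def function_1(self, n):
        self.found.clear()
        pool = Pool(processes=self.processes)
        try:
            for a in range(2, n + 1):
                pending = [
                    pool.apply_async(self.another_class_object.function_2,
                                     args=(comb,),  # pass the tuple as ONE argument
                                     callback=self.callback_function)
                    for comb in combinations(range(n), a)
                ]
                for r in pending:
                    r.get()  # waits for this task; re-raises its exception, if any
                    if self.found.is_set():
                        return 1  # the finally clause terminates remaining work
            return -1
        finally:
            pool.terminate()


if __name__ == "__main__":
    print(Test(Test_2([(1, 3)])).function_1(4))  # prints 1
    print(Test(Test_2([(5, 6)])).function_1(4))  # prints -1
```

Submitting every combination of one size up front stays close to the question but keeps one AsyncResult per combination in memory; for large n, Pool.imap_unordered with a chunksize would be a lighter alternative.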

Upvotes: 1

Hannu

Reputation: 12205

I did not try your code as such, but I tried your structure. Are you sure the problem is in the callback function and not in the worker function? I could not get apply_async to launch a single instance of the worker function when that function was a class method. It just did not do anything: apply_async completes without an error, but the worker never runs.

As soon as I moved the worker function (in your case another_class_object.function_2) out of the class into a standalone module-level function, it started working as expected and the callback was triggered normally. The callback function, in contrast, seems to work fine as a class method.
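The workaround described above, sketched with a module-level worker: check_comb and run are hypothetical names, and the Test_2 instance travels as an ordinary argument instead of being bound to the function. Plain module-level functions are pickled by reference (module plus name), so this form does not depend on whether the Python version can pickle bound methods.

```python
from multiprocessing import Pool


class Test_2(object):
    def __init__(self, comb_list):
        self.comb_list = comb_list

    def function_2(self, c):
        return c in self.comb_list


def check_comb(checker, comb):
    # Hypothetical module-level worker: pickled by reference (module + name),
    # so the task never has to pickle a bound method.
    return checker.function_2(comb)


def run(comb_list, combs):
    checker = Test_2(comb_list)
    results = []
    with Pool(processes=2) as pool:
        pending = [pool.apply_async(check_comb, args=(checker, comb),
                                    callback=results.append)
                   for comb in combs]
        for r in pending:
            r.get()  # wait for each task and surface any worker exception
    return sorted(results)  # callbacks may fire in any order


if __name__ == "__main__":
    print(run([(1, 3)], [(0, 1), (1, 3)]))  # prints [False, True]
```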

There seems to be some discussion of this, for example here: Why can I pass an instance method to multiprocessing.Process, but not a multiprocessing.Pool?

Is this in any way useful?

Hannu

Upvotes: 1
