Michael

Reputation: 371

Why can't I use join() before closing the pool in Python multiprocessing?

I have a class with a method that performs some parallel calculations and is called fairly often. I therefore want the pool to be initialized once, in the class's constructor, rather than creating a new pool on every call. In this method I want to use apply_async() to start a task on each worker process, then block until all tasks finish and aggregate their results. My code looks like this:

import multiprocessing as mp

class Foo:
    def __init__(self, ...):
        # ...
        self.pool = mp.Pool(mp.cpu_count())

    def do_parallel_calculations(self, ...):
        for _ in range(mp.cpu_count()):
            self.pool.apply_async(calc_func, args=(...), callback=aggregate_result)

        # wait for results to be aggregated to a global var by the callback
        self.pool.join()  # <-- ValueError: Pool is still running

        # do something with the aggregated result of all worker processes
However, when I run this I get an error from self.pool.join(): "ValueError: Pool is still running". In every example I have seen, self.pool.close() is called before self.pool.join(), and I assume that is why I get this error. But I don't want to close my pool, since I want it available the next time this method is called. At the same time I can't simply skip self.pool.join(): I need some way to wait for all tasks to finish, and I don't want to busy-wait with something wasteful like "while not global_flag: pass".

What can I do to achieve what I am trying to do? And why won't multiprocessing let me join a pool that is still open? It seems like a perfectly reasonable thing to want to do.

Upvotes: 1

Views: 3183

Answers (2)

Booboo

Reputation: 44108

Let's make this concrete with a real example. You don't need join() at all: apply_async() returns an AsyncResult, and calling get() on it blocks until that task has finished. The pool is never closed, so it stays available for the next call:

import multiprocessing as mp


def calc_func(x):
    return x * x


class Foo:
    def __init__(self):
        # created once; reused by every call to do_parallel_calculations()
        self.pool = mp.Pool(mp.cpu_count())

    def do_parallel_calculations(self, values):
        # submit all tasks, keeping the AsyncResult handle for each one
        results = []
        for value in values:
            results.append(self.pool.apply_async(calc_func, args=(value,)))
        # get() blocks until the corresponding task has finished
        for result in results:
            print(result.get())


if __name__ == '__main__':
    foo = Foo()
    foo.do_parallel_calculations([1, 2, 3])
Upvotes: 2

Michael

Reputation: 371

I think I managed to do it by calling get() on the AsyncResult object that apply_async() returns. So the code becomes:

def do_parallel_calculations(self, ...):
    results = []
    for _ in range(mp.cpu_count()):
        results.append(self.pool.apply_async(calc_func, args=(...)))
    aggregated_result = 0
    for result in results:
        aggregated_result += result.get()

where calc_func() returns the individual task's result, so no callback or global variables are needed.

It's not ideal, because I wait on the results in submission order rather than in the order the tasks actually finish (the most efficient way would be to reduce the results as they complete; a sketch of that is below), but since I only have 4 cores the difference should hardly be noticeable.
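
For the record, here is a sketch of aggregating in completion order with Pool.imap_unordered(), which yields each result as soon as its worker finishes. calc_func must take a single argument in this form, and the names are placeholders for the real task:

def do_parallel_calculations(self, args_list):
    # imap_unordered() yields results in completion order, not submission order
    aggregated_result = 0
    for result in self.pool.imap_unordered(calc_func, args_list):
        aggregated_result += result
    return aggregated_result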

Upvotes: 0
