sathish j

Reputation: 79

Python 3.6.8 - multiprocessing.Pool.apply_async() not working

It seems like apply_async is not working: nothing happens. I'm not sure what's wrong here. I am using macOS Catalina.

import time
from multiprocessing import Pool

def worker(sl):
    print(sl)
    time.sleep(sl)
    return sl

if __name__ == '__main__':
    with Pool(processes=3) as pool:
        for i in range(5,30,5):
            result = pool.apply_async(func=worker,args=(i,))

Upvotes: 2

Views: 4334

Answers (1)

Booboo

Reputation: 44323

When you call pool.apply_async you are scheduling a task to be run. The call returns a multiprocessing.pool.AsyncResult instance; calling its get method blocks until the task has completed and then returns the value returned by the worker function passed to apply_async. But you have not called get (or wait) on any of these AsyncResult instances. Instead, you fall straight through to the end of the with Pool(processes=3) as pool: block, and therein lies your problem.

The documentation is not very explicit about what happens when the block exits. There is, however, this warning:

Warning: multiprocessing.pool objects have internal resources that need to be properly managed (like any other resource) by using the pool as a context manager or by calling close() and terminate() manually. Failure to do this can lead to the process hanging on finalization.

Because of the with statement you are using the pool as a context manager, and when the with block completes, pool.terminate() is called. Thus all the processes in the pool are terminated immediately, before they have a chance to run any of your submitted tasks.
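One way to see this for yourself is to keep the AsyncResult and inspect it after the with block has exited. The following is only a minimal sketch: slow_task and submit_and_leave are hypothetical names made up for the illustration, and the 2-second delay is arbitrary.

```python
import time
from multiprocessing import Pool


def slow_task(seconds):
    # Hypothetical stand-in for a real worker: sleep, then return the argument.
    time.sleep(seconds)
    return seconds


def submit_and_leave(seconds):
    """Submit one task, leave the with block at once, return the AsyncResult."""
    with Pool(processes=1) as pool:
        pending = pool.apply_async(slow_task, args=(seconds,))
    # Leaving the with block has already called pool.terminate() by now.
    return pending


if __name__ == '__main__':
    pending = submit_and_leave(2)
    # ready() does not block; it only reports whether the task has finished.
    print(pending.ready())  # False: the worker was terminated mid-task
```

Do not call pending.get() without a timeout here: the task will never complete, so the call would block forever.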

Since you are not interested in the actual return values from worker, an alternative to saving the AsyncResult objects and calling get on them is to call pool.close() followed by pool.join() before exiting the with block, which will wait for all submitted tasks to complete:

close() Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.

join() Wait for the worker processes to exit. One must call close() or terminate() before using join().

import time
from multiprocessing import Pool

def worker(sl):
    print(sl)
    time.sleep(sl)
    return sl

if __name__ == '__main__':
    with Pool(processes=3) as pool:
        for i in range(5,30,5):
            result = pool.apply_async(func=worker,args=(i,))
        pool.close()
        pool.join()

Prints:

5
10
15
20
25
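If you do want the return values, the other approach mentioned above (saving the AsyncResult objects and calling get on them) might look like the sketch below. The helper name run_and_collect is made up for the illustration, and the delays are shortened so the example finishes quickly.

```python
import time
from multiprocessing import Pool


def worker(sl):
    # Same idea as the worker above: sleep, then return the argument.
    time.sleep(sl)
    return sl


def run_and_collect(delays):
    """Submit one task per delay and return the results in submission order."""
    with Pool(processes=3) as pool:
        # Keep every AsyncResult; rebinding a single variable would lose them.
        async_results = [pool.apply_async(worker, args=(d,)) for d in delays]
        # get() blocks until that task is done, so the with block cannot
        # exit (and call terminate()) while work is still outstanding.
        return [r.get() for r in async_results]


if __name__ == '__main__':
    print(run_and_collect([0.1, 0.2, 0.3, 0.4, 0.5]))
```

Because every get call happens inside the with block, no explicit close and join are needed in this variant.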

Upvotes: 7
