slackline

Reputation: 2417

join() output from multiprocessing when using tqdm for progress bar

I'm using a construct similar to this example to run my processing in parallel, with a progress bar courtesy of tqdm:

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
    square = my_number * my_number
    return square

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for _ in p.imap_unordered(_foo, range(0, max_)):
                pbar.update()
    results = p.join()  ## My attempt to combine results

results is always None, though, and I cannot work out how to get my results combined. I understand that with ...: automatically closes whatever it manages when the block completes.

I've tried doing away with the outer with:

if __name__ == '__main__':
    max_ = 10
    p = Pool(processes=8)
    with tqdm(total=max_) as pbar:
        for _ in p.imap_unordered(_foo, range(0, max_)):
            pbar.update()
    p.close()
    results = p.join()
    print(f"Results : {results}")

I'm stumped as to how to join() my results. What am I missing?

Upvotes: 1

Views: 390

Answers (1)

Booboo

Reputation: 44013

Your call to p.join() just waits for all the pool processes to exit, and it always returns None. The call is also unnecessary here because you are using the pool as a context manager (that is, you wrote with Pool(processes=2) as p:). When that block exits, p.terminate() is called implicitly, which immediately stops the pool processes and any tasks that are still running or queued up to run. There are none in your case, because by then you have already consumed every result from the iterator.
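A minimal sketch of the point above, assuming a standard-library-only setup (tqdm is left out so it runs anywhere; the names square and run are hypothetical). The values come from consuming the imap_unordered iterator, while join() merely waits and hands back None:

```python
from multiprocessing import Pool


def square(n):
    # Worker function (hypothetical name); executes in a child process.
    return n * n


def run(max_=10):
    with Pool(processes=2) as p:
        # The results come from consuming the imap_unordered iterator.
        # sorted() restores ascending order, which here equals submission
        # order because the squares of 0..max_-1 are increasing.
        collected = sorted(p.imap_unordered(square, range(max_)))
        p.close()              # no further tasks will be submitted
        join_value = p.join()  # waits for the workers to exit
    return collected, join_value


if __name__ == '__main__':
    collected, join_value = run()
    print(collected)   # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    print(join_value)  # None
```

Sorting only works as an ordering fix here because squaring non-negative integers preserves their order; in general you need one of the approaches below.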

It is actually iterating the iterator returned by p.imap_unordered that gives you each return value from your worker function, _foo. But because you are using imap_unordered, the results may not arrive in submission order; in other words, you cannot assume the return values will come back as 0, 1, 4, 9, and so on. There are several ways to handle this. The first is to have your worker function return the original argument along with the squared value:

from multiprocessing import Pool
from tqdm import tqdm

def _foo(my_number):
    square = my_number * my_number
    return my_number, square  # return the argument along with the result

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        results = [None] * max_  # preallocate the results list
        with tqdm(total=max_) as pbar:
            for x, result in p.imap_unordered(_foo, range(0, max_)):
                results[x] = result  # store each result at its argument's index
                pbar.update()
        print(results)
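The same index-tagging idea can be sketched without tqdm, with a plain counter standing in for pbar.update() and an explicit chunksize passed to imap_unordered. The names tagged_square and ordered_squares, and the chunksize of 4, are illustrative assumptions rather than anything from the question:

```python
from multiprocessing import Pool


def tagged_square(n):
    # Return the argument with its result so the caller can restore order.
    return n, n * n


def ordered_squares(max_, chunksize=4):
    results = [None] * max_  # one slot per argument, preallocated
    done = 0                 # stand-in for pbar.update(); tqdm is omitted
    with Pool(processes=2) as p:
        for n, sq in p.imap_unordered(tagged_square, range(max_),
                                      chunksize=chunksize):
            results[n] = sq  # the argument doubles as the list index
            done += 1
    return results, done


if __name__ == '__main__':
    results, done = ordered_squares(30)
    print(done)         # 30
    print(results[:5])  # [0, 1, 4, 9, 16]
```

Because the argument doubles as the list index, the final list is in submission order no matter which worker finishes first; this relies on the arguments being 0..max_-1.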

The second way is to use apply_async with a callback function instead of imap_unordered. The disadvantage is that each task is submitted individually, so for large iterables you cannot specify a chunksize argument as you can with imap_unordered:

from multiprocessing import Pool
from tqdm import tqdm

def _foo(my_number):
    square = my_number * my_number
    return square

if __name__ == '__main__':
    def my_callback(_):  # ignore the result
        pbar.update()  # update the progress bar when a result is produced

    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            async_results = [p.apply_async(_foo, (x,), callback=my_callback) for x in range(0, max_)]
            # wait for all tasks to complete:
            p.close()
            p.join()
            # async_results is in submission order, so results is too:
            results = [async_result.get() for async_result in async_results]
        print(results)
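A standard-library-only sketch of the apply_async approach, assuming a list append in the callback is an acceptable stand-in for the tqdm update (square, squares_via_apply_async and on_done are hypothetical names). The callback can be a nested function because it runs in the parent process and is never pickled; only the worker function and its arguments are sent to the children:

```python
from multiprocessing import Pool


def square(n):
    # Worker function; executes in a child process.
    return n * n


def squares_via_apply_async(max_):
    completed = []  # stand-in for the progress bar; tqdm is omitted

    def on_done(result):
        # Runs in the parent process, once per finished task.
        completed.append(result)

    with Pool(processes=2) as p:
        async_results = [p.apply_async(square, (x,), callback=on_done)
                         for x in range(max_)]
        p.close()
        p.join()  # wait for every task and its callback to finish
        # Calling .get() in submission order yields ordered results.
        results = [r.get() for r in async_results]
    return results, len(completed)


if __name__ == '__main__':
    results, n_callbacks = squares_via_apply_async(30)
    print(n_callbacks)  # 30
    print(results[:5])  # [0, 1, 4, 9, 16]
```

The order of the completed list reflects completion order and may vary between runs, whereas results always follows submission order.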

Upvotes: 2
