catris25

Reputation: 1303

How to show progress bar (tqdm) while using multiprocessing in Python?

I have the following code, where create_data() refers to a function I defined earlier.

%%time
from tqdm import tqdm
from multiprocessing import Pool
import pandas as pd
import os

with Pool(processes=os.cpu_count()) as pool:
    results = pool.map(create_data, date)
    data = [ent for sublist in results for ent in sublist]
    data = pd.DataFrame(data, columns=cols)
    data.to_csv(str(date), index=False)

I basically want to call create_data() while also passing the date argument. All the results are gathered into the results variable, which I then flatten into a single list and convert to a data frame. The create_data function is computationally heavy and takes a long time to run; that's why I need a progress bar to watch its progress.

I have tried changing the pool.map line to the following.

results = list(tqdm(pool.map(create_data, date), total=os.cpu_count()))

But it doesn't seem to be working: I have waited for quite some time and no progress bar shows up. What am I supposed to do here?

Upvotes: 7

Views: 21979

Answers (2)

Lenormju

Reputation: 4368

cf. multiprocessing.Pool.map:

It blocks until the result is ready

and tqdm.tqdm:

Decorate an iterable object, returning an iterator which acts exactly like the original iterable, but prints a dynamically updating progressbar every time a value is requested.

So the mapping is completely finished before tqdm ever gets called.

I reproduced the problem with this code:

from time import sleep
from tqdm import tqdm
from multiprocessing import Pool


def crunch(numbers):
    print(numbers)
    sleep(2)  # simulate a long computation


if __name__ == "__main__":
    with Pool(processes=4) as pool:
        print("mapping ...")
        results = tqdm(pool.map(crunch, range(40)), total=40)
        print("done")

which prints:

mapping ...
0
3
6
[...]
37
38
  0%|          | 0/40 [00:00<?, ?it/s]done

Instead, you should use the lazy version, multiprocessing.Pool.imap: it immediately returns an iterator that you have to consume to get the actual results, and that iterator can be wrapped in tqdm.

from time import sleep
from multiprocessing import Pool

from tqdm import tqdm


def crunch(numbers):
    # print(numbers)  # commented out to not mess the tqdm output
    sleep(2)  # simulate a long computation


if __name__ == "__main__":
    with Pool(processes=4) as pool:
        print("mapping ...")
        results = tqdm(pool.imap(crunch, range(40)), total=40)
        print("running ...")
        tuple(results)  # fetch the lazy results
        print("done")

which prints:

mapping ...
running ...
  0%|          | 0/40 [00:00<?, ?it/s]
  2%|▎         | 1/40 [00:02<01:35,  2.45s/it]
 12%|█▎        | 5/40 [00:04<00:27,  1.26it/s]
 22%|██▎       | 9/40 [00:06<00:19,  1.58it/s]
 32%|███▎      | 13/40 [00:08<00:15,  1.74it/s]
 42%|████▎     | 17/40 [00:10<00:12,  1.83it/s]
 52%|█████▎    | 21/40 [00:12<00:10,  1.89it/s]
 62%|██████▎   | 25/40 [00:14<00:07,  1.92it/s]
 72%|███████▎  | 29/40 [00:16<00:05,  1.95it/s]
 82%|████████▎ | 33/40 [00:18<00:03,  1.96it/s]
100%|██████████| 40/40 [00:20<00:00,  1.95it/s]
done

(the progress bar is printed on multiple lines because my PyCharm terminal on Windows does not support \r, but it should render on a single line in a regular terminal)
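As an aside, recent versions of tqdm also ship tqdm.contrib.concurrent.process_map, which bundles the process pool and the progress bar into a single call. A minimal sketch using the same crunch function (assuming a tqdm version that includes this helper):

from time import sleep

from tqdm.contrib.concurrent import process_map  # available in recent tqdm versions


def crunch(numbers):
    sleep(2)  # simulate a long computation


if __name__ == "__main__":
    # process_map combines ProcessPoolExecutor and tqdm in one call
    results = process_map(crunch, range(40), max_workers=4)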

Upvotes: 14

Booboo

Reputation: 44013

Update

See the answer of @Lenormju, who is on the right track.

However, the problem with the imap method is that it is guaranteed to return its results in the order the arguments were submitted. So if processing the very first element of the date list takes extremely long (i.e. it is the last task to finish), the progress bar will not advance until that task completes; by then all of the other tasks have already finished, and the bar immediately jumps to 100%. Admittedly, this is not likely to happen, but it would be better if you could process results in completion order. imap_unordered does exactly that, but recovering the results in submission order would require you to modify your create_data function (or wrap it), as sketched below.
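For illustration, a minimal sketch of that imap_unordered idea; create_data_indexed is a hypothetical module-level wrapper (it must be defined at the top level so it can be pickled) that pairs each result with its submission index:

from multiprocessing import Pool

from tqdm import tqdm


def create_data_indexed(args):
    idx, dt = args
    return idx, create_data(dt)  # pair the result with its submission index


with Pool() as pool:
    results = [None] * len(date)
    # imap_unordered yields results as tasks finish, so the bar advances steadily
    it = pool.imap_unordered(create_data_indexed, enumerate(date))
    for idx, result in tqdm(it, total=len(date)):
        results[idx] = result  # restore submission order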

If you instead use the ProcessPoolExecutor class from the concurrent.futures module for multiprocessing, you do not have to modify your create_data function at all: the submit method returns Future instances, which you can store as keys in a dictionary whose values are submission indexes. You can then pass the dictionary to the as_completed function, which yields the futures in completion order, and recover each index from the dictionary. Here I am being a bit more clever so that no sorting is required:

import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
import pandas as pd
import os

with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
    # total argument for tqdm is just the number of submitted tasks:
    with tqdm.tqdm(total=len(date)) as progress_bar:
        futures = {}
        for idx, dt in enumerate(date):
            future = executor.submit(create_data, dt)
            futures[future] = idx
        results = [None] * len(date)  # pre-allocate result slots
        for future in as_completed(futures):
            idx = futures[future]  # recover the submission index
            results[idx] = future.result()
            progress_bar.update(1)  # advance the bar by 1
    data = [ent for sublist in results for ent in sublist]
    data = pd.DataFrame(data, columns=cols)
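A slightly more compact variant of the same idea is to wrap as_completed itself in tqdm instead of calling progress_bar.update(1) by hand (a sketch under the same assumptions about create_data, date and cols as above):

import os
from concurrent.futures import ProcessPoolExecutor, as_completed

import pandas as pd
from tqdm import tqdm

with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
    futures = {executor.submit(create_data, dt): idx for idx, dt in enumerate(date)}
    results = [None] * len(date)
    # tqdm advances once for every future yielded by as_completed
    for future in tqdm(as_completed(futures), total=len(futures)):
        results[futures[future]] = future.result()

data = pd.DataFrame([ent for sublist in results for ent in sublist], columns=cols)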

Upvotes: 4
