Reputation: 1303
I have the following code, where create_data()
refers to a function I defined earlier.
%%time
from tqdm import tqdm
from multiprocessing import Pool
import pandas as pd
import os
with Pool(processes=os.cpu_count()) as pool:
    results = pool.map(create_data, date)

data = [ent for sublist in results for ent in sublist]
data = pd.DataFrame(data, columns=cols)
data.to_csv("%s" % str(date), index=False)
I basically want to call create_data()
while also passing the date argument. All the results are then gathered into the results
variable; I combine them into a single list and convert it to a data frame. The create_data
function is computationally heavy and takes a long time to run, which is why I need a progress bar to monitor the progress.
I have tried changing the line to the following.
results = list(tqdm(pool.map(create_data, date), total=os.cpu_count()))
But it doesn't seem to be working. I have waited for quite some time and no progress bar shows up. What am I supposed to do here?
Upvotes: 7
Views: 21979
Reputation: 4368
The problem is that, per its documentation, multiprocessing.Pool.map "blocks until the result is ready", while tqdm.tqdm only "decorate[s] an iterable object, returning an iterator which acts exactly like the original iterable, but prints a dynamically updating progressbar every time a value is requested".
So the mapping is completely finished before tqdm ever gets called.
I reproduced the problem with this code:
from time import sleep
from tqdm import tqdm
from multiprocessing import Pool
def crunch(numbers):
    print(numbers)
    sleep(2)

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        print("mapping ...")
        results = tqdm(pool.map(crunch, range(40)), total=40)
        print("done")
which prints:
mapping ...
0
3
6
[...]
37
38
0%| | 0/40 [00:00<?, ?it/s]done
Instead, you should use the lazy version, multiprocessing.Pool.imap:
it immediately returns a generator that you have to iterate over to get the actual results, and that iteration can be wrapped in tqdm.
from time import sleep
from multiprocessing import Pool
from tqdm import tqdm
def crunch(numbers):
    # print(numbers)  # commented out so it does not mess up the tqdm output
    sleep(2)

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        print("mapping ...")
        results = tqdm(pool.imap(crunch, range(40)), total=40)
        print("running ...")
        tuple(results)  # fetch the lazy results
        print("done")
which prints:
mapping ...
running ...
0%| | 0/40 [00:00<?, ?it/s]
2%|▎ | 1/40 [00:02<01:35, 2.45s/it]
12%|█▎ | 5/40 [00:04<00:27, 1.26it/s]
22%|██▎ | 9/40 [00:06<00:19, 1.58it/s]
32%|███▎ | 13/40 [00:08<00:15, 1.74it/s]
42%|████▎ | 17/40 [00:10<00:12, 1.83it/s]
52%|█████▎ | 21/40 [00:12<00:10, 1.89it/s]
62%|██████▎ | 25/40 [00:14<00:07, 1.92it/s]
72%|███████▎ | 29/40 [00:16<00:05, 1.95it/s]
82%|████████▎ | 33/40 [00:18<00:03, 1.96it/s]
100%|██████████| 40/40 [00:20<00:00, 1.95it/s]
done
(the progress bar is printed on multiple lines because my PyCharm terminal on Windows does not support \r, but it should work fine on yours)
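Applied to the code from the question, this would look roughly like the following sketch (it assumes create_data, date and cols are defined as in the question; note that total should be the number of tasks, len(date), not os.cpu_count()):
from tqdm import tqdm
from multiprocessing import Pool
import pandas as pd
import os

with Pool(processes=os.cpu_count()) as pool:
    # imap is lazy, so tqdm advances as each result is fetched
    results = list(tqdm(pool.imap(create_data, date), total=len(date)))

data = [ent for sublist in results for ent in sublist]
data = pd.DataFrame(data, columns=cols)
data.to_csv("%s" % str(date), index=False)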
Upvotes: 14
Reputation: 44013
Update
See the answer of @Lenormnu, who is on the right track.
However, the problem with the imap
method is that it is guaranteed to return its results in the order of the arguments. So if processing the very first element of the date
list takes extremely long (i.e. it is the last task to finish), the progress bar will not advance at all until that task completes; by then all of the other submitted tasks have finished and the bar will immediately jump to 100%. Admittedly, this is not likely to happen, but it would be better if you could process results in completion order. imap_unordered
could be used, but recovering the results in the order of task submission would require you to first modify your create_data
function; one way to do that is sketched below.
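For illustration, here is a minimal sketch of that idea. Rather than editing create_data itself, it wraps it in a hypothetical helper indexed_create_data (not part of the original code) that tags each result with its submission index; it assumes create_data and date as in the question:
from multiprocessing import Pool
from tqdm import tqdm
import os

def indexed_create_data(args):
    # hypothetical wrapper: return the submission index along with the result
    idx, dt = args
    return idx, create_data(dt)

with Pool(processes=os.cpu_count()) as pool:
    results = [None] * len(date)  # pre-allocate slots in submission order
    unordered = pool.imap_unordered(indexed_create_data, enumerate(date))
    for idx, res in tqdm(unordered, total=len(date)):
        results[idx] = res  # restore submission order; the bar advances per completion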
If you use the ProcessPoolExecutor
class from the concurrent.futures
module for multiprocessing, you do not have to modify your create_data
function at all. Its submit
method creates Future
instances that you can store as keys in a dictionary whose values are indexes; applying the as_completed
function to that dictionary then yields the tasks in their order of completion, and the index lets you recover where each result belongs. Here I am being a bit more clever so that no sorting is required:
import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
import pandas as pd
import os

with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
    # the total argument for tqdm is just the number of submitted tasks:
    with tqdm.tqdm(total=len(date)) as progress_bar:
        futures = {}
        for idx, dt in enumerate(date):
            future = executor.submit(create_data, dt)
            futures[future] = idx
        results = [None] * len(date)  # pre-allocate slots
        for future in as_completed(futures):
            idx = futures[future]  # order of submission
            results[idx] = future.result()
            progress_bar.update(1)  # advance by 1

data = [ent for sublist in results for ent in sublist]
data = pd.DataFrame(data, columns=cols)
Upvotes: 4