Reputation: 33
For performance reasons, I split a big DataFrame into several smaller DataFrames, iterate through each of them and do some calculations. Now I am trying to create a global progress bar that shows all the iterations made so far. With multiprocessing, of course, a separate progress bar is created for each process. Is there a way to update one "overall" progress bar? I have not found an answer in other forum posts, because I do not want to see the progress of each individual subprocess, or how many processes are finished, but the total number of iterations performed in the "do_calculations" function. My code is:
import multiprocessing as mp
import numpy as np
import pandas as pd
from tqdm import tqdm
# load the initial dataframe
initial_df = pd.read_csv(r"...") # let's assume len(initial_df)=60
# create a "global" progress bar
pbar = tqdm(total=len(initial_df))
def do_calculations(sub_df):
    """Function that calculates some things for each row of a sub_dataframe."""
    # iterate through the sub_dataframe
    for index, row in sub_df.iterrows():
        # do some calculations
        # here I want to update the "global" progress bar across all parallel
        # processes
        global pbar
        pbar.update(1)
    return sub_df
def execute():
    """Function that executes the 'do_calculations' function using multiprocessing."""
    num_processes = mp.cpu_count() - 2  # let's assume num_processes=6
    pool = mp.Pool(processes=num_processes)
    # split the initial dataframe
    divided_df = np.array_split(initial_df, num_processes)
    # execute the 'do_calculations' function using multiprocessing and re-join
    # the dataframe
    new_df = pd.concat(pool.map(do_calculations, divided_df))
    pool.close()
    pool.join()
    return new_df
if __name__ == "__main__":
    new_data = execute()
My result so far is:
17%|█▋ | 10/60 [00:05<00:25, 1.99it/s]
17%|█▋ | 10/60 [00:05<00:25, 1.97it/s]
17%|█▋ | 10/60 [00:05<00:25, 1.98it/s]
17%|█▋ | 10/60 [00:05<00:25, 1.96it/s]
17%|█▋ | 10/60 [00:05<00:25, 1.98it/s]
17%|█▋ | 10/60 [00:05<00:25, 1.98it/s]
0%| | 0/60 [00:08<?, ?it/s]
My desired result is the number of iterations done in the do_calculations function:
100%|██████████| 60/60 [00:13<00:00, 4.42it/s]
I am not trying to show at which step the 'map' function is:
100%|██████████| 6/6 [00:04<00:00, 1.28it/s]
I am grateful for any help!! Thanks in advance.
Upvotes: 1
Views: 1752
Reputation: 540
Here is an example of using tqdm with multiprocessing.pool.imap (source):
import multiprocessing as mp
import numpy as np
import pandas as pd
from tqdm import tqdm

def do_calculations(sub_df):
    """Function that calculates some things for each row of a sub_dataframe."""
    # iterate through the sub_dataframe
    for index, row in sub_df.iterrows():
        # do some calculations
        pass
    return sub_df
def execute():
    """Function that executes the 'do_calculations' function using multiprocessing."""
    num_processes = mp.cpu_count() - 2
    # Split the initial dataframe.
    # Create 4 times more splits than processes being used, so the bar shows progress.
    divided_df = np.array_split(initial_df, num_processes * 4)
    with mp.Pool(processes=num_processes) as pool:
        # Inspiration: https://stackoverflow.com/a/45276885/4856719
        results = list(tqdm(pool.imap(do_calculations, divided_df), total=len(divided_df)))
    new_df = pd.concat(results, axis=0, ignore_index=True)
    return new_df
if __name__ == "__main__":
    # load the initial dataframe (replaced)
    initial_df = pd.DataFrame(np.random.randint(0, 100, size=(10_000_000, 4)), columns=list('ABCD'))
    new_data = execute()
Output (using 18 out of 20 threads):
100%|██████████| 72/72 [00:35<00:00, 2.05it/s]
This solution is only useful if more dataframe splits are created than processes are used. Otherwise (when all processes take roughly the same amount of time), the progress bar only "moves" once, at the end.
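If the bar should advance per row rather than per chunk, one possible variant (not part of the answer above) is to let each worker report finished rows through a multiprocessing.Manager queue, which the parent drains into a single tqdm bar. A minimal sketch, assuming the same do_calculations structure; the queue and helper names are illustrative:

import multiprocessing as mp
import threading
import numpy as np
import pandas as pd
from tqdm import tqdm

def do_calculations(args):
    """Same per-row work as before, but reports each finished row to a queue."""
    sub_df, progress_queue = args
    for index, row in sub_df.iterrows():
        # do some calculations
        progress_queue.put(1)  # one token per finished row
    return sub_df

def execute(initial_df):
    num_processes = max(mp.cpu_count() - 2, 1)
    divided_df = np.array_split(initial_df, num_processes)
    # Manager queue proxies can be pickled and passed to pool workers.
    progress_queue = mp.Manager().Queue()
    pbar = tqdm(total=len(initial_df))

    def drain():
        # Runs in the parent: one pbar.update per token put by any worker.
        for _ in range(len(initial_df)):
            progress_queue.get()
            pbar.update(1)

    listener = threading.Thread(target=drain, daemon=True)
    listener.start()
    with mp.Pool(processes=num_processes) as pool:
        results = pool.map(do_calculations, [(chunk, progress_queue) for chunk in divided_df])
    listener.join()
    pbar.close()
    return pd.concat(results)

if __name__ == "__main__":
    df = pd.DataFrame(np.random.randint(0, 100, size=(600, 4)), columns=list('ABCD'))
    new_data = execute(df)

The per-row queue traffic adds some IPC overhead, so this mainly pays off when each row takes noticeably longer than a queue round trip.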
Upvotes: 1