Reputation: 21343
I am using joblib to run four processes on four cores in parallel. I would like to see the progress of the four processes separately on different lines. However, what I see is the progress being written on top of each other to the same line until the first process finishes.
from math import factorial
from decimal import Decimal, getcontext
from joblib import Parallel, delayed
from tqdm import trange
import time
def calc(n_digits):
# number of iterations
n = int(n_digits+1/14.181647462725477)
n = n if n >= 1 else 1
# set the number of digits for our numbers
getcontext().prec = n_digits+1
t = Decimal(0)
pi = Decimal(0)
deno = Decimal(0)
for k in trange(n):
t = ((-1)**k)*(factorial(6*k))*(13591409+545140134*k)
deno = factorial(3*k)*(factorial(k)**3)*(640320**(3*k))
pi += Decimal(t)/Decimal(deno)
pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
pi = 1/pi
# no need to round
return pi
def parallel_with_joblib():
# Define the number of cores to use
n_cores = 4
# Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
tasks = [1200, 1700, 900, 1400]
# Run tasks in parallel
results = Parallel(n_jobs=n_cores)(delayed(calc)(n) for n in tasks)
if __name__ == "__main__":
parallel_with_joblib()
I would also like the four lines to be labelled "Job 1 of 4", "Job 2 of 4" etc.
Following the method of @Swifty and changing the number of cores to 3 and the number of tasks to 7 and changing leave=False to leave=True I have this code:
from math import factorial
from decimal import Decimal, getcontext
from joblib import Parallel, delayed
from tqdm import trange
import time
def calc(n_digits, pos, total):
# number of iterations
n = int(n_digits + 1 / 14.181647462725477)
n = n if n >= 1 else 1
# set the number of digits for our numbers
getcontext().prec = n_digits + 1
t = Decimal(0)
pi = Decimal(0)
deno = Decimal(0)
for k in trange(n, position=pos, desc=f"Job {pos + 1} of {total}", leave=True):
t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
pi += Decimal(t) / Decimal(deno)
pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
pi = 1 / pi
# no need to round
return pi
def parallel_with_joblib():
# Define the number of cores to use
n_cores = 3
# Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
tasks = [1200, 1700, 900, 1400, 800, 600, 500]
# Run tasks in parallel
results = Parallel(n_jobs=n_cores)(delayed(calc)(n, pos, len(tasks)) for (pos, n) in enumerate(tasks))
if __name__ == "__main__":
parallel_with_joblib()
I have changed it to leave=True as I don't want the blank lines that appear otherwise.
This however gives me:
and then at the end it creates even more mess:
How can this be fixed?
Upvotes: 6
Views: 258
Reputation: 44293
My idea was to create all the progress bars in the main process and to create a single multiprocessing queue that each pool process would have access to. Then, whenever calc completed an iteration, it would place on the queue an integer identifying its corresponding progress bar. The main process would continually get these integers from the queue and update the correct bar. Each calc instance would place a sentinel value on the queue telling the main process that it had no more updates to enqueue.
With a multiprocessing.pool.Pool instance we can use a "pool initializer" function to initialize a global variable queue in each pool process, which will be accessed by calc. Unfortunately, joblib provides no supported equivalent of a pool initializer. I tried various workarounds mentioned on the web, but none worked. So if you can live with not using joblib, then try this:
from math import factorial
from decimal import Decimal, getcontext
from multiprocessing import Pool, Queue
from tqdm import tqdm
import time
def init_pool(_queue):
global queue
queue = _queue
def calc(n_digits, pos):
# number of iterations
n = int(n_digits + 1 / 14.181647462725477)
n = n if n >= 1 else 1
# set the number of digits for our numbers
getcontext().prec = n_digits + 1
t = Decimal(0)
pi = Decimal(0)
deno = Decimal(0)
for k in range(n):
t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
pi += Decimal(t) / Decimal(deno)
# Tell the main process to update the appropriate bar:
queue.put(pos)
pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
pi = 1 / pi
# no need to round
queue.put(None) # Let updater know we have no more updates
return pi
def parallel_with_pool():
# Define the number of cores to use
n_cores = 4
# Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
tasks = [1200, 1700, 900, 1400] # Edit to make code run longer
n_tasks = len(tasks)
queue = Queue()
LEAVE_PROGRESS_BAR = False
# Create the bars:
pbars = [
tqdm(total=tasks[idx],
position=idx,
desc=f"Job {idx + 1} of {n_tasks}",
leave=LEAVE_PROGRESS_BAR
)
for idx in range(n_tasks)
]
# Run tasks in parallel
with Pool(n_cores, initializer=init_pool, initargs=(queue,)) as pool:
# This doesn't block and allows us to retrieve items from the queue:
async_result = pool.starmap_async(calc, zip(tasks, range(n_tasks)))
n = n_tasks
while n:
pos = queue.get()
# Is this a sentinel value?
if pos is None:
n -= 1 # One less task to await
else:
pbars[pos].update()
# We have no more updates to perform, so wait for the results:
results = async_result.get()
# Cause the bars to be removed before we display results
# (See following Notes):
for pbar in pbars:
pbar.close()
# So that the next print call starts at the start of the line
# (required if leave=False is specified):
if not LEAVE_PROGRESS_BAR:
print('\r')
for result in results:
print(result)
if __name__ == "__main__":
parallel_with_pool()
Notes
In the above code the progress bars are instantiated with the argument leave=False signifying that we do not want the bars to remain. Consider the following code:
from tqdm import tqdm
import time
with tqdm(total=10, leave=False) as pbar:
for _ in range(10):
pbar.update()
time.sleep(.5)
print('Done!')
When the with block is terminated, the progress bar will disappear as a result of the implicit call to pbar.__exit__ that occurs. But if we had instead:
pbar = tqdm(total=10, leave=False)
for _ in range(10):
pbar.update()
time.sleep(.5)
print('Done')
We would see instead:
C:\Ron\test>test.py
100%|██████████████████████| 10/10 [00:04<00:00, 2.03it/s]Done
Since in the posted answer we are not using the progress bars as context managers, the bars are not immediately erased, and if we had a print statement to output the actual results of our pi calculations, we would have this problem. The solution is to explicitly call close() on each progress bar:
...
def parallel_with_pool():
...
# We have no more updates to perform, so wait for the results:
results = async_result.get()
# Cause the bars to be removed before we display results.
for pbar in pbars:
pbar.close()
# So that the next print call starts at the start of the line
# (required if leave=False is specified):
print('\r')
for result in results:
print(result)
If you want the progress bars to remain even after they have completed, then specify leave=True as follows:
pbars = [
tqdm(total=tasks[idx],
position=idx,
desc=f"Job {idx + 1} of {n_tasks}",
leave=True
)
for idx in range(n_tasks)
]
It is no longer necessary to call close for each bar, but it does not hurt to do so.
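As a small standalone check of that last point (a toy bar, not the answer's code): with leave=True the bar survives close() and the following print starts cleanly on its own line:

```python
from tqdm import tqdm

# With leave=True the completed bar remains on screen after close().
pbar = tqdm(total=3, leave=True)
for _ in range(3):
    pbar.update()
pbar.close()  # harmless: the finished bar is simply left in place
print("Done")
```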
Update
Instead of using a multiprocessing.Queue instance to communicate, we can instead create a multiprocessing.Array instance (which uses shared memory) of N counters, where N is the number of progress bars whose progress is being tracked. Every iteration of calc increments the appropriate counter. The main process now has to periodically (say, every 0.1 seconds) check the counters and update the progress bars accordingly:
from math import factorial
from decimal import Decimal, getcontext
from multiprocessing import Pool, Array
from tqdm import tqdm
import time
def init_pool(_progress_cntrs):
global progress_cntrs
progress_cntrs = _progress_cntrs
def calc(n_digits, pos):
# number of iterations
n = int(n_digits + 1 / 14.181647462725477)
n = n if n >= 1 else 1
# set the number of digits for our numbers
getcontext().prec = n_digits + 1
t = Decimal(0)
pi = Decimal(0)
deno = Decimal(0)
for k in range(n):
t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
pi += Decimal(t) / Decimal(deno)
progress_cntrs[pos] += 1
pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
pi = 1 / pi
return pi
def parallel_with_pool():
# Define the number of cores to use
n_cores = 4
# Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
tasks = [1200, 1700, 900, 1400] # Edit to make code run longer
n_tasks = len(tasks)
progress_cntrs = Array('i', [0] * n_tasks, lock=False)
LEAVE_PROGRESS_BAR = True
# Create the bars:
pbars = [
tqdm(total=tasks[idx],
position=idx,
desc=f"Job {idx + 1} of {n_tasks}",
leave=LEAVE_PROGRESS_BAR
)
for idx in range(n_tasks)
]
# Run tasks in parallel
with Pool(n_cores, initializer=init_pool, initargs=(progress_cntrs,)) as pool:
# This doesn't block and allows us to poll the progress counters:
async_result = pool.starmap_async(calc, zip(tasks, range(n_tasks)))
n = n_tasks
while n:
time.sleep(.1)
for idx in range(n_tasks):
ctr = progress_cntrs[idx]
if ctr != -1:
# This bar isn't complete
pbars[idx].n = ctr
pbars[idx].refresh()
if ctr == tasks[idx]:
# This bar is now complete
progress_cntrs[idx] = -1 # So we do not process this bar again
n -= 1
# We have no more updates to perform, so wait for the results:
results = async_result.get()
# Cause the bars to be removed before we display results
# (See following Notes):
for pbar in pbars:
pbar.close()
# So that the next print call starts at the start of the line
# (required if leave=False is specified)
if not LEAVE_PROGRESS_BAR:
print('\r')
for result in results:
print(result)
if __name__ == '__main__':
parallel_with_pool()
Upvotes: 5
Reputation: 21343
Here is an alternative solution using joblib and Manager from multiprocessing.
from math import factorial
from decimal import Decimal, getcontext
from joblib import Parallel, delayed
from tqdm import tqdm
from multiprocessing import Manager
import time
def calc(n_digits, pos, progress_dict):
# Number of iterations
n = int(n_digits + 1 / 14.181647462725477)
n = max(n, 1) # Ensure at least one iteration
# Set the number of digits for our numbers
getcontext().prec = n_digits + 1
t = Decimal(0)
pi = Decimal(0)
deno = Decimal(0)
for k in range(n):
t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
pi += Decimal(t) / Decimal(deno)
# Update progress in the shared dictionary
progress_dict[pos] = k + 1
pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
pi = 1 / pi
# Mark task as complete
progress_dict[pos] = n_digits
return pi
def parallel_with_joblib():
# Define the number of cores to use
n_cores = 3
# Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
tasks = [1200, 1700, 900, 1400, 800, 700, 600] # Edit to make code run longer
n_tasks = len(tasks)
# Use a Manager to create a shared dictionary for progress tracking
manager = Manager()
progress_dict = manager.dict()
# Initialize progress dictionary
for idx in range(n_tasks):
progress_dict[idx] = 0
# Create the progress bars:
pbars = [tqdm(total=tasks[idx], position=idx, desc=f"Job {idx + 1} of {n_tasks}", leave=True) for idx in range(n_tasks)]
# Run tasks in parallel using joblib
parallel_results = Parallel(n_jobs=n_cores, return_as="generator")(
delayed(calc)(tasks[idx], idx, progress_dict) for idx in range(n_tasks)
)
# Update progress bars in the main process
while True:
all_done = True
for idx in range(n_tasks):
current_progress = progress_dict[idx]
if current_progress < tasks[idx]:
all_done = False
pbars[idx].n = current_progress
pbars[idx].refresh()
if all_done:
break
time.sleep(0.1) # Small delay to avoid busy-waiting
# Close all progress bars
for pbar in pbars:
pbar.close()
# Collect results from the generator
results = list(parallel_results)
# Print results (optional)
#for idx, result in enumerate(results):
# print(f"Task {idx + 1} result: {result}")
if __name__ == "__main__":
parallel_with_joblib()
Upvotes: 1
Reputation: 3419
You can display bars in parallel by making use of the position parameter, and label them with the desc parameter. I've added the appropriate parameters to your calc function.
from math import factorial
from decimal import Decimal, getcontext
from joblib import Parallel, delayed
from tqdm import trange
import time
def calc(n_digits, pos, total):
# number of iterations
n = int(n_digits + 1 / 14.181647462725477)
n = n if n >= 1 else 1
# set the number of digits for our numbers
getcontext().prec = n_digits + 1
t = Decimal(0)
pi = Decimal(0)
deno = Decimal(0)
for k in trange(n, position=pos, desc=f"Job {pos + 1} of {total}", leave=False):
t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
pi += Decimal(t) / Decimal(deno)
pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
pi = 1 / pi
# no need to round
return pi
def parallel_with_joblib():
# Define the number of cores to use
n_cores = 4
# Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
tasks = [1200, 1700, 900, 1400] # Edit to make code run longer
# Run tasks in parallel
results = Parallel(n_jobs=n_cores)(delayed(calc)(n, pos, len(tasks)) for (pos, n) in enumerate(tasks))
if __name__ == "__main__":
parallel_with_joblib()
Note: I saw occasional glitches in the terminal (sometimes some of the progress bars got copied further down); so far I've been unable to determine the cause.
Upvotes: 2