Simd

Reputation: 21343

Running functions in parallel and seeing their progress

I am using joblib to run four processes on four cores in parallel. I would like to see the progress of the four processes separately, each on its own line. However, what I see is the progress bars being written on top of each other on the same line until the first process finishes.

from math import factorial
from decimal import Decimal, getcontext
from joblib import Parallel, delayed
from tqdm import trange
import time

def calc(n_digits):
    # number of iterations
    n = int(n_digits+1/14.181647462725477)
    n = n if n >= 1 else 1

    # set the number of digits for our numbers
    getcontext().prec = n_digits+1

    t    = Decimal(0)
    pi   = Decimal(0)
    deno = Decimal(0)

    for k in trange(n):
        t = ((-1)**k)*(factorial(6*k))*(13591409+545140134*k)
        deno = factorial(3*k)*(factorial(k)**3)*(640320**(3*k))
        pi += Decimal(t)/Decimal(deno)

    pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
    pi = 1/pi
    
    # no need to round
    return pi


def parallel_with_joblib():
    # Define the number of cores to use
    n_cores = 4

    # Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
    tasks = [1200, 1700, 900, 1400]


    # Run tasks in parallel
    results = Parallel(n_jobs=n_cores)(delayed(calc)(n) for n in tasks)


if __name__ == "__main__":
    parallel_with_joblib()

I would also like the four lines to be labelled "Job 1 of 4", "Job 2 of 4" etc.


Following the method of @Swifty and changing the number of cores to 3 and the number of tasks to 7 and changing leave=False to leave=True I have this code:

from math import factorial
from decimal import Decimal, getcontext
from joblib import Parallel, delayed
from tqdm import trange
import time


def calc(n_digits, pos, total):
    # number of iterations
    n = int(n_digits + 1 / 14.181647462725477)
    n = n if n >= 1 else 1

    # set the number of digits for our numbers
    getcontext().prec = n_digits + 1

    t = Decimal(0)
    pi = Decimal(0)
    deno = Decimal(0)

    for k in trange(n, position=pos, desc=f"Job {pos + 1} of {total}", leave=True):
        t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
        deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
        pi += Decimal(t) / Decimal(deno)

    pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
    pi = 1 / pi

    # no need to round
    return pi


def parallel_with_joblib():
    # Define the number of cores to use
    n_cores = 3

    # Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
    tasks =  [1200, 1700, 900, 1400, 800, 600, 500]

    # Run tasks in parallel
    results = Parallel(n_jobs=n_cores)(delayed(calc)(n, pos, len(tasks)) for (pos, n) in enumerate(tasks))


if __name__ == "__main__":
    parallel_with_joblib()

I changed it to leave=True because I don't want the blank lines that appear otherwise.

This however gives me:

[Image: screenshot of the terminal output]

and then at the end it creates even more mess:

[Image: screenshot of the terminal output]

How can this be fixed?

Upvotes: 6

Views: 258

Answers (3)

Booboo

Reputation: 44293

My idea was to create all the progress bars in the main process and to create a single multiprocessing queue that each pool process would have access to. Each time calc completes an iteration, it places on the queue an integer identifying its progress bar. The main process keeps getting these integers from the queue and updates the corresponding bar. When a calc instance finishes, it places a sentinel value on the queue to tell the main process that it has no more updates to enqueue.
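The update-and-sentinel protocol can be looked at in isolation from tqdm and multiprocessing. The following is a minimal sketch under some assumptions: threads stand in for the pool processes, a list of integer counters stands in for the progress bars, and the names worker, collect_progress and run_demo are hypothetical and exist only for this illustration.

```python
import queue
import threading


def worker(q, pos, n_iterations):
    """Stand-in for calc: send one update per iteration, then a sentinel."""
    for _ in range(n_iterations):
        q.put(pos)   # "advance bar number pos by one step"
    q.put(None)      # sentinel: this task has no more updates


def collect_progress(q, n_tasks):
    """Consume updates until every task has sent its sentinel."""
    counts = [0] * n_tasks   # stand-in for the list of progress bars
    remaining = n_tasks
    while remaining:
        pos = q.get()
        if pos is None:
            remaining -= 1    # one less task to await
        else:
            counts[pos] += 1  # with tqdm this would be pbars[pos].update()
    return counts


def run_demo(tasks):
    """Start one worker thread per task and return the collected counts."""
    q = queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(q, pos, n))
        for pos, n in enumerate(tasks)
    ]
    for t in threads:
        t.start()
    counts = collect_progress(q, len(tasks))
    for t in threads:
        t.join()
    return counts


if __name__ == "__main__":
    print(run_demo([5, 3, 8]))  # [5, 3, 8]
```

Because the consumer counts sentinels rather than checking totals, it does not need to know in advance how many updates each task will send.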

With a multiprocessing.pool.Pool instance we can use a "pool initializer" function to initialize a global variable queue in each pool process, which will be accessed by calc. Unfortunately, joblib provides no officially supported equivalent of a pool initializer. I tried various workarounds mentioned on the web, but none worked. So if you can live with not using joblib, then try this:

from math import factorial
from decimal import Decimal, getcontext
from multiprocessing import Pool, Queue
from tqdm import tqdm
import time

def init_pool(_queue):
    global queue

    queue = _queue

def calc(n_digits, pos):
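    # number of iterations; note that this evaluates to n_digits, because
    # 1 / 14.18... is computed first and the sum is then truncated by int()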
    n = int(n_digits + 1 / 14.181647462725477)
    n = n if n >= 1 else 1

    # set the number of digits for our numbers
    getcontext().prec = n_digits + 1

    t = Decimal(0)
    pi = Decimal(0)
    deno = Decimal(0)

    for k in range(n):
        t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
        deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
        pi += Decimal(t) / Decimal(deno)
        # Tell the main process to update the appropriate bar:
        queue.put(pos)

    pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
    pi = 1 / pi

    # no need to round
    queue.put(None)  # Let updater know we have no more updates
    return pi

def parallel_with_pool():
    # Define the number of cores to use
    n_cores = 4

    # Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
    tasks =  [1200, 1700, 900, 1400] # Edit to make code run longer
    n_tasks = len(tasks)

    queue = Queue()

    LEAVE_PROGRESS_BAR = False

    # Create the bars:
    pbars = [
        tqdm(total=tasks[idx],
             position=idx,
             desc=f"Job {idx + 1} of {n_tasks}",
             leave=LEAVE_PROGRESS_BAR
             )
        for idx in range(n_tasks)
    ]

    # Run tasks in parallel
    with Pool(n_cores, initializer=init_pool, initargs=(queue,)) as pool:
        # This doesn't block and allows us to retrieve items from the queue:
        async_result = pool.starmap_async(calc, zip(tasks, range(n_tasks)))

        n = n_tasks
        while n:
            pos = queue.get()
            # Is this a sentinel value?
            if pos is None:
                n -= 1  # One less task to await
            else:
                pbars[pos].update()

        # We have no more updates to perform, so wait for the results:
        results = async_result.get()

        # Cause the bars to be removed before we display results
        # (See following Notes):
        for pbar in pbars:
            pbar.close()
        # So that the next print call starts at the start of the line
        # (required if leave=False is specified):
        if not LEAVE_PROGRESS_BAR:
            print('\r')

        for result in results:
            print(result)

if __name__ == "__main__":
    parallel_with_pool()

Notes

In the above code the progress bars are instantiated with the argument leave=False signifying that we do not want the bars to remain. Consider the following code:

from tqdm import tqdm
import time

with tqdm(total=10, leave=False) as pbar:
    for _ in range(10):
        pbar.update()
        time.sleep(.5)

print('Done!')

When the with block is terminated, the progress bar will disappear as a result of the implicit call to pbar.__exit__ that occurs. But if we had instead:

pbar = tqdm(total=10, leave=False)
for _ in range(10):
    pbar.update()
    time.sleep(.5)

print('Done')

We would see instead:

C:\Ron\test>test.py
100%|██████████████████████| 10/10 [00:04<00:00,  2.03it/s]Done

Since the posted answer does not use the progress bars as context managers, the bars are not erased immediately, and a print statement that outputs the actual results of our pi calculations would run into the same problem. The solution is to explicitly call close() on each progress bar:

...
def parallel_with_pool():
        ...

        # We have no more updates to perform, so wait for the results:
        results = async_result.get()

        # Cause the bars to be removed before we display results.
        for pbar in pbars:
            pbar.close()
        # So that the next print call starts at the start of the line
        # (required if leave=False is specified):
        print('\r')

        for result in results:
            print(result)

If you want the progress bars to remain even after they have completed, then specify leave=True as follows:

    pbars = [
        tqdm(total=tasks[idx],
             position=idx,
             desc=f"Job {idx + 1} of {n_tasks}",
             leave=True
             )
        for idx in range(n_tasks)
    ]

It is no longer necessary to call close for each bar, but it does not hurt to do so.

Update

Instead of using a multiprocessing.Queue instance to communicate we can instead create a multiprocessing.Array instance (which uses shared memory) of N counters where N is the number of progress bars whose progress is being tracked. Every iteration of calc will include an increment of the appropriate counter. The main process now has to periodically (say every .1 seconds) check the counters and update the progress bar accordingly:

from math import factorial
from decimal import Decimal, getcontext
from multiprocessing import Pool, Array
from tqdm import tqdm
import time

def init_pool(_progress_cntrs):
    global progress_cntrs

    progress_cntrs = _progress_cntrs

def calc(n_digits, pos):
    # number of iterations
    n = int(n_digits + 1 / 14.181647462725477)
    n = n if n >= 1 else 1

    # set the number of digits for our numbers
    getcontext().prec = n_digits + 1

    t = Decimal(0)
    pi = Decimal(0)
    deno = Decimal(0)

    for k in range(n):
        t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
        deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
        pi += Decimal(t) / Decimal(deno)
        progress_cntrs[pos] += 1

    pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
    pi = 1 / pi

    return pi

def parallel_with_pool():
    # Define the number of cores to use
    n_cores = 4

    # Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
    tasks =  [1200, 1700, 900, 1400] # Edit to make code run longer
    n_tasks = len(tasks)
    progress_cntrs = Array('i', [0] * n_tasks, lock=False)

    LEAVE_PROGRESS_BAR = True

    # Create the bars:
    pbars = [
        tqdm(total=tasks[idx],
             position=idx,
             desc=f"Job {idx + 1} of {n_tasks}",
             leave=LEAVE_PROGRESS_BAR
             )
        for idx in range(n_tasks)
    ]

    # Run tasks in parallel
    with Pool(n_cores, initializer=init_pool, initargs=(progress_cntrs,)) as pool:
        # This doesn't block, so we can poll the shared counters below:
        async_result = pool.starmap_async(calc, zip(tasks, range(n_tasks)))

        n = n_tasks
        while n:
            time.sleep(.1)

            for idx in range(n_tasks):
                ctr = progress_cntrs[idx]
                if ctr != -1:
                    # This bar isn't complete
                    pbars[idx].n = ctr
                    pbars[idx].refresh()
                    if ctr == tasks[idx]:
                        # This bar is now complete
                        progress_cntrs[idx] = -1 # So we do not process this bar again
                        n -= 1

        # We have no more updates to perform, so wait for the results:
        results = async_result.get()

        # Cause the bars to be removed before we display results
        # (see the Notes above):
        for pbar in pbars:
            pbar.close()
        # So that the next print call starts at the start of the line
        # (required if leave=False is specified)
        if not LEAVE_PROGRESS_BAR:
            print('\r')

        for result in results:
            print(result)

if __name__ == '__main__':
    parallel_with_pool()
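
The polling loop above assigns pbar.n directly and then calls refresh(). An alternative is to work out how far each counter has advanced since the previous poll and pass that amount to pbar.update(). The following is a minimal sketch of that bookkeeping only, assuming plain lists stand in for the shared array; poll_deltas and shown are hypothetical names that are not part of tqdm or of the code above.

```python
def poll_deltas(counters, shown):
    """Return how far each task advanced since the previous poll.

    counters: current per-task iteration counts (written by the workers)
    shown:    counts already reflected on screen; updated in place
    """
    deltas = []
    for idx in range(len(shown)):
        current = counters[idx]
        deltas.append(current - shown[idx])
        shown[idx] = current
    return deltas


if __name__ == "__main__":
    shown = [0, 0, 0]
    print(poll_deltas([4, 0, 1], shown))  # [4, 0, 1]
    print(poll_deltas([6, 0, 1], shown))  # [2, 0, 0]
```

In the polling loop, each non-zero delta could then be passed to pbars[idx].update(delta), and a task would count as finished once shown[idx] == tasks[idx].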

Upvotes: 5

Simd

Reputation: 21343

Here is an alternative solution using joblib and Manager from multiprocessing.

from math import factorial
from decimal import Decimal, getcontext
from joblib import Parallel, delayed
from tqdm import tqdm
from multiprocessing import Manager
import time

def calc(n_digits, pos, progress_dict):
    # Number of iterations
    n = int(n_digits + 1 / 14.181647462725477)
    n = max(n, 1)  # Ensure at least one iteration

    # Set the number of digits for our numbers
    getcontext().prec = n_digits + 1

    t = Decimal(0)
    pi = Decimal(0)
    deno = Decimal(0)

    for k in range(n):
        t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
        deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
        pi += Decimal(t) / Decimal(deno)
        # Update progress in the shared dictionary
        progress_dict[pos] = k + 1

    pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
    pi = 1 / pi

    # Mark task as complete
    progress_dict[pos] = n_digits
    return pi

def parallel_with_joblib():
    # Define the number of cores to use
    n_cores = 3

    # Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
    tasks = [1200, 1700, 900, 1400, 800, 700, 600]  # Edit to make code run longer
    n_tasks = len(tasks)

    # Use a Manager to create a shared dictionary for progress tracking
    manager = Manager()
    progress_dict = manager.dict()

    # Initialize progress dictionary
    for idx in range(n_tasks):
        progress_dict[idx] = 0

    # Create the progress bars:
    pbars = [tqdm(total=tasks[idx], position=idx, desc=f"Job {idx + 1} of {n_tasks}", leave=True) for idx in range(n_tasks)]

    # Run tasks in parallel using joblib
    parallel_results = Parallel(n_jobs=n_cores, return_as="generator")(
        delayed(calc)(tasks[idx], idx, progress_dict) for idx in range(n_tasks)
    )

    # Update progress bars in the main process
    while True:
        all_done = True
        for idx in range(n_tasks):
            current_progress = progress_dict[idx]
            if current_progress < tasks[idx]:
                all_done = False
            pbars[idx].n = current_progress
            pbars[idx].refresh()

        if all_done:
            break

        time.sleep(0.1)  # Small delay to avoid busy-waiting

    # Close all progress bars
    for pbar in pbars:
        pbar.close()

    # Collect results from the generator
    results = list(parallel_results)

    # Print results (optional)
    #for idx, result in enumerate(results):
    #    print(f"Task {idx + 1} result: {result}")

if __name__ == "__main__":
    parallel_with_joblib()
    

Upvotes: 1

Swifty

Reputation: 3419

You can display bars in parallel by using the position parameter, and label them with the desc parameter. I've added the appropriate parameters to your calc function.

from math import factorial
from decimal import Decimal, getcontext
from joblib import Parallel, delayed
from tqdm import trange
import time


def calc(n_digits, pos, total):
    # number of iterations
    n = int(n_digits + 1 / 14.181647462725477)
    n = n if n >= 1 else 1

    # set the number of digits for our numbers
    getcontext().prec = n_digits + 1

    t = Decimal(0)
    pi = Decimal(0)
    deno = Decimal(0)

    for k in trange(n, position=pos, desc=f"Job {pos + 1} of {total}", leave=False):
        t = ((-1) ** k) * (factorial(6 * k)) * (13591409 + 545140134 * k)
        deno = factorial(3 * k) * (factorial(k) ** 3) * (640320 ** (3 * k))
        pi += Decimal(t) / Decimal(deno)

    pi = pi * Decimal(12) / Decimal(640320 ** Decimal(1.5))
    pi = 1 / pi

    # no need to round
    return pi


def parallel_with_joblib():
    # Define the number of cores to use
    n_cores = 4

    # Define the tasks (e.g., compute first 100, 200, 300, 400 digits of pi)
    tasks =  [1200, 1700, 900, 1400] # Edit to make code run longer

    # Run tasks in parallel
    results = Parallel(n_jobs=n_cores)(delayed(calc)(n, pos, len(tasks)) for (pos, n) in enumerate(tasks))


if __name__ == "__main__":
    parallel_with_joblib()

[Image: screenshot of the terminal output]

Note: I saw occasional glitches in the terminal (sometimes part of a progress bar got copied further down); so far I've been unable to determine the cause.

Upvotes: 2
