ZF007

Reputation: 3731

How to add a specific number of additional workers to an existing multiprocessing pool?

In the situation below I've created a default pool with two workers and perform tasks. During task processing the task_queue is checked regularly so it doesn't exceed a certain length limit, which prevents upstream/downstream clutter. How can I dynamically add more workers to reduce the task-queue length?

import multiprocessing as mp

... code snippet...

def main(poolsize):

    pool = mp.Pool(processes=poolsize)
    done = False

    task_queue = []

    while True:

        ... snippet code : do something ...

        if len(task_queue) >= 10:

            ... code to expand pool goes here...

        if done:
            break

    ... do final something ...

if __name__ == '__main__':

#    freeze_support()

    poolsize = 2

    main(poolsize)

Upvotes: 2

Views: 293

Answers (2)

milahu

Reputation: 3559

multiprocessing.Pool with dynamic size

A full example based on the answer by ZF007:

import multiprocessing, time, random

def worker_function(job_id):
    dt = random.randint(1, 10)
    print(f"job {job_id}: sleeping for {dt} seconds")
    time.sleep(dt)
    return job_id * job_id

def get_job_done(job_id):
    return lambda val: print(f"job {job_id}: job done: val={val}")

def grow_pool(pool, new_size, max_size=None):
    new_size = min(new_size, max_size) if max_size else new_size
    if new_size > pool._processes:
        print(f"growing pool from {pool._processes} to {new_size}")
        pool._processes = new_size
        pool._repopulate_pool()

if __name__ == "__main__":

    # start pool
    start_workers = 1 # start N workers before demand
    max_workers = 4 # run N workers on demand
    pool = multiprocessing.Pool(start_workers)

    # add jobs
    num_jobs = 10
    grow_pool(pool, num_jobs, max_workers)
    for job_id in range(0, num_jobs):
        job_done = get_job_done(job_id)
        print(f"job {job_id}: adding job")
        pool.apply_async(worker_function, args=(job_id,), callback=job_done)

    # wait
    pool.close()
    pool.join()

Upvotes: 0

ZF007

Reputation: 3731

To add more workers while the pool is processing jobs, you can call the function below from within the while-loop:


def repopulate(pool, add_workers):

    current_pool_size = len(pool._pool)         # _pool holds the list of live worker processes.

    new_pool_size = current_pool_size + add_workers

    pool._processes = new_pool_size             # raise the pool's target size.

    pool._repopulate_pool()                     # spawn new workers up to _processes.

    return pool

Within the while-loop from main():


if len(task_queue) >= 10:

    new_workers = 2

    repopulate(pool, new_workers)

Upvotes: 1
