Reputation: 3731
In the situation below, I've created a pool with two workers to perform tasks. While tasks are being processed, task_queue is checked regularly so that it doesn't exceed a certain length limit, which prevents upstream/downstream clutter. How can I dynamically add more workers to reduce the task queue length?
import multiprocessing as mp

# ... code snippet...

def main(poolsize, start_process):
    pool = mp.Pool(processes=poolsize, initializer=start_process)
    done = False
    task_queue = []
    while True:
        # ... snippet code : do something ...
        if len(task_queue) >= 10:
            # ... code to expand pool goes here...
            pass
        if done:
            break
    # .. do final something ...

if __name__ == '__main__':
    # freeze_support()
    poolsize = 2
    # start_process is assumed to be defined in the elided snippet above
    main(poolsize, start_process)
Upvotes: 2
Views: 293
Reputation: 3559
multiprocessing.Pool with dynamic size
Full example, based on the answer by ZF007:
import multiprocessing
import random
import time

def worker_function(job_id):
    dt = random.randint(1, 10)
    print(f"job {job_id}: sleeping for {dt} seconds")
    time.sleep(dt)
    return job_id * job_id

def get_job_done(job_id):
    return lambda val: print(f"job {job_id}: job done: val={val}")

def grow_pool(pool, new_size, max_size=None):
    new_size = min(new_size, max_size) if max_size else new_size
    if new_size > pool._processes:
        print(f"growing pool from {pool._processes} to {new_size}")
        pool._processes = new_size
        pool._repopulate_pool()

if __name__ == "__main__":
    # start pool
    start_workers = 1  # start N workers before demand
    max_workers = 4  # run N workers on demand
    pool = multiprocessing.Pool(start_workers)

    # add jobs
    num_jobs = 10
    grow_pool(pool, num_jobs, max_workers)
    for job_id in range(0, num_jobs):
        job_done = get_job_done(job_id)
        print(f"job {job_id}: adding job")
        pool.apply_async(worker_function, args=(job_id,), callback=job_done)

    # wait
    pool.close()
    pool.join()
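To tie grow_pool to the queue-length check from the question, one option is to derive the requested size from the current backlog. The sketch below is only an illustration: target_pool_size and jobs_per_worker are hypothetical names, and the sizing rule (one worker per jobs_per_worker queued tasks, never below the current size) is an assumption rather than anything from the posts above.

```python
import math


def target_pool_size(backlog, current_size, jobs_per_worker=5, max_size=None):
    """Suggest a pool size for a given backlog (hypothetical sizing rule).

    The result never drops below current_size unless max_size caps it.
    """
    # One worker per `jobs_per_worker` queued tasks, rounded up.
    wanted = max(current_size, math.ceil(backlog / jobs_per_worker))
    # Apply the optional upper bound, mirroring grow_pool's max_size handling.
    return min(wanted, max_size) if max_size else wanted


if __name__ == "__main__":
    # 23 queued tasks, 2 workers, at most 4 workers allowed.
    print(target_pool_size(23, 2, max_size=4))  # prints 4
```

The returned value could then be passed as new_size, for example grow_pool(pool, target_pool_size(len(task_queue), pool._processes), max_workers). Since grow_pool only acts when new_size exceeds pool._processes, a result equal to the current size is a no-op.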
Upvotes: 0
Reputation: 3731
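To add more workers while the pool is still processing jobs, define the function below and call it from within the while-loop. Note that it relies on private attributes of Pool (_pool, _processes, _repopulate_pool), which are not part of the documented API: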
def repopulate(pool, add_workers):
    current_pool_size = len(pool._pool)  # pool._pool is the list of worker processes
    new_pool_size = current_pool_size + add_workers
    pool._processes = new_pool_size
    pool._repopulate_pool()
    return pool
Within the while-loop of main():
    if len(task_queue) >= 10:
        new_workers = 2
        repopulate(pool, new_workers)
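For anyone who wants to check that the extra workers are really started, here is a minimal, self-contained sketch built around repopulate. The square task and the demo function are hypothetical stand-ins added for illustration, and because the code touches private Pool internals it may behave differently across Python versions.

```python
import multiprocessing as mp


def square(x):
    """Trivial stand-in task (hypothetical, for illustration only)."""
    return x * x


def repopulate(pool, add_workers):
    """Grow `pool` by `add_workers` processes using private Pool internals."""
    new_pool_size = len(pool._pool) + add_workers
    pool._processes = new_pool_size  # private attribute: target worker count
    pool._repopulate_pool()          # private method: start the missing workers
    return pool


def demo():
    """Start with 2 workers, add 2 more, then run a small batch of tasks."""
    with mp.Pool(processes=2) as pool:
        before = len(pool._pool)
        repopulate(pool, 2)
        after = len(pool._pool)
        results = pool.map(square, range(8))
    return before, after, results


if __name__ == "__main__":
    before, after, results = demo()
    print(f"workers: {before} -> {after}")
    print(results)
```

The worker count is read from len(pool._pool) before and after the call, so the effect of repopulate can be confirmed without waiting for the queue to drain.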
Upvotes: 1