Reputation: 181
We submit large, CPU-intensive jobs in Python 2.7 (consisting of many independent parallel processes) on our development machine, and they last for days at a time. The machine's responsiveness drops a lot when these jobs are running with a large number of processes. Ideally, I would like to limit the number of CPUs used during the day while we're developing code, and overnight run as many processes as efficiently as possible.
The Python multiprocessing library allows you to specify the number of processes when you create a Pool. Is there a way to change this number dynamically each time a new task is started?
For instance, allow 20 processes to run between 19:00 and 07:00, and 10 processes between 07:00 and 19:00.
One way would be to check the number of active processes that are using significant CPU. This is how I would like it to work:
from multiprocessing import Pool
import time

def big_task(x):
    # check_n_process should return True once fewer than 10 CPU-heavy
    # processes are active (this is the function I still need to write)
    while check_n_process(processes=10) is False:
        time.sleep(60 * 60)
    x += 1
    return x

pool = Pool(processes=20)

x = 1
multiple_results = [pool.apply_async(big_task, (x,)) for i in range(1000)]
print([res.get() for res in multiple_results])
But I would need to write the 'check_n_process' function.
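A rough sketch of what I have in mind for check_n_process, using the third-party psutil module (the 5% CPU threshold and the per-process sampling are just a guess, not code we have working):

import psutil

def check_n_process(processes=10, cpu_threshold=5.0):
    """Return True when fewer than `processes` processes are each
    using more than `cpu_threshold` percent CPU."""
    busy = 0
    for proc in psutil.process_iter():
        try:
            # briefly sample each process; slow when many processes exist
            if proc.cpu_percent(interval=0.1) > cpu_threshold:
                busy += 1
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return busy < processes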
Any other ideas how this problem could be solved?
(The code needs to run in Python 2.7 - a bash implementation is not feasible).
Upvotes: 6
Views: 6178
Reputation: 443
This is woefully incomplete (and an old question), but you can manage the load by keeping track of the running processes and only calling apply_async() when it is favorable: if each job runs for less than forever, you can drop the load by dispatching fewer jobs during working hours, or whenever os.getloadavg() is too high. I do this to manage network load when running multiple scp transfers, to evade traffic shaping on our internal network (don't tell anyone!).
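For illustration, a dispatch loop along those lines might look something like this; the dispatch helper, the 80%-of-cores load target and the 30-second poll interval are all made up for the sketch:

import os
import time
from multiprocessing import Pool, cpu_count

def dispatch(pool, jobs, func, target_load=None, poll=30):
    """Submit jobs one at a time, backing off while the 1-minute
    load average (Unix only) is above target_load."""
    if target_load is None:
        target_load = cpu_count() * 0.8
    results = []
    for job in jobs:
        while os.getloadavg()[0] > target_load:
            time.sleep(poll)  # wait for the machine to quiet down
        results.append(pool.apply_async(func, (job,)))
    return results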
Upvotes: 0
Reputation: 15040
Python's multiprocessing.Pool does not provide a way to change the number of workers of a running Pool. A simple solution would be to rely on third-party tools.
The Pool provided by billiard used to provide such a feature.
Task queue frameworks like Celery or Luigi certainly allow a flexible workload, but are far more complex.
If the use of external dependencies is not feasible, you can try the following approach. Elaborating on this answer, you could set up a throttling mechanism based on a Semaphore.
from threading import Semaphore, Lock
from multiprocessing import Pool

class TaskManager(object):
    def __init__(self, pool_size):
        self.pool = Pool(processes=pool_size)
        self.workers = Semaphore(pool_size)
        # ensures the semaphore is not replaced while used
        self.workers_mutex = Lock()

    def change_pool_size(self, new_size):
        """Set the throttle to a new size."""
        with self.workers_mutex:
            self.workers = Semaphore(new_size)

    def new_task(self, task):
        """Start a new task, blocks if all workers are busy."""
        with self.workers_mutex:
            workers = self.workers
        workers.acquire()  # block outside the mutex so task_done can release
        self.pool.apply_async(big_task, args=[task], callback=self.task_done)

    def task_done(self, result):
        """Called once a task is done, releases one worker slot."""
        with self.workers_mutex:
            self.workers.release()
This would block further attempts to schedule your big_task if more than X workers are busy. By controlling this mechanism, you can throttle the number of processes running concurrently. Of course, this means you give up the Pool's internal queueing mechanism.
task_manager = TaskManager(20)

while True:
    if seven_in_the_morning():
        task_manager.change_pool_size(10)
    if seven_in_the_evening():
        task_manager.change_pool_size(20)

    task = get_new_task()
    task_manager.new_task(task)  # blocks here if all workers are busy
Upvotes: 4