KieranL

Reputation: 181

Python multiprocessing pool: dynamically set number of processes during execution of tasks

We submit large CPU-intensive jobs in Python 2.7 (consisting of many independent parallel processes) on our development machine, and they last for days at a time. The machine's responsiveness slows down a lot when these jobs run with a large number of processes. Ideally, I would like to limit the number of CPUs used during the day while we're developing code, and overnight run as many processes as possible.

The Python multiprocessing library allows you to specify the number of processes when you initiate a Pool. Is there a way to change this number dynamically each time a new task is started?

For instance, allow 20 processes to run between 19:00 and 07:00, and 10 processes between 07:00 and 19:00.

One way would be to check the number of active processes that are using significant CPU. This is how I would like it to work:

from multiprocessing import Pool
import time

def big_task(x):
    # wait (sketch) until fewer than 10 processes are busy
    while check_n_process(processes=10) is False:
        time.sleep(60 * 60)
    x += 1
    return x

# define big_task before creating the Pool so the forked workers see it
pool = Pool(processes=20)

x = 1
multiple_results = [pool.apply_async(big_task, (x,)) for i in range(1000)]
print([res.get() for res in multiple_results])

But I would need to write the 'check_n_process' function.
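
A rough sketch of what check_n_process could look like, assuming the third-party psutil package is available; check_n_process itself and the 10% CPU threshold are illustrative, not part of any library:

import os
import psutil  # third-party: pip install psutil

def check_n_process(processes=10):
    """Illustrative sketch: return True if fewer than `processes`
    sibling workers are using significant CPU. Since big_task runs
    inside a pool worker, we inspect the parent's children (i.e. the
    other workers); the 10% threshold is arbitrary."""
    parent = psutil.Process(os.getppid())
    busy = [p for p in parent.children()
            if p.cpu_percent(interval=0.1) > 10.0]
    return len(busy) < processes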

Any other ideas how this problem could be solved?

(The code needs to run in Python 2.7 - a bash implementation is not feasible).

Upvotes: 6

Views: 6178

Answers (2)

stolenmoment

Reputation: 443

This is woefully incomplete (and an old question), but you can manage the load by keeping track of the running processes and only calling apply_async() when it's favorable. If each job runs for less than forever, you can drop the load by dispatching fewer jobs during working hours, or when os.getloadavg() is too high. I do this to manage network load when running multiple "scp"s to evade traffic shaping on our internal network (don't tell anyone!).
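
A minimal sketch of that idea, reusing big_task from the question; the MAX_LOAD threshold and the dispatch helper are illustrative assumptions, and os.getloadavg() is only available on Unix:

import os
import time

MAX_LOAD = 8  # illustrative ceiling on the 1-minute load average

def dispatch(pool, args_list):
    """Submit one big_task per argument, pausing while the machine is busy."""
    results = []
    for args in args_list:
        # os.getloadavg() returns the 1-, 5- and 15-minute load averages
        while os.getloadavg()[0] > MAX_LOAD:
            time.sleep(60)
        results.append(pool.apply_async(big_task, (args,)))
    return results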

Upvotes: 0

noxdafox

Reputation: 15040

Python multiprocessing.Pool does not provide a way to change the number of workers of a running Pool. A simple solution would be to rely on third-party tools.

The Pool provided by billiard used to provide such a feature.
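
A sketch of how that might look, assuming the grow()/shrink() Pool methods are present in the billiard release you install (the answer only says the feature "used to" exist, so verify against its documentation):

from billiard import Pool  # third-party: pip install billiard

pool = Pool(processes=20)
# during working hours, shed workers:
pool.shrink(10)  # assumed API: remove 10 worker processes
# in the evening, scale back up:
pool.grow(10)    # assumed API: add 10 worker processes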

Task queue frameworks like Celery or Luigi certainly allow a flexible workload, but are far more complex.

If the use of external dependencies is not feasible, you can try the following approach. Elaborating on this answer, you could set up a throttling mechanism based on a Semaphore.

from threading import Semaphore, Lock
from multiprocessing import Pool

class TaskManager(object):
    def __init__(self, pool_size):
        self.pool = Pool(processes=pool_size)
        self.workers = Semaphore(pool_size)
        # ensures the semaphore is not replaced while used
        self.workers_mutex = Lock()

    def change_pool_size(self, new_size):
        """Change the number of tasks allowed to run concurrently."""
        with self.workers_mutex:
            self.workers = Semaphore(new_size)

    def new_task(self, task):
        """Start a new task; blocks if all workers are busy."""
        with self.workers_mutex:
            self.workers.acquire()

        self.pool.apply_async(big_task, args=[task], callback=self.task_done)

    def task_done(self, result):
        """Called once a task is done; releases a worker slot."""
        with self.workers_mutex:
            self.workers.release()

The TaskManager would block further attempts to schedule your big_tasks if more than X workers are busy. By controlling this mechanism you could throttle the number of processes running concurrently (note that the underlying Pool keeps its original number of worker processes; only how many may be busy at once changes). Of course, this means you give up the Pool's queueing mechanism.

task_manager = TaskManager(20)

while True:
    if seven_in_the_morning():
        task_manager.change_pool_size(10)
    if seven_in_the_evening():
        task_manager.change_pool_size(20)

    task = get_new_task()
    task_manager.new_task(task)  # blocks here if all workers are busy
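
The seven_in_the_morning / seven_in_the_evening helpers are left undefined in the answer; a minimal sketch using the standard datetime module could look like the following (in practice you would want to trigger the resize only once per transition, not on every loop iteration):

from datetime import datetime

def seven_in_the_morning():
    return datetime.now().hour == 7

def seven_in_the_evening():
    return datetime.now().hour == 19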

Upvotes: 4
