Mathieu

Reputation: 5756

Python: dynamically control the number of processes in a multiprocessing script according to the amount of free RAM and a parameter of the function

I've got an unusual question about Python. I'm using the multiprocessing library to map a function f((dynamic1, dynamic2), fix1, fix2).

import multiprocessing as mp

fix1 = 4
fix2 = 6

# Number of cores to use
N = 6

dynamic_duos = [(a, b) for a in range(5) for b in range(10)]

with mp.Pool(processes=N) as p:
    p.starmap(f, [(dyn, fix1, fix2) for dyn in dynamic_duos])

I would like to dynamically control the number of active processes because the function sometimes consumes a LOT of RAM. The idea would be to check, at every iteration (i.e. before any call of the function f), whether sum(dyn) is below a threshold and whether the amount of free RAM is above a threshold. If both conditions are met, a new process can start and compute the function.

An additional condition would be the maximum number of processes: the number of cores on the PC.
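
To make that concrete, here is a minimal sketch of the per-task check I have in mind, assuming the psutil package is available (RAM_THRESHOLD, SUM_THRESHOLD and can_start are hypothetical names, not part of my actual code):

# Minimal sketch of the per-task check (psutil is an assumption;
# RAM_THRESHOLD, SUM_THRESHOLD and can_start are hypothetical names)
import os
import psutil

RAM_THRESHOLD = 8 * 1024**3     # e.g. require at least 8 GB of free RAM
SUM_THRESHOLD = 10              # e.g. upper bound on sum(dyn)
MAX_PROCESSES = os.cpu_count()  # never use more processes than cores

def can_start(dyn):
    enough_ram = psutil.virtual_memory().available > RAM_THRESHOLD
    small_enough = sum(dyn) < SUM_THRESHOLD
    return enough_ram and small_enough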

Thanks for the help :)

Edit: Details on the reasons.

Some of the combinations of parameters have a high RAM consumption (up to 80 GB for a single process). I know more or less which ones will use a lot of RAM, and when the program encounters them, I would like to wait for the other processes to end, run this high-RAM combination on its own in a single process, and then resume the computation with more processes on the rest of the combinations to map.

Edit: my attempt based on the answer below.

It doesn't work, but it doesn't raise an error either; the program just runs to completion.

# Imports
import itertools
import os
import time
import concurrent.futures

# Parameters
N = int(input("Number of CPUs to use: "))
t0 = 0
tf = 200
s_step = 0.05
max_s = None
folder = "test"

possible_dynamics = [My_class(x) for x in [20, 30, 40, 50, 60]]
dynamics_to_compute = ([list(x) for x in itertools.combinations_with_replacement(possible_dynamics, 2)]
                       + [list(x) for x in itertools.combinations_with_replacement(possible_dynamics, 3)])

function_inputs = [(dyn, t0, tf, s_step, max_s, folder) for dyn in dynamics_to_compute]

# -----------
# Computation
# -----------
start = time.time()

# Pool creation and computation
futures = []
pool = concurrent.futures.ProcessPoolExecutor(max_workers=N)

for Obj, t0, tf, s_step, max_s, folder in function_inputs:
    if large_memory(Obj, s_step, max_s):
        concurrent.futures.wait(futures)  # wait for all pending tasks
        large_future = pool.submit(compute, Obj, t0, tf, 
                             s_step, max_s, folder)
        large_future.result()  # wait for large computation to finish
    else:
        future = pool.submit(compute, Obj, t0, tf, 
                             s_step, max_s, folder)
        futures.append(future)

end = time.time()
if round(end - start, 3) < 60:
    print("Complete - Elapsed time: {} s".format(round(end - start, 3)))
else:
    print("Complete - Elapsed time: {} min and {} s".format(int((end - start) // 60), round((end - start) % 60, 3)))

os.system("pause")

This is still a simplified example of my code, but the idea is there. It runs in less than 0.2 s, which means it never actually calls the compute function.

N.B.: Obj is not the actual variable name.

Upvotes: 3

Views: 392

Answers (1)

noxdafox

Reputation: 15050

To achieve this, you need to give up on using map in order to gain more control over the execution flow of your tasks.

The code below implements the algorithm you described at the end of your question. I'd recommend using the concurrent.futures library, as it exposes a neater set of APIs.

import concurrent.futures

pool = concurrent.futures.ProcessPoolExecutor(max_workers=6)

futures = []

for dyn in dynamic_duos:  # fix1 and fix2 come from the enclosing scope
    if large_memory(dyn, fix1, fix2):
        concurrent.futures.wait(futures)  # wait for all pending tasks
        large_future = pool.submit(f, dyn, fix1, fix2)
        large_future.result()  # wait for large computation to finish
    else:
        future = pool.submit(f, dyn, fix1, fix2)
        futures.append(future)

concurrent.futures.wait(futures)  # wait for the remaining small tasks
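
As a sketch of how this applies to the edited code in your question (assuming compute, large_memory, N and function_inputs are defined as you show them), the with block and the final wait make sure every submitted task has finished before the elapsed time is printed:

with concurrent.futures.ProcessPoolExecutor(max_workers=N) as pool:
    futures = []
    for Obj, t0, tf, s_step, max_s, folder in function_inputs:
        if large_memory(Obj, s_step, max_s):
            concurrent.futures.wait(futures)  # drain the pending small tasks first
            pool.submit(compute, Obj, t0, tf, s_step, max_s, folder).result()  # run the big one alone
        else:
            futures.append(pool.submit(compute, Obj, t0, tf, s_step, max_s, folder))
    concurrent.futures.wait(futures)  # wait for the remaining small tasks
# leaving the with block shuts the worker processes down cleanly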

Upvotes: 1
