brp
brp

Reputation: 243

Python multiprocessing module: join processes with timeout

I'm doing an optimization of parameters of a complex simulation. I'm using the multiprocessing module for enhancing the performance of the optimization algorithm. The basics of multiprocessing I learned at http://pymotw.com/2/multiprocessing/basics.html. The complex simulation lasts different times depending on the given parameters from the optimization algorithm, around 1 to 5 minutes. If the parameters are chosen very badly, the simulation can last 30 minutes or more and the results are not useful. So I was thinking about build in a timeout to the multiprocessing, that terminates all simulations that last more than a defined time. Here is an abstracted version of the problem:

import numpy as np
import time
import multiprocessing

def worker(num):
    
    time.sleep(np.random.random()*20)

def main():
    
    pnum = 10    
    
    procs = []
    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print('starting', p.name)
        
    for p in procs:
        p.join(5)
        print('stopping', p.name)
     
if __name__ == "__main__":
    main()

The line p.join(5) defines the timeout of 5 seconds. Because of the for-loop for p in procs: the program waits 5 seconds until the first process is finished and then again 5 seconds until the second process is finished and so on, but i want the program to terminate all processes that last more than 5 seconds. Additionally, if none of the processes last longer than 5 seconds the program must not wait this 5 seconds.

Upvotes: 20

Views: 54180

Answers (4)

emunsing
emunsing

Reputation: 9954

I am working on a similar problem, managing long-running computations which may take a very long time, on a limited queue of workers. Rather than timing out the entire set of workers when the timeout is reached, I want each process to only last for timeout seconds from the time it is started. In addition, I built in the ability to automatically retry the task, up to n_attempts.

sim_specs_remaining = [(s, 0) for s in sim_specs]
queue = Queue()
processes = [
    start_new_process(*sim_specs_remaining.pop(0))
    for i in range(min(num_workers, len(sim_specs_remaining)))
]
results = []

while sim_specs_remaining or len(processes) > 0:
    if len(processes) < num_workers and len(sim_specs_remaining) > 0:
        spec, n_attempts = sim_specs_remaining.pop(0)
        if n_attempts < 3:
            processes.append(start_new_process(spec, n_attempts))

    for p in processes:
        start_time, process, spec, n_attempts = p
        if process.is_alive():
            if timeout is not None and time.time() - start_time > timeout:
                print(f"{spec.uid} exceeded the time limit and was terminated.")
                process.terminate()
                process.join()
                processes.remove(p)
                sim_specs_remaining.append((spec, n_attempts + 1))
            else:
                process.join(0.1)  # This reduces the cadence of the `while` loop
        else:
            try:
                result = queue.get(timeout=1)
                results.append(result)
            except TimeoutError:
                print("A task exceeded the time limit and was terminated.")
            finally:
                process.join()
                processes.remove(p)

Upvotes: 1

brp
brp

Reputation: 243

Thanks to the help of dano I found a solution:

import numpy as np
import time
import multiprocessing

def worker(num):

    time.sleep(np.random.random()*20)

def main():

    pnum = 10    
    TIMEOUT = 5 
    procs = []
    bool_list = [True]*pnum

    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print('starting', p.name)

    start = time.time()
    while time.time() - start <= TIMEOUT:
        for i in range(pnum):
            bool_list[i] = procs[i].is_alive()
            
        print(bool_list)
            
        if np.any(bool_list):  
            time.sleep(.1)  
        else:
            break
    else:
        print("timed out, killing all processes")
        for p in procs:
            p.terminate()
            
    for p in procs:
        print('stopping', p.name,'=', p.is_alive())
        p.join()

if __name__ == "__main__":
    main()

Its not the most elegant way, I'm sure there is a better way than using bool_list. Processes that are still alive after the timeout of 5 seconds will be killed. If you are setting shorter times in the worker function than the timeout, you will see that the program stops before the timeout of 5 seconds is reached. I'm still open for more elegant solutions if there are :)

Upvotes: 4

Raiden Drake
Raiden Drake

Reputation: 179

If you want to kill all the processes you could use the Pool from multiprocessing you'll need to define a general timeout for all the execution as opposed of individual timeouts.

import numpy as np
import time
from multiprocessing import Pool

def worker(num):
    xtime = np.random.random()*20
    time.sleep(xtime)
    return xtime

def main():

    pnum = 10
    pool = Pool()
    args = range(pnum)
    pool_result = pool.map_async(worker, args)

    # wait 5 minutes for every worker to finish
    pool_result.wait(timeout=300)

    # once the timeout has finished we can try to get the results
    if pool_result.ready():
        print(pool_result.get(timeout=1))

if __name__ == "__main__":
    main()

This will get you a list with the return values for all your workers in order.
More information here: https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool

Upvotes: 15

dano
dano

Reputation: 94961

You can do this by creating a loop that will wait for some timeout amount of seconds, frequently checking to see if all processes are finished. If they don't all finish in the allotted amount of time, then terminate all of the processes:

TIMEOUT = 5 
start = time.time()
while time.time() - start <= TIMEOUT:
    if not any(p.is_alive() for p in procs):
        # All the processes are done, break now.
        break

    time.sleep(.1)  # Just to avoid hogging the CPU
else:
    # We only enter this if we didn't 'break' above.
    print("timed out, killing all processes")
    for p in procs:
        p.terminate()
        p.join()

Upvotes: 19

Related Questions