user302099

Reputation: 635

Timeout for each thread in ThreadPool in python

I am using Python 2.7.

I am currently using ThreadPoolExecutor like this:

import concurrent.futures

# f is defined elsewhere and may run for a long time
params = [1,2,3,4,5,6,7,8,9,10]
with concurrent.futures.ThreadPoolExecutor(5) as executor:
    result = list(executor.map(f, params))

The problem is that f sometimes runs for too long. Whenever I run f, I want to limit its run to 100 seconds, and then kill it.

Eventually, for each element x in param, I would like to have an indication of whether or not f had to be killed, and in case it wasn't - what was the return value. Even if f times out for one parameter, I still want to run it with the next parameters.

The executor.map method does have a timeout parameter, but it sets a timeout for the entire run, from the time of the call to executor.map, and not for each thread separately.

What is the easiest way to get my desired behavior?

Upvotes: 6

Views: 11239

Answers (1)

Zags

Reputation: 41368

This answer is in terms of Python's multiprocessing library, which is usually preferable to the threading library unless your functions are just waiting on network calls. Note that the multiprocessing and threading libraries have the same interface.
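As an aside (not from the original answer): multiprocessing.dummy exposes a thread-backed Pool with the same interface as the process-backed one, so pool-based code can be switched between processes and threads by changing a single import. Only real processes can actually be killed on timeout, though, which is why processes are used below.

from multiprocessing import Pool as ProcessPool        # workers are separate processes (can be terminated)
from multiprocessing.dummy import Pool as ThreadPool   # workers are threads, same Pool API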

Given that your processes run for potentially 100 seconds each, the overhead of creating a process for each one is fairly small in comparison. You probably have to make your own processes to get the necessary control.

One option is to wrap f in another function that will execute for at most 100 seconds:

from multiprocessing import Pool

def timeout_f(arg):
    # run f(arg) in a single-worker pool; get() raises
    # multiprocessing.TimeoutError if f does not finish within 100 seconds
    pool = Pool(processes=1)
    try:
        return pool.apply_async(f, [arg]).get(timeout=100)
    finally:
        pool.terminate()  # kill the worker if f is still running

Then your code changes to:

    result = list(executor.map(timeout_f, params))
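Note that if f times out, the TimeoutError raised by get() surfaces when executor.map yields that result and stops the iteration. If you want each entry of result to record whether f finished and, if so, what it returned, a minimal sketch (not part of the original answer) is to catch the error inside the wrapper:

from multiprocessing import Pool, TimeoutError

def timeout_f(arg):
    # returns (True, f(arg)) if f finished within 100 seconds,
    # or (False, None) if it had to be killed
    pool = Pool(processes=1)
    try:
        return (True, pool.apply_async(f, [arg]).get(timeout=100))
    except TimeoutError:
        return (False, None)
    finally:
        pool.terminate()

With this variant, result = list(executor.map(timeout_f, params)) yields one (finished, value) pair per parameter.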

Alternatively, you could write your own thread/process control:

from multiprocessing import Process
from time import time

def chunks(l, n):
    """ Yield successive n-sized chunks from l. """
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

processes = [Process(target=f, args=(i,)) for i in params]
exit_codes = []
for five_processes in chunks(processes, 5):
    for p in five_processes:
        p.start()
    time_waited = 0
    start = time()
    for p in five_processes:
        if time_waited >= 100:
            # the 100-second budget is already used up: don't wait any longer
            p.join(0)
            p.terminate()
        else:
            # wait only for whatever remains of the 100 seconds, then kill it
            p.join(100 - time_waited)
            p.terminate()
        time_waited = time() - start
    for p in five_processes:
        exit_codes.append(p.exitcode)

You'd have to get the return values through something like "Can I get a return value from multiprocessing.Process?"
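One common pattern from that question is to give each worker a shared multiprocessing.Queue and have a small wrapper push f's return value onto it. The wrapper name and result collection below are illustrative, not from the original answer:

from multiprocessing import Process, Queue

def run_f(arg, queue):
    # wrapper around f that sends the return value back to the parent process
    queue.put((arg, f(arg)))

queue = Queue()
processes = [Process(target=run_f, args=(i, queue)) for i in params]
# start/join/terminate the processes as above, then collect whatever
# finished in time (queue.empty() is approximate, but fine for a sketch
# once all processes have been joined or terminated)
results = {}
while not queue.empty():
    arg, value = queue.get()
    results[arg] = value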

The exit codes of the processes are 0 if the processes completed and non-zero if they were terminated.
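For example, a per-parameter killed/completed flag could be derived from the collected exit codes (illustrative only; exit_codes lines up with params because the loop above appends them in order):

# exitcode 0 means f completed; a terminated process reports a non-zero
# exitcode (the negative of the signal number, e.g. -15 for SIGTERM)
killed = {arg: code != 0 for arg, code in zip(params, exit_codes)}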

Techniques from: "Join a group of python processes with a timeout" and "How do you split a list into evenly sized chunks?"


As another option, you could just use apply_async on multiprocessing.Pool:

from multiprocessing import Pool, TimeoutError

if __name__ == "__main__":
    pool = Pool(processes=5)
    processes = [pool.apply_async(f, [i]) for i in params]
    results = []
    for process in processes:
        try:
            results.append(process.get(timeout=100))
        except TimeoutError as e:
            # the task timed out; note that the pool worker keeps running f
            results.append(e)

Note that the above can effectively wait more than 100 seconds for each task: if the first one takes 50 seconds to complete, the second will have had 50 extra seconds of run time by the time its own get(timeout=100) starts counting. More complicated logic (such as the previous example) is needed to enforce stricter timeouts.

Upvotes: 10
