Khizar Amin

Reputation: 308

Python ThreadPoolExecutor terminate all threads

I am running a piece of Python code in which multiple threads are run through a ThreadPoolExecutor. Each thread is supposed to perform a task (fetch a web page, for example). What I want is to terminate all threads as soon as one of them fails. For instance:

from concurrent.futures import ThreadPoolExecutor, as_completed

def start(*args, **kwargs):
    # fetch the page
    if success:
        return True
    else:
        # Signal all threads to stop
        ...

with ThreadPoolExecutor(self._num_threads) as executor:
    jobs = []
    for path in paths:
        kw = {"path": path}
        jobs.append(executor.submit(start, **kw))
    for job in as_completed(jobs):
        result = job.result()
        print(result)

Is it possible to do so? The results returned by the threads are useless to me unless all of them succeed, so if even one fails, I would like to save the execution time of the remaining threads and terminate them immediately. The actual code is, of course, doing relatively lengthy tasks with a couple of failure points.
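What this boils down to is cooperative cancellation: a shared flag that every worker checks between units of work. A minimal sketch of that idea (the `stop` event, the loop structure, and the `"bad"` path are all illustrative assumptions, not part of the original code):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

stop = threading.Event()

def start(path):
    # A lengthy task split into steps; bail out as soon as any thread fails.
    for _ in range(5):
        if stop.is_set():
            return None          # another thread already failed
        # ... do one chunk of work (e.g. one request) here ...
    success = path != "bad"      # stand-in for the real success check
    if not success:
        stop.set()               # signal every other worker to stop
        return None
    return path

with ThreadPoolExecutor(4) as executor:
    jobs = [executor.submit(start, p) for p in ["a", "b", "c"]]
    results = [job.result() for job in as_completed(jobs)]
```

Threads cannot be killed from the outside, so the workers have to poll the event between chunks of work; the more often they check, the sooner the pool winds down after a failure.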

Upvotes: 10

Views: 14902

Answers (4)

Diego Suarez

Reputation: 961

If you are done with threads and want to look into processes, the piece of code below looks very promising and simple, with almost the same syntax as threads, but using the multiprocessing module.

When the timeout expires, the process is terminated, which is very convenient.

import multiprocessing

def get_page(*args, **kwargs):
    # your web page downloading code goes here
    pass

def start_get_page(timeout, *args, **kwargs):
    p = multiprocessing.Process(target=get_page, args=args, kwargs=kwargs)
    p.start()
    p.join(timeout)
    if p.is_alive():
        # still running after `timeout` seconds: stop the downloading 'thread'
        p.terminate()
        # and then do any post-error processing here

if __name__ == "__main__":
    start_get_page(5)  # e.g. a 5-second timeout

Upvotes: 6

Ali Ait-Bachir

Reputation: 720

In my code I used multiprocessing:

import multiprocessing as mp

pool = mp.Pool()
for i in range(threadNumber):
    pool.apply_async(publishMessage, args=(map_metrics, connection_parameters...,))

pool.close()       # no more tasks will be submitted
pool.terminate()   # stop the worker processes immediately
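To make the terminate-on-failure idea concrete with a pool, one sketch is to stream results back with `imap_unordered` and kill the workers the moment a failure comes in (`fetch` and the `"bad"` marker are made-up placeholders for the real task):

```python
import multiprocessing as mp

def fetch(path):
    # Stand-in for the real work; raising signals failure.
    if path == "bad":
        raise ValueError(path)
    return f"content of {path}"

def fetch_wrapped(path):
    # Return (ok, value) so failures come back as data, not exceptions.
    try:
        return True, fetch(path)
    except Exception as exc:
        return False, exc

def fetch_all(paths):
    results = []
    with mp.Pool() as pool:
        for ok, value in pool.imap_unordered(fetch_wrapped, paths):
            if not ok:
                pool.terminate()   # kill the remaining workers immediately
                return None
            results.append(value)
    return results
```

Unlike threads, worker processes really can be killed mid-task, which is why several answers here reach for multiprocessing.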

Upvotes: 0

alex_noname

Reputation: 32073

You can try to use StoppableThread from func-timeout. But terminating threads is strongly discouraged, and if you need to kill a thread, you probably have a design problem. Look at the alternatives: asyncio coroutines, or multiprocessing, both of which offer legitimate cancellation/termination functionality.

Upvotes: -1

Pierre

Reputation: 508

I have created an answer for a similar question I had, which I think will work for this question.

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

NUM_REQUESTS = 100


def long_request(id):
    sleep(1)

    # Simulate bad response
    if id == 10:
        return {"data": {"valid": False}}
    else:
        return {"data": {"valid": True}}


def check_results(results):
    # Every response must be valid; a single bad one invalidates the batch.
    return all(result["data"]["valid"] for result in results)


def main():
    futures = []
    responses = []
    num_requests = 0

    with ThreadPoolExecutor(max_workers=10) as executor:
        for request_index in range(NUM_REQUESTS):
            future = executor.submit(long_request, request_index)

            # Future list
            futures.append(future)

        for future in as_completed(futures):
            responses.append(future.result())

            # Cancel all pending requests as soon as one response is invalid
            if not check_results(responses):
                executor.shutdown(wait=False, cancel_futures=True)  # Python 3.9+
                break

            # Count valid responses
            num_requests += 1

    return num_requests


if __name__ == "__main__":
    requests = main()
    print("Num Requests: ", requests)

Upvotes: -1
