Reputation: 308
I am running a piece of python code in which multiple threads are run through threadpool executor. Each thread is supposed to perform a task (fetch a webpage for example). What I want to be able to do is to terminate all threads, even if one of the threads fail. For instance:
with ThreadPoolExecutor(self._num_threads) as executor:
jobs = []
for path in paths:
kw = {"path": path}
jobs.append(executor.submit(start,**kw))
for job in futures.as_completed(jobs):
result = job.result()
print(result)
def start(*args,**kwargs):
#fetch the page
if(success):
return True
else:
#Signal all threads to stop
Is it possible to do so? The results returned by threads are useless to me unless all of them are successful, so if even one of them fails, I would like to save some execution time of the rest of the threads and terminate them immediately. The actual code obviously is doing relatively lengthy tasks with a couple of failure points.
Upvotes: 10
Views: 14902
Reputation: 961
If you are done with threads and want to look into processes, then this piece of code here looks very promising and simple, almost the same syntax as threads, but with the multiprocessing module.
When the timeout flag expires the process is terminated, very convenient.
import multiprocessing
def get_page(*args, **kwargs):
# your web page downloading code goes here
def start_get_page(timeout, *args, **kwargs):
p = multiprocessing.Process(target=get_page, args=args, kwargs=kwargs)
p.start()
p.join(timeout)
if p.is_alive():
# stop the downloading 'thread'
p.terminate()
# and then do any post-error processing here
if __name__ == "__main__":
start_get_page(timeout, *args, **kwargs)
Upvotes: 6
Reputation: 720
In my code I used multiprocessing
import multiprocessing as mp
pool = mp.Pool()
for i in range(threadNumber):
pool.apply_async(publishMessage, args=(map_metrics, connection_parameters...,))
pool.close()
pool.terminate()
Upvotes: 0
Reputation: 32073
You can try to use StoppableThread
from func-timeout.
But terminating threads is strongly discouraged. And if you need to kill a thread, you probably have a design problem. Look at alternatives: asyncio
coroutines and multiprocessing
with legal cancel/terminating functionality.
Upvotes: -1
Reputation: 508
I have created an answer for a similar question I had, which I think will work for this question.
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep
NUM_REQUESTS = 100
def long_request(id):
sleep(1)
# Simulate bad response
if id == 10:
return {"data": {"valid": False}}
else:
return {"data": {"valid": True}}
def check_results(results):
valid = True
for result in results:
valid = result["data"]["valid"]
return valid
def main():
futures = []
responses = []
num_requests = 0
with ThreadPoolExecutor(max_workers=10) as executor:
for request_index in range(NUM_REQUESTS):
future = executor.submit(long_request, request_index)
# Future list
futures.append(future)
for future in as_completed(futures):
is_responses_valid = check_results(responses)
# Cancel all future requests if one invalid
if not is_responses_valid:
executor.shutdown(wait=False)
else:
# Append valid responses
num_requests += 1
responses.append(future.result())
return num_requests
if __name__ == "__main__":
requests = main()
print("Num Requests: ", requests)
Upvotes: -1