Reputation: 508
I am trying to terminate a ThreadPoolExecutor based on the values returned from long-running requests. I want to shut the pool down once the sum of the returned values reaches MIN_REQUIRED_VALUE.
I am sure the problem is that I create the full list of futures up front, all of which then have to be resolved. I am not sure how to issue the requests with ThreadPoolExecutor without building that list.
I know there have been a couple of questions about terminating a thread pool. I have found similar questions, but the answers don't seem to handle the return values.
Similar questions:
If there is a better way to do this with another module, that would be fine.
Any assistance would be much appreciated.
from time import sleep
from concurrent.futures import ThreadPoolExecutor, as_completed

NUM_REQUESTS = 50
MIN_REQUIRED_VALUE = 30

def long_request(id):
    sleep(3)
    return {"data": {"value": 10}}

def check_results(results):
    total = 0
    for result in results:
        total += result["data"]["value"]
    return total

def main():
    futures = []
    responses = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        for request_index in range(NUM_REQUESTS):
            future = executor.submit(long_request, request_index)
            # Create Futures List
            futures.append(future)
        for future in as_completed(futures):
            responses.append(future.result())
            # Check minimum value reached
            total = check_results(responses)
            if total > MIN_REQUIRED_VALUE:
                executor.shutdown(wait=False)

if __name__ == "__main__":
    main()
Upvotes: 1
Views: 942
Reputation: 508
I changed the code so that a future's result is appended only while MIN_REQUIRED_VALUE has not been exceeded. Once it has been, the loop goes through all the futures and cancels the ones that are still pending.
I also added num_requests to count how many results were actually collected. It comes out as exactly 6 in this case, which is expected: each request returns 10, so 6 results give the first total (60) above MIN_REQUIRED_VALUE (50).
If anyone has a better way to do this, it would be good to see.
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

NUM_REQUESTS = 1000
MIN_REQUIRED_VALUE = 50

def long_request(id):
    sleep(1)
    return {"data": {"value": 10}}

def check_results(results):
    total = 0
    for result in results:
        total += result["data"]["value"]
    return total

def main():
    futures = []
    responses = []
    num_requests = 0
    with ThreadPoolExecutor(max_workers=10) as executor:
        for request_index in range(NUM_REQUESTS):
            future = executor.submit(long_request, request_index)
            # Future list
            futures.append(future)
        for future in as_completed(futures):
            # --- Changed Logic Below ---
            total = check_results(responses)
            if total > MIN_REQUIRED_VALUE:
                for pending_future in futures:
                    pending_future.cancel()
            else:
                num_requests += 1
                responses.append(future.result())
    return num_requests

if __name__ == "__main__":
    requests = main()
    print("Num Requests: ", requests)
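One possible tightening of the approach above, offered as a sketch rather than a definitive version: keep a running total instead of re-summing the list, and break out of the as_completed loop right after cancelling, so the remaining futures are not iterated again. The function name collect_until_threshold and its parameters are made up for this example, and the sleep is shortened to 0.1 seconds only so that it runs quickly.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

NUM_REQUESTS = 1000
MIN_REQUIRED_VALUE = 50


def long_request(request_id):
    # Stand-in for the real long-running request (shortened sleep for the demo).
    sleep(0.1)
    return {"data": {"value": 10}}


def collect_until_threshold(num_requests=NUM_REQUESTS,
                            threshold=MIN_REQUIRED_VALUE,
                            max_workers=10):
    # Hypothetical helper: returns the running total and the collected responses.
    total = 0
    responses = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(long_request, i) for i in range(num_requests)]
        for future in as_completed(futures):
            response = future.result()
            responses.append(response)
            total += response["data"]["value"]
            if total > threshold:
                # cancel() only succeeds for futures that have not started yet;
                # it returns False for running or finished ones.
                for pending in futures:
                    pending.cancel()
                # Stop consuming results once the threshold is exceeded.
                break
    return total, responses


if __name__ == "__main__":
    total, responses = collect_until_threshold()
    print("Total:", total, "Num Requests:", len(responses))
```

Requests that are already running cannot be interrupted, so leaving the with block still waits for those few to finish; only the queued ones are skipped. On Python 3.9 and later, executor.shutdown(wait=False, cancel_futures=True) should be able to replace the explicit cancel loop.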
Upvotes: 1