Reputation: 2712
How can I immediately stop a concurrent.futures.ThreadPoolExecutor
and discard any pending operations when a script is interrupted, as in the example below:
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def get_parallel(arguments_list: list[dict]):
    try:
        with ThreadPoolExecutor(max_workers=25) as executor:
            futures_buffer = [executor.submit(requests.get, **kwargs) for kwargs in arguments_list]
            for future in as_completed(futures_buffer):
                try:
                    response = future.result()
                    print(response.url)
                    yield response.url, response.status_code, response.json()['args']
                except KeyboardInterrupt:
                    executor.shutdown(wait=False, cancel_futures=True)
                    yield 'KeyboardInterrupt 1'
                    return
                except Exception as exception:
                    yield exception
    except KeyboardInterrupt:
        yield 'KeyboardInterrupt 2'
        return

if __name__ == '__main__':
    arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
    for t in get_parallel(arguments):
        print(t)
As the code is now, when I run it from a terminal and then press ^C, it stops printing results but hangs for about as long as it would have if it hadn't been interrupted, and finally prints KeyboardInterrupt 2.
Upvotes: 1
Views: 3292
Reputation: 687
it will stop printing results but will hang for the same time as if it wasn't interrupted
The root cause is shown in the docs of Executor.shutdown(wait=True, *, cancel_futures=False):
https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.shutdown

Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.

Executor.__exit__ (called by with) will call self.shutdown(wait=True) without setting cancel_futures=True.
Thus, all pending futures that the executor has not started running won't be cancelled.
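For reference, the effective behavior of Executor.__exit__ boils down to a blocking shutdown (a simplified sketch of what the standard library does, not the actual source):

def __exit__(self, exc_type, exc_val, exc_tb):
    # cancel_futures is left at its default of False, so queued futures still run
    self.shutdown(wait=True)
    return False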
To get the expected behavior, you can use something like:
from concurrent.futures import ThreadPoolExecutor, as_completed

executor = ThreadPoolExecutor(max_workers=num_workers)
try:
    futures = [
        executor.submit(run_task, *task)
        for _, task in zip(range(num_tasks), task_generator())
    ]
    for future in as_completed(futures):
        print(future.result())
finally:
    print("closing...")
    executor.shutdown(wait=True, cancel_futures=True)
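A minimal, self-contained sketch of what cancel_futures=True buys you (illustrative only; time.sleep stands in for the real work):

import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(time.sleep, 1) for _ in range(10)]
# queued futures are dropped; only tasks that already started get to finish
executor.shutdown(wait=True, cancel_futures=True)
print(sum(f.cancelled() for f in futures), 'futures were cancelled')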
Upvotes: 1
Reputation: 17275
It's possible to stop earlier with a little babysitting. The trick is to keep the main thread in a polling loop, using wait(), so it can catch KeyboardInterrupt and cancel the pending operations.
import requests
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def get_parallel(arguments_list: list[dict]):
    with ThreadPoolExecutor(max_workers=25) as executor:
        futures_buffer = [executor.submit(requests.get, **kwargs) for kwargs in arguments_list]
        not_done = futures_buffer
        try:
            while not_done:
                done, not_done = wait(not_done, timeout=1, return_when=FIRST_COMPLETED)
                for future in done:
                    response = future.result()
                    print(response.url)
                    yield response.url, response.status_code, response.json()['args']
        # Cancel any futures not done on KeyboardInterrupt
        except KeyboardInterrupt:
            executor.shutdown(wait=False, cancel_futures=True)
            yield 'KeyboardInterrupt 1'  # Or re-raise if not in generator
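A usage sketch, assuming the same __main__ block as the question; on ^C the generator yields 'KeyboardInterrupt 1' and exits once the in-flight requests finish, instead of waiting for the whole list:

if __name__ == '__main__':
    arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
    for t in get_parallel(arguments):
        print(t)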
Upvotes: 2
Reputation: 77347
You can't signal individual threads in Python, and even if you could, there's no guarantee that requests didn't create its own threads as workers. You can kill processes, though, so you could delegate the request to a subprocess and kill it when you get a keyboard interrupt. The cleanest way to do this is to manage your own subprocesses and a queue of work items (a minimal sketch of that appears after the code below). But if you don't mind a little hacking, concurrent.futures.ProcessPoolExecutor keeps a list of its pool processes and you can hijack that. But it's hacking ... it may break some time in the future.
Since the response is pickled and sent back to the parent process, it's best to have an intermediate worker function that grabs the useful data from the response object rather than returning the response object itself.
import concurrent.futures
import os
import requests
import signal

def worker(arguments_list: dict):
    """use requests to get web page and return url, status, json args"""
    resp = requests.get(**arguments_list)
    # todo: when status_code not 200 and json decode fails, do...???
    return resp.url, resp.status_code, resp.json()['args']

def get_parallel(arguments_list: list[dict]):
    with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
        try:
            futures_buffer = [executor.submit(worker, kwargs) for kwargs in arguments_list]
            for future in concurrent.futures.as_completed(futures_buffer):
                url, status_code, args = future.result()
                print(url)
                yield url, status_code, args
        except KeyboardInterrupt:
            for pid in executor._processes:
                os.kill(pid, signal.SIGKILL)
            yield 'KeyboardInterrupt 2'

if __name__ == '__main__':
    arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
    for t in get_parallel(arguments):
        print(t)
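For completeness, a hedged sketch of the "manage your own subprocesses and a queue of work items" route mentioned above (queue_worker and the worker count are illustrative, not part of the answer):

import multiprocessing as mp
import requests

def queue_worker(tasks, results):
    # pull requests.get kwargs off the queue until a None sentinel arrives
    while True:
        kwargs = tasks.get()
        if kwargs is None:
            break
        resp = requests.get(**kwargs)
        results.put((resp.url, resp.status_code))

if __name__ == '__main__':
    tasks, results = mp.Queue(), mp.Queue()
    procs = [mp.Process(target=queue_worker, args=(tasks, results)) for _ in range(4)]
    for p in procs:
        p.start()
    try:
        for i in range(200):
            tasks.put(dict(url=f'https://httpbin.org/get?q={i}'))
        for _ in procs:
            tasks.put(None)  # one sentinel per worker
        for _ in range(200):
            print(results.get())
        for p in procs:
            p.join()
    except KeyboardInterrupt:
        # unlike threads, worker processes can simply be killed
        for p in procs:
            p.terminate()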
Upvotes: 1