VBobCat

Reputation: 2712

Interrupt ThreadPoolExecutor

How can I immediately stop a concurrent.futures.ThreadPoolExecutor and discard any pending operations when a script is interrupted, as in the example below:

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_parallel(arguments_list: list[dict]):
    try:
        with ThreadPoolExecutor(max_workers=25) as executor:
            futures_buffer = [executor.submit(requests.get, **kwargs) for kwargs in arguments_list]

            for future in as_completed(futures_buffer):
                try:
                    response = future.result()
                    print(response.url)
                    yield response.url, response.status_code, response.json()['args']
                except KeyboardInterrupt:
                    executor.shutdown(wait=False, cancel_futures=True)
                    yield 'KeyboardInterrupt 1'
                    return
                except Exception as exception:
                    yield exception
    except KeyboardInterrupt:
        yield 'KeyboardInterrupt 2'
        return


if __name__ == '__main__':
    arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
    for t in get_parallel(arguments):
        print(t)

As the code is now, when I run it from a terminal and press ^C, it stops printing results but hangs for as long as it would have taken to finish uninterrupted, and only then prints KeyboardInterrupt 2.

Upvotes: 1

Views: 3292

Answers (3)

YouJiacheng

Reputation: 687

it will stop printing results but will hang for the same time as if it wasn't interrupted

The root cause is explained in the docs for Executor.shutdown(wait=True, *, cancel_futures=False): https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.shutdown

Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.

Executor.__exit__ (called when the with block exits) calls self.shutdown(wait=True) without setting cancel_futures=True.

Thus, pending futures that the executor has not started running won't be cancelled, and shutdown(wait=True) blocks until every one of them has finished executing, which is why the script hangs for the full duration.
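
For reference, the context-manager exit is roughly equivalent to this (a simplified sketch, not a verbatim copy of CPython's source):

class Executor:
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # cancel_futures is left at its default (False), so every queued
        # future still runs before shutdown() returns.
        self.shutdown(wait=True)
        return False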

To get the expected behavior, you can use something like:

    from concurrent.futures import ThreadPoolExecutor, as_completed

    executor = ThreadPoolExecutor(max_workers=num_workers)
    try:
        # run_task, num_workers, num_tasks and task_generator stand in for
        # whatever work you actually want to run.
        futures = [
            executor.submit(run_task, *task)
            for _, task in zip(range(num_tasks), task_generator())
        ]
        for future in as_completed(futures):
            print(future.result())
    finally:
        print("closing...")
        # cancel_futures=True discards queued work that has not started yet;
        # wait=True still waits for tasks that are already running.
        executor.shutdown(wait=True, cancel_futures=True)
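
Applied to the question's generator, the same try/finally idea might look roughly like this (a sketch; it keeps the original requests call and argument format):

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_parallel(arguments_list: list[dict]):
    executor = ThreadPoolExecutor(max_workers=25)
    try:
        futures = [executor.submit(requests.get, **kwargs) for kwargs in arguments_list]
        for future in as_completed(futures):
            response = future.result()
            yield response.url, response.status_code, response.json()['args']
    finally:
        # Runs on normal exhaustion, on KeyboardInterrupt, and when the caller
        # abandons the generator: cancel everything that has not started yet.
        executor.shutdown(wait=True, cancel_futures=True)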

Upvotes: 1

Jon-Eric

Reputation: 17275

It's possible to stop earlier with a little babysitting. The trick is to keep the main thread in a polling loop, using wait(), so it can catch KeyboardInterrupt and cancel the pending operations.

import requests
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def get_parallel(arguments_list: list[dict]):
    with ThreadPoolExecutor(max_workers=25) as executor:
        futures_buffer = [executor.submit(requests.get, **kwargs) for kwargs in arguments_list]

        not_done = futures_buffer
        try:
            while not_done:
                done, not_done = wait(not_done, timeout=1, return_when=FIRST_COMPLETED)
                for future in done:
                    response = future.result()
                    print(response.url)
                    yield response.url, response.status_code, response.json()['args']

        # Cancel any futures not done on KeyboardInterrupt
        except KeyboardInterrupt:
            executor.shutdown(wait=False, cancel_futures=True)
            yield 'KeyboardInterrupt 1' # Or re-raise if not in generator
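
As the comment above notes, outside of a generator you can simply re-raise after cancelling. A sketch of that variant, collecting results into a list instead of yielding (the get_all name is just illustrative):

import requests
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def get_all(arguments_list: list[dict]) -> list:
    results = []
    with ThreadPoolExecutor(max_workers=25) as executor:
        not_done = [executor.submit(requests.get, **kwargs) for kwargs in arguments_list]
        try:
            while not_done:
                done, not_done = wait(not_done, timeout=1, return_when=FIRST_COMPLETED)
                for future in done:
                    response = future.result()
                    results.append((response.url, response.status_code, response.json()['args']))
        except KeyboardInterrupt:
            executor.shutdown(wait=False, cancel_futures=True)
            raise  # propagate to the caller instead of yielding a sentinel
    return results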

Upvotes: 2

tdelaney

Reputation: 77347

You can't signal individual threads in Python, and even if you could, there's no guarantee that requests didn't create its own worker threads. You can kill processes, though, so you could delegate each request to a subprocess and kill it when you get a keyboard interrupt. The cleanest way to do this is to manage your own subprocesses and a queue of work items (a sketch of that approach follows the code below).

But if you don't mind a little hacking, concurrent.futures.ProcessPoolExecutor keeps a list of its pool processes and you can hijack that. It's a hack, though ... it may break some time in the future. Since the response is pickled and sent back to the parent process, it's best to have an intermediate worker function that grabs the useful data from the response object, rather than returning the response object itself.

import concurrent.futures
import os
import requests
import signal

def worker(arguments_list: dict):
    """use requests to get web page and return url, status, json args"""
    resp = requests.get(**arguments_list)
    # todo: when status_code not 200 and json decode fails, do...???
    return resp.url, resp.status_code, resp.json()['args']

def get_parallel(arguments_list: list[dict]):
    with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
        try:
            futures_buffer = [executor.submit(worker, kwargs) for kwargs in arguments_list]

            for future in concurrent.futures.as_completed(futures_buffer):
                url, status_code, args = future.result()
                print(url)
                yield url, status_code, args
        except KeyboardInterrupt:
            for pid in executor._processes:
                os.kill(pid, signal.SIGKILL)
            yield 'KeyboardInterrupt 2'

if __name__ == '__main__':
    arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
    for t in get_parallel(arguments):
        print(t)
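
For comparison, the "manage your own subprocesses and a queue of work items" approach mentioned at the top of this answer could look roughly like this (a sketch built on multiprocessing.Process and multiprocessing.Queue; queue_worker, get_parallel_queue and the None sentinel protocol are illustrative, and error handling is omitted just as in the worker above):

import multiprocessing as mp
import requests

def queue_worker(task_queue: mp.Queue, result_queue: mp.Queue):
    """Pull argument dicts off the queue until a None sentinel arrives."""
    while True:
        kwargs = task_queue.get()
        if kwargs is None:
            break
        resp = requests.get(**kwargs)
        result_queue.put((resp.url, resp.status_code, resp.json()['args']))

def get_parallel_queue(arguments_list: list[dict], num_workers: int = 25):
    task_queue, result_queue = mp.Queue(), mp.Queue()
    for kwargs in arguments_list:
        task_queue.put(kwargs)
    for _ in range(num_workers):
        task_queue.put(None)  # one sentinel per worker
    workers = [mp.Process(target=queue_worker, args=(task_queue, result_queue))
               for _ in range(num_workers)]
    for p in workers:
        p.start()
    try:
        for _ in range(len(arguments_list)):
            yield result_queue.get()
    except KeyboardInterrupt:
        # These are our own processes, so no private attributes are needed.
        for p in workers:
            p.terminate()
        yield 'KeyboardInterrupt'
    finally:
        for p in workers:
            p.join()

if __name__ == '__main__':
    arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
    for t in get_parallel_queue(arguments):
        print(t)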

Upvotes: 1
