Paul Draper

Reputation: 83263

How to interrupt Python I/O operations when threading?

For example,

with open("foo") as f:
  f.read()

(But it could be a file write, a DNS lookup, any number of other I/O operations.)

If I interrupt this program while reading (SIGINT), the I/O operation is halted, KeyboardInterrupt is raised, and the finalizers run.

However, if this happens on a thread other than the main thread, the I/O operation is not interrupted.

So...how do I interrupt an I/O operation on another thread (similar to how it's interrupted on the main thread)?

Upvotes: 11

Views: 2761

Answers (3)

Arnab De

Reputation: 462

From https://docs.python.org/3/library/signal.html#signals-and-threads: "Python signal handlers are always executed in the main Python thread of the main interpreter, even if the signal was received in another thread. This means that signals can’t be used as a means of inter-thread communication. You can use the synchronization primitives from the threading module instead."

In this case, we can use 'Event Objects' from the threading module to synchronize threads.

import logging
import threading
import time
import signal


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    # Event.wait() blocks until e.set() is called, without the CPU-burning
    # busy loop of `while not e.is_set(): pass`
    e.wait()
    logging.debug("Keyboard interrupt received from main thread")

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

e = threading.Event()

def signal_handler(sig, frame):
    # no `global e` needed: the handler only calls a method on e, it never rebinds it
    logging.debug("Keyboard interrupt received by means of Ctrl + C")
    e.set()


t1 = threading.Thread(
    name='block',
    target=wait_for_event,
    args=(e,),
)
t1.start()

signal.signal(signal.SIGINT, signal_handler)
print("press Ctrl+C to stop")
signal.pause()  # Unix only: sleep until a signal arrives

The result:

(block     ) wait_for_event starting
press Ctrl+C to stop
^C(MainThread) Keyboard interrupt received by means of Ctrl + C
(block     ) Keyboard interrupt received from main thread
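The example above only waits on the event; the worker thread does no I/O. For the question's f.read() case, a minimal sketch, assuming the file can be read in fixed-size chunks (read_until_stopped and CHUNK_SIZE are hypothetical names), checks the event between chunks. A single read() call that blocks indefinitely is still not interrupted this way; the event is only looked at between calls.

```python
import threading

CHUNK_SIZE = 64 * 1024  # hypothetical; smaller chunks react to the event sooner


def read_until_stopped(path, stop_event):
    """Read `path` chunk by chunk, giving up early once `stop_event` is set.

    Returns (data_read_so_far, was_stopped_early).
    """
    chunks = []
    # the with block still closes the file when we leave early
    with open(path, "rb") as f:
        while not stop_event.is_set():
            chunk = f.read(CHUNK_SIZE)
            if not chunk:  # normal end of file
                return b"".join(chunks), False
            chunks.append(chunk)
    return b"".join(chunks), True
```

The worker thread could call read_until_stopped(path, e) instead of only waiting; the signal handler's e.set() then ends the read after the current chunk, and the with block closes the file, much like the finalizers running on the main thread.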

Upvotes: 0

Chen A.

Reputation: 11280

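With concurrent.futures, an exception raised inside a worker thread is stored in its Future and re-raised in whichever thread calls result() on it (here, the main thread). Here's an example: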

import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def func(raise_exc):
    print("Running in {}".format(threading.current_thread().name))
    if raise_exc:
        time.sleep(1)
        raise Exception

    time.sleep(3)


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(func, False), executor.submit(func, True)]

    while len(futures) > 0:
        for fut in futures[:]:
            try:
                # check if thread has finished its work, with timeout
                result = fut.result(timeout=1)

            except TimeoutError as exc:
                print("Timeout.. retry in thread {}".format(threading.current_thread().name))
            except Exception as exc:
                print("Exception was thrown in thread {}, exiting".format(threading.current_thread().name))
                # we remove this fut from the list, as it's finished
                futures.remove(fut)

            else:
                # op succeeded
                print("Thread finished successfully {}".format(threading.current_thread().name))
                futures.remove(fut)


print("Bye")

Which outputs

➜ python3 exception-in-thread.py
Running in ThreadPoolExecutor-0_0
Running in ThreadPoolExecutor-0_1
Timeout.. retry in thread MainThread
Exception was thrown in thread MainThread, exiting
Timeout.. retry in thread MainThread
Thread finished successfully MainThread
Bye

But as you can see, an exception in one thread doesn't affect the other threads. If that's what you are after, you need to catch the exception in the main thread and tell the other active threads to stop.

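You can do so with a global variable that indicates whether we are in a RUNNING state. When an exception propagates, we catch it and update the RUNNING state. We then call shutdown on the executor; note that this only stops it from accepting new work and does not interrupt tasks that are already running (leaving the with block still waits for them). This is how it looks: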

import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def func(raise_exc):
    print("Running in {}".format(threading.current_thread().name))
    if raise_exc:
        time.sleep(1)
        raise Exception

    time.sleep(3)


RUNNING = True
LOCK = threading.Lock()

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(func, False), executor.submit(func, True)]

    while RUNNING and futures:  # also stop once every future has been handled
        for fut in futures[:]:

            if not RUNNING:
                break

            try:
                # check if thread has finished its work, with timeout
                result = fut.result(timeout=1)

            except TimeoutError as exc:
                print("Timeout.. retry in thread {}".format(threading.current_thread().name))
            except Exception as exc:
                print("Exception was thrown in thread {}, exiting".format(threading.current_thread().name))
                # stop polling after the first failure
                with LOCK:
                    print("Stop execution due to exception..")
                    RUNNING = False
                    executor.shutdown(wait=False)

            else:
                # op succeeded
                print("Thread finished successfully {}".format(threading.current_thread().name))
                futures.remove(fut)


print("Bye")

Which outputs

➜ python3 exception-in-thread.py
Running in ThreadPoolExecutor-0_0
Running in ThreadPoolExecutor-0_1
Timeout.. retry in thread MainThread
Exception was thrown in thread MainThread, exiting
Stop execution due to exception..
Bye

Note that we protect the global with a lock in case more than one thread accesses it at the same time; in this exact example only the main thread touches RUNNING, so the lock is a precaution.
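Because shutdown() does not stop tasks that are already running, a cooperative flag is still needed. A minimal sketch, assuming the tasks can be split into short steps that check a shared threading.Event (the stop_event parameter and run_all are hypothetical): concurrent.futures.wait with return_when=FIRST_EXCEPTION replaces the polling loop, and the first failure sets the event so the other task returns early.

```python
import threading
import time
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait


def func(raise_exc, stop_event):
    """Hypothetical task: fails quickly, or works in 0.1 s steps and can stop early."""
    if raise_exc:
        time.sleep(0.1)
        raise RuntimeError("task failed")
    for _ in range(30):  # about 3 seconds of work
        if stop_event.is_set():
            return "stopped early"
        time.sleep(0.1)
    return "finished"


def run_all():
    stop_event = threading.Event()
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(func, False, stop_event),
                   executor.submit(func, True, stop_event)]
        # block until all tasks finish or the first one raises
        done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
        if any(not f.cancelled() and f.exception() is not None for f in done):
            stop_event.set()  # ask the still-running tasks to return
            for f in not_done:
                f.cancel()  # only succeeds for tasks that have not started yet
    # leaving the with block waited for the remaining tasks, which exit quickly now
    results = []
    for f in futures:
        if f.cancelled():
            results.append("cancelled")
        elif f.exception() is not None:
            results.append(repr(f.exception()))
        else:
            results.append(f.result())
    return results
```

If no task raises, FIRST_EXCEPTION behaves like ALL_COMPLETED, so the same code also covers the success path.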

Upvotes: 0

miquelvir

Reputation: 1757

KeyboardInterrupt is always raised in the main thread; it does not directly affect other threads (in the sense that they won't be interrupted by Ctrl+C). src1 src2 (in a comment)

Here is an example of a long I/O-bound operation, which gives us time to kill it before it finishes. KeyboardInterrupt works as you would expect.

import random
import threading


def long_io(file_name):
    with open(file_name, "w") as f:
        i = 0
        while i < 999999999999999999999999999:
            f.write(str(random.randint(0, 99999999999999999999999999)))
            i += 1


t = threading.Thread(target=long_io, args=("foo",), daemon=True)
t.start()

# keep the main thread alive, listening for a possible KeyboardInterrupt
while t.is_alive():
    t.join(1)  # try to join for 1 second; this leaves a small window between joins in which the KeyboardInterrupt can be raised

Notice that:

  • The thread is flagged as daemon; this way, on KeyboardInterrupt, the main thread does not wait for the I/O to finish, and the thread is killed when the program exits. You could use non-daemonic threads (recommended) as explained here, but for this example killing them straight away suffices.

A thread can be flagged as a “daemon thread”. The significance of this flag is that the entire Python program exits when only daemon threads are left. The initial value is inherited from the creating thread. The flag can be set through the daemon property or the daemon constructor argument. Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event. src

make the child thread daemonic, which means that its parent (the main thread here) will kill it when it exits (only non-daemon threads are not killed but joined when their parent exits) src

  • We keep the main thread alive, so that it does not finish immediately but waits until the child thread finishes. Otherwise, the main thread (the one in charge of detecting KeyboardInterrupt) would be gone (killing the child thread if it is daemonic, or waiting to join it if it is not). We could have used a simple t.join() for that, but we did not. Why? Because a plain t.join() blocks the main thread, so the KeyboardInterrupt would only be raised after the join completed.

the execution in the main thread remains blocked at the line thread.join(). src
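The quoted documentation recommends non-daemonic threads plus an Event for a graceful stop. A minimal sketch of that variant, assuming the writer can check a flag between writes (the stop_event parameter and the main() wrapper are hypothetical additions): the main thread catches KeyboardInterrupt, sets the event, then joins the writer, so the file is closed by its with block instead of being abandoned. As before, a single write that blocks is not itself interrupted.

```python
import random
import threading


def long_io(file_name, stop_event):
    """Write random numbers to file_name until stop_event is set."""
    with open(file_name, "w") as f:
        while not stop_event.is_set():
            f.write(str(random.randint(0, 99999999)))


def main(file_name="foo"):
    stop_event = threading.Event()
    # non-daemonic: the interpreter waits for it, so it must be told to stop
    t = threading.Thread(target=long_io, args=(file_name, stop_event))
    t.start()
    try:
        # timed joins keep the main thread responsive to Ctrl+C
        while t.is_alive():
            t.join(1)
    except KeyboardInterrupt:
        print("Ctrl+C received, asking the writer to stop...")
        stop_event.set()
        t.join()  # long_io leaves its with block, so the file is flushed and closed
```

Call main() and press Ctrl+C; the program exits once the writer finishes its current write.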

Upvotes: 2
