Reputation: 83263
For example,
with open("foo") as f:
    f.read()
(But it could be a file write, a DNS lookup, any number of other I/O operations.)
If I interrupt this program while reading (SIGINT), the I/O operation is halted and KeyboardInterrupt
is thrown, and the finalizers run.
However, if this happens on a thread other than the main thread, the I/O operation is not interrupted.
So...how do I interrupt an I/O operation on another thread (similar to how it's interrupted on the main thread)?
Upvotes: 11
Views: 2761
Reputation: 462
From https://docs.python.org/3/library/signal.html#signals-and-threads: "Python signal handlers are always executed in the main Python thread of the main interpreter, even if the signal was received in another thread. This means that signals can’t be used as a means of inter-thread communication. You can use the synchronization primitives from the threading module instead."
In this case, we can use 'Event Objects' from the threading
module to synchronize threads.
import logging
import signal
import threading


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    while not e.is_set():
        # Block for up to 0.3 s per iteration instead of spinning in a busy loop
        e.wait(0.3)
    logging.debug("Keyboard interrupt received from main thread")


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

e = threading.Event()


def signal_handler(sig, frame):
    # Runs in the main thread; only sets the flag the worker is waiting on
    logging.debug("Keyboard interrupt received by means of Ctrl + C")
    e.set()


t1 = threading.Thread(
    name='block',
    target=wait_for_event,
    args=(e,),
)
t1.start()

signal.signal(signal.SIGINT, signal_handler)
print("press Ctrl+C to stop")
signal.pause()  # Unix only: sleep until a signal arrives
The result:
(block     ) wait_for_event starting
press Ctrl+C to stop
^C(MainThread) Keyboard interrupt received by means of Ctrl + C
(block     ) Keyboard interrupt received from main thread
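An Event only helps when the worker checks it between operations; it does not wake a thread that is already blocked in a read. For sockets and pipes (not regular files), one possible approach is to wait with select on both the data source and a second "wakeup" channel. This is a minimal sketch of that technique, not something taken from the answer above: read_or_cancel, demo and the socket pairs are hypothetical names used only for illustration.

```python
import select
import socket
import threading


def read_or_cancel(data_sock, wake_sock):
    """Block until data arrives or the wakeup socket becomes readable."""
    readable, _, _ = select.select([data_sock, wake_sock], [], [])
    if wake_sock in readable:
        return None  # cancelled before any data arrived
    return data_sock.recv(4096)


def demo():
    # data_r/data_w stand in for a slow peer that never sends anything
    data_r, data_w = socket.socketpair()
    # wake_r/wake_w form the wakeup channel used to cancel the read
    wake_r, wake_w = socket.socketpair()
    result = []
    t = threading.Thread(
        target=lambda: result.append(read_or_cancel(data_r, wake_r))
    )
    t.start()
    # This is what the main thread (or a SIGINT handler) would do to cancel
    wake_w.send(b"x")
    t.join()
    for s in (data_r, data_w, wake_r, wake_w):
        s.close()
    return result[0]
```

Calling demo() starts a reader that would otherwise block forever, then cancels it from the main thread by writing one byte to the wakeup channel.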
Upvotes: 0
Reputation: 11280
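When work is submitted to a ThreadPoolExecutor, an exception raised inside a worker thread is stored on its future and re-raised in the calling thread when you call result(). Here's an example: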
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def func(raise_exc):
    print("Running in {}".format(threading.current_thread().name))
    if raise_exc:
        time.sleep(1)
        raise Exception
    time.sleep(3)


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(func, False), executor.submit(func, True)]
    while len(futures) > 0:
        for fut in futures[:]:
            try:
                # check if thread has finished its work, with timeout
                result = fut.result(timeout=1)
            except TimeoutError as exc:
                print("Timeout.. retry in thread {}".format(threading.current_thread().name))
            except Exception as exc:
                print("Exception was thrown in thread {}, exiting".format(threading.current_thread().name))
                # we remove this fut from the list, as it's finished
                futures.remove(fut)
            else:
                # op succeeded
                print("Thread finished successfully {}".format(threading.current_thread().name))
                futures.remove(fut)
print("Bye")
Which outputs
➜ python3 exception-in-thread.py
Running in ThreadPoolExecutor-0_0
Running in ThreadPoolExecutor-0_1
Timeout.. retry in thread MainThread
Exception was thrown in thread MainThread, exiting
Timeout.. retry in thread MainThread
Thread finished successfully MainThread
Bye
But as you can see, an exception in one thread doesn't affect the other threads. If that's what you are after, you need to capture the failure in the main thread and pass it on to the other active threads.
You can do so with a global variable that indicates whether we are in a RUNNING state. When the exception propagates, we catch it and update the RUNNING state, then call shutdown on the thread pool object so that it accepts no further work. Note that shutdown does not interrupt tasks that are already running. This is how it looks:
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def func(raise_exc):
    print("Running in {}".format(threading.current_thread().name))
    if raise_exc:
        time.sleep(1)
        raise Exception
    time.sleep(3)


RUNNING = True
LOCK = threading.Lock()

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(func, False), executor.submit(func, True)]
    # also stop once every future is done, otherwise the loop never ends on success
    while RUNNING and futures:
        for fut in futures[:]:
            if not RUNNING:
                break
            try:
                # check if thread has finished its work, with timeout
                result = fut.result(timeout=1)
            except TimeoutError as exc:
                print("Timeout.. retry in thread {}".format(threading.current_thread().name))
            except Exception as exc:
                print("Exception was thrown in thread {}, exiting".format(threading.current_thread().name))
                with LOCK:
                    print("Stop execution due to exception..")
                    RUNNING = False
                    executor.shutdown(wait=False)
            else:
                # op succeeded
                print("Thread finished successfully {}".format(threading.current_thread().name))
                futures.remove(fut)
print("Bye")
Which outputs
➜ python3 exception-in-thread.py
Running in ThreadPoolExecutor-0_0
Running in ThreadPoolExecutor-0_1
Timeout.. retry in thread MainThread
Exception was thrown in thread MainThread, exiting
Stop execution due to exception..
Bye
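Note that we protect the global with a lock in case more than one thread accesses it at the same time. In this example only the main thread reads and writes RUNNING, so the lock matters only once the worker threads start checking the flag too.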
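Because the pool cannot interrupt a task that is already running, the other workers have to cooperate if they are to stop early. A minimal sketch of that idea, assuming a shared threading.Event (the names stop, worker and run are hypothetical, and the short waits stand in for slices of real work):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

stop = threading.Event()  # hypothetical shared flag, set when any task fails


def worker(fail):
    if fail:
        raise RuntimeError("boom")
    # Do the work in small slices and check the flag between slices
    for _ in range(300):
        if stop.wait(0.01):  # returns True as soon as the flag is set
            return "stopped"
    return "finished"


def run():
    with ThreadPoolExecutor(max_workers=2) as executor:
        slow = executor.submit(worker, False)
        bad = executor.submit(worker, True)
        try:
            bad.result()  # re-raises the worker's exception here
        except RuntimeError:
            stop.set()    # ask the remaining worker to finish early
        return slow.result()
```

Here run() returns as soon as the slow worker notices the flag, instead of waiting for it to complete all of its slices.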
Upvotes: 0
Reputation: 1757
Keyboard-interrupt events are always captured on the main thread; they do not directly impact other threads (in the sense that those threads won't be interrupted by a Ctrl+C). src1 src2 (in a comment)
Here is an example of a long IO-bound operation, which gives us time to kill it before it finishes. KeyboardInterrupt works as you would expect.
import random
import threading


def long_io(file_name):
    with open(file_name, "w") as f:
        i = 0
        while i < 999999999999999999999999999:
            f.write(str(random.randint(0, 99999999999999999999999999)))
            i += 1


t = threading.Thread(target=long_io, args=("foo",), daemon=True)
t.start()

# keep the main thread alive, listening for possible KeyboardInterrupts
while t.is_alive():
    t.join(1)  # try to join for 1 second; this leaves a small window between joins in which the KeyboardInterrupt can be raised
Notice that:
- The thread is created with daemon=True; this way, on KeyboardInterrupt, the main thread will not wait until the IO is finished, but kill it. You could use non-daemonic threads (recommended) as explained here, but for this example killing them straight away suffices.
  "A thread can be flagged as a “daemon thread”. The significance of this flag is that the entire Python program exits when only daemon threads are left. The initial value is inherited from the creating thread. The flag can be set through the daemon property or the daemon constructor argument. Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event." src
  "make the child thread daemonic, which means that its parent (the main thread here) will kill it when it exits (only non-daemon threads are not killed but joined when their parent exits)" src
- To keep the main thread alive we could have simply called t.join(), but we did not. Why? Because the KeyboardInterrupt would also be affected: on platforms where a blocking join cannot be interrupted by a signal, it would only be raised after the join is completed.
  "the execution in the main thread remains blocked at the line thread.join()." src
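The quoted documentation recommends non-daemonic threads plus a signalling mechanism such as an Event for a graceful stop. A minimal sketch of that variant, assuming the work can be split into small writes (write_until_stopped and run are hypothetical names, and the 1 KiB chunks are placeholder data):

```python
import threading


def write_until_stopped(path, stop):
    """Write small chunks until `stop` is set, so the file is closed cleanly."""
    with open(path, "w") as f:
        while not stop.is_set():
            f.write("x" * 1024)
            stop.wait(0.01)  # short pause that ends early once the flag is set


def run(path):
    stop = threading.Event()
    t = threading.Thread(target=write_until_stopped, args=(path, stop))
    t.start()  # non-daemonic: the interpreter waits for it at exit
    try:
        while t.is_alive():
            t.join(1)  # short joins leave room for KeyboardInterrupt
    except KeyboardInterrupt:
        stop.set()  # ask the writer to finish its current chunk and return
        t.join()    # the with-block in the writer closes the file
```

Calling run("foo") and pressing Ctrl+C lets the writer leave its with block normally, so the file is flushed and closed rather than abandoned mid-write.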
Upvotes: 2