Reputation: 418
I'm writing a program which starts one thread to generate "work" and add it to a queue every N seconds. Then, I have a thread pool which processes items in the queue.
The program below works perfectly fine, until I comment out/delete line #97 (time.sleep(0.5)
in the main function). Once I do that, it generates a RuntimeError which attempting to gracefully stop the program (by sending a SIGINT or SIGTERM to the main process). It even works fine with an extremely small sleep like 0.1s, but has an issue with none at all.
I tried researching "reentrancy" but it went a bit over my head unfortunately.
Can anyone help me to understand this?
Code:
import random
import signal
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime
from queue import Empty, Queue, SimpleQueue
from typing import Any
class UniqueQueue:
"""
A thread safe queue which can only ever contain unique items.
"""
def __init__(self) -> None:
self._q = Queue()
self._items = []
self._l = threading.Lock()
def get(self, block: bool = False, timeout: float | None = None) -> Any:
with self._l:
try:
item = self._q.get(block=block, timeout=timeout)
except Empty:
raise
else:
self._items.pop(0)
return item
def put(self, item: Any, block: bool = False, timeout: float | None = None) -> None:
with self._l:
if item in self._items:
return None
self._items.append(item)
self._q.put(item, block=block, timeout=timeout)
def size(self) -> int:
return self._q.qsize()
def empty(self) -> bool:
return self._q.empty()
def stop_app(sig_num, sig_frame) -> None:
# global stop_app_event
print("Signal received to stop the app")
stop_app_event.set()
def work_generator(q: UniqueQueue) -> None:
last_execution = time.time()
is_first_execution = True
while not stop_app_event.is_set():
elapsed_seconds = int(time.time() - last_execution)
if elapsed_seconds <= 10 and not is_first_execution:
time.sleep(0.5)
continue
last_execution = time.time()
is_first_execution = False
print("Generating work...")
for _ in range(100):
q.put({"n": random.randint(0, 500)})
def print_work(w) -> None:
print(f"{datetime.now()}: {w}")
def main():
# Create a work queue
work_queue = UniqueQueue()
# Create a thread to generate the work and add to the queue
t = threading.Thread(target=work_generator, args=(work_queue,))
t.start()
# Create a thread pool, get work from the queue, and submit to the pool for processing
pool = ThreadPoolExecutor(max_workers=20)
futures: list[Future] = []
while True:
print("Processing work...")
if stop_app_event.is_set():
print("stop_app_event is set:", stop_app_event.is_set())
for future in futures:
future.cancel()
break
print("Queue Size:", work_queue.size())
try:
while not work_queue.empty():
work = work_queue.get()
future = pool.submit(print_work, work)
futures.append(future)
except Empty:
pass
time.sleep(0.5)
print("Stopping the work generator thread...")
t.join(timeout=10)
print("Work generator stopped")
print("Stopping the thread pool...")
pool.shutdown(wait=True)
print("Thread pool stopped")
if __name__ == "__main__":
stop_app_event = threading.Event()
signal.signal(signalnum=signal.SIGINT, handler=stop_app)
signal.signal(signalnum=signal.SIGTERM, handler=stop_app)
main()
Upvotes: 3
Views: 3483
Reputation: 4731
It's because you called the print()
in the signal handler, stop_app()
.
A signal handler is executed in a background thread in C, but in Python it is executed in the main thread. (See the reference.) In your case, while executing a print()
call, another print()
was called, and the term 'reentrant' fits perfectly here. And the current IO stack prohibits a reentrant call.(See the implementation if you are interested.)
You can remedy this by using the os.write()
and the sys.stdout
like the following.
import sys
import os
...
def stop_app(sig_num, sig_frame):
os.write(sys.stdout.fileno(), b"Signal received to stop the app\n")
stop_app_event.set()
Upvotes: 5