Sam Wood
Sam Wood

Reputation: 418

RuntimeError: reentrant call inside <_io.BufferedWriter name='<stdout>'>

I'm writing a program which starts one thread to generate "work" and add it to a queue every N seconds. Then, I have a thread pool which processes items in the queue.

The program below works perfectly fine until I comment out/delete line #97 (time.sleep(0.5) in the main function). Once I do that, it generates a RuntimeError when attempting to gracefully stop the program (by sending a SIGINT or SIGTERM to the main process). It even works fine with an extremely small sleep like 0.1s, but has an issue with none at all.

I tried researching "reentrancy" but it went a bit over my head unfortunately.

Can anyone help me to understand this?

Code:

import random
import signal
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime
from queue import Empty, Queue, SimpleQueue
from typing import Any


class UniqueQueue:
    """
    A thread safe queue which can only ever contain unique items.
    """

    def __init__(self) -> None:
        self._q = Queue()
        self._items = []
        self._l = threading.Lock()

    def get(self, block: bool = False, timeout: float | None = None) -> Any:
        with self._l:
            try:
                item = self._q.get(block=block, timeout=timeout)
            except Empty:
                raise
            else:
                self._items.pop(0)
                return item

    def put(self, item: Any, block: bool = False, timeout: float | None = None) -> None:
        with self._l:
            if item in self._items:
                return None
            self._items.append(item)
            self._q.put(item, block=block, timeout=timeout)

    def size(self) -> int:
        return self._q.qsize()

    def empty(self) -> bool:
        return self._q.empty()


def stop_app(sig_num, sig_frame) -> None:
    """
    Signal handler: announce the stop request and set the stop event.

    Registered for SIGINT and SIGTERM. ``sig_num`` and ``sig_frame`` are the
    two arguments the ``signal`` module passes to a handler; neither is used.
    """
    # No ``global`` statement is needed: the event is only used, never rebound.
    # NOTE(review): print() goes through the buffered stdout writer. If this
    # handler interrupts code that is itself in the middle of a print(), this
    # call re-enters that writer and can raise
    # "RuntimeError: reentrant call inside <_io.BufferedWriter ...>".
    print("Signal received to stop the app")
    stop_app_event.set()


def work_generator(q: UniqueQueue) -> None:
    """
    Periodically push a batch of random work items onto ``q``.

    A batch of 100 ``{"n": <int 0..500>}`` dicts is queued on the first pass
    and then again whenever more than 10 whole seconds have passed since the
    previous batch. Between batches the loop naps for half a second so it can
    notice ``stop_app_event`` promptly; it returns once that event is set.
    """
    previous_batch_at = time.time()
    first_pass = True
    while not stop_app_event.is_set():
        whole_seconds_waited = int(time.time() - previous_batch_at)
        if first_pass or whole_seconds_waited > 10:
            previous_batch_at = time.time()
            first_pass = False
            print("Generating work...")
            for _ in range(100):
                q.put({"n": random.randint(0, 500)})
        else:
            # Not due yet: wait a little and re-check the stop event.
            time.sleep(0.5)


def print_work(w) -> None:
    """Print the work item ``w`` prefixed with the current local timestamp."""
    stamp = datetime.now()
    print(f"{stamp}: {w}")


def main():
    """
    Run the producer thread and the worker pool until a stop is requested.

    The main thread polls the work queue twice a second and hands every
    queued item to a 20-worker thread pool. Once ``stop_app_event`` is set it
    asks every submitted future to cancel, then waits for the producer thread
    (up to 10 seconds) and for the pool to shut down.
    """
    pending_work = UniqueQueue()

    # Producer side: a single thread that keeps the queue topped up.
    producer = threading.Thread(target=work_generator, args=(pending_work,))
    producer.start()

    # Consumer side: drain the queue into a pool of worker threads.
    executor = ThreadPoolExecutor(max_workers=20)
    submitted: list[Future] = []
    while True:
        print("Processing work...")
        if stop_app_event.is_set():
            print("stop_app_event is set:", stop_app_event.is_set())
            for task in submitted:
                task.cancel()
            break

        print("Queue Size:", pending_work.size())
        try:
            while not pending_work.empty():
                submitted.append(executor.submit(print_work, pending_work.get()))
        except Empty:
            # The queue ran dry between the emptiness check and the get.
            pass
        time.sleep(0.5)

    print("Stopping the work generator thread...")
    producer.join(timeout=10)
    print("Work generator stopped")

    print("Stopping the thread pool...")
    executor.shutdown(wait=True)
    print("Thread pool stopped")


if __name__ == "__main__":
    # Shared flag that tells every thread the program should wind down.
    stop_app_event = threading.Event()
    # Route both interrupt and terminate requests to the same handler.
    for stop_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signalnum=stop_signal, handler=stop_app)
    main()

Upvotes: 3

Views: 3483

Answers (1)

relent95
relent95

Reputation: 4731

It's because you called print() in the signal handler, stop_app().

In C, a signal handler runs asynchronously, interrupting whatever the process was doing, but in Python the handler is executed in the main thread. (See the reference.) In your case, the handler's print() was called while the main thread was still in the middle of another print() call on the same stdout buffer, which is exactly what 'reentrant' means here. The current IO stack prohibits such a reentrant call. (See the implementation if you are interested.)

You can remedy this by writing directly to sys.stdout's file descriptor with os.write(), like the following.

import sys
import os
...
def stop_app(sig_num, sig_frame):
    """
    Signal handler: announce the stop request and set the stop event.

    The message is written straight to the stdout file descriptor with
    ``os.write`` rather than with ``print``.
    """
    stdout_fd = sys.stdout.fileno()
    os.write(stdout_fd, b"Signal received to stop the app\n")
    stop_app_event.set()

Upvotes: 5

Related Questions