Tony
Tony

Reputation: 764

How to implement QueueHandler and QueueListener inside flask factory

I have a flask application delivered by gunicorn that spawns multiple threads and processes from itself during the request. The problem is that when using the standard app.logger, some of the children get deadlocked because of the logging module not able to release the lock. This leads to these processes staying in memory indefinitely and becomes issue after time passes.

stack from py-spy

Thread 371832 (idle): "MainThread"
    flush (logging/__init__.py:1009)
    emit (logging/__init__.py:1029)
    emit (logging/__init__.py:1127)
    handle (logging/__init__.py:894)
    callHandlers (logging/__init__.py:1586)
    handle (logging/__init__.py:1524)
    _log (logging/__init__.py:1514)
    info (logging/__init__.py:1378)

As reference I used these awesome answers and questions

How should I log while using multiprocessing in Python?

Does python logging support multiprocessing?

I have this as factory

import logging
from logging import FileHandler, Formatter
from logging.handlers import QueueHandler, QueueListener
from multiprocessing import Queue
from flask import Flask

log_queue = Queue()


def create_app(app_name="name", **kwargs):
    app = Flask(app_name)

    # Create a process-safe logging queue
    listener = setup_logging(app)
    listener.start()

    return app


def setup_logging(app: Flask):
    logger = app.logger
    logger.handlers = []

    logger.setLevel(logging.INFO)

    queue_handler = QueueHandler(log_queue)
    info_handler = FileHandler("info.log")
    info_handler.setFormatter(Formatter("%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s"))

    crit_handler = FileHandler("critical.log")
    crit_handler.setLevel(logging.CRITICAL)
    crit_handler.setFormatter(Formatter("CRIT\t%(message)s"))

    logger.addHandler(queue_handler)
    listener = QueueListener(
        log_queue, info_handler, crit_handler, respect_handler_level=True
    )

    return listener

The issue I am facing is this - each time I send a HUP to the master process to update my code and some env files I get this error

[2025-02-03 20:48:54 +0200] [51367] [INFO] Hang up: Master
[2025-02-03 20:48:54 +0200] [52751] [INFO] Booting worker with pid: 52751
[2025-02-03 20:48:54 +0200] [52676] [INFO] Worker exiting (pid: 52676)
[2025-02-03 20:48:54 +0200] [52673] [INFO] Worker exiting (pid: 52673)
[2025-02-03 20:48:54 +0200] [52756] [INFO] Booting worker with pid: 52756
Exception in thread Thread-1:
Traceback (most recent call last):
  File ".pyenv/versions/3.7.5/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File ".pyenv/versions/3.7.5/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File ".pyenv/versions/3.7.5/lib/python3.7/logging/handlers.py", line 1478, in _monitor
    record = self.dequeue(True)
  File ".pyenv/versions/3.7.5/lib/python3.7/logging/handlers.py", line 1427, in dequeue
    return self.queue.get(block)
  File ".pyenv/versions/3.7.5/lib/python3.7/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File ".pyenv/versions/3.7.5/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File ".pyenv/versions/3.7.5/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File ".pyenv/versions/3.7.5/lib/python3.7/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError

I really am trying to understand what is happening here. My guess is that the queue is not empty at the time of worker respawn and it gets killed. How should I solve this?

Upvotes: 0

Views: 46

Answers (0)

Related Questions