Basavaraj Biradar

Reputation: 23

Why are logs not appearing in a file/console when using logging in Python with multiprocessing.Pool?

I'm trying to implement logging in a Python application that uses both threads and processes. I have a custom LoggingManager that uses a QueueHandler to push log records from multiple processes onto a shared log queue. When I run the main function without spawning new processes, logging works as expected (logs are written to both the file and the console). However, when I spawn child processes with multiprocessing.Pool, the logs from those processes never appear in the log file or on the console.

**logger_factory.py**
import logging
from logging import handlers
import multiprocessing as mp
import datetime
import os

class LoggingManager:
    _log_queue = None
    _listener_process = None

    @classmethod
    def get_log_queue(cls):
        """Retrieve or create the log queue."""
        if cls._log_queue is None:
            cls._log_queue = mp.Queue()
            cls._start_listener_process()
        return cls._log_queue

    @classmethod
    def _start_listener_process(cls):
        """Start listener process only once."""
        if cls._listener_process is None:
            cls._listener_process = mp.Process(target=cls._listener_process_target, args=(cls.get_log_queue(),))
            cls._listener_process.start()

    @classmethod
    def _listener_process_target(cls, log_queue):
        """Listener process responsible for consuming log records from the queue."""
        print("Listener process started.")
        cls.listener_configurer()

        while True:
            try:
                record = log_queue.get()  # Will block until a log record is available
                if record == "STOP":
                    print("Stopping listener process.")
                    break  # Exit the loop if we get a STOP signal

                logger = logging.getLogger(record.name)
                logger.handle(record)

            except Exception as e:
                print(f"Error in listener process: {e}")
                break

    @classmethod
    def listener_configurer(cls):
        """Configure the logging handlers (file, console, etc.)."""
        log_file_path = 'C:/Logs/test_{0}.log'.format(datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
        os.makedirs(os.path.dirname(log_file_path), exist_ok=True)

        root = logging.getLogger()

        # File handler to log messages with level INFO and higher (exclude DEBUG)
        file_handler = handlers.TimedRotatingFileHandler(
            log_file_path, when="midnight", interval=1, backupCount=5
        )
        file_handler.setLevel(logging.INFO)  # Set file handler level to INFO

        # Console handler to log all messages including DEBUG
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.DEBUG)

        # Log format
        formatter = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)

        # Add handlers to the root logger
        root.addHandler(file_handler)
        root.addHandler(console_handler)
        root.setLevel(logging.DEBUG)

    @classmethod
    def get_logger(cls, name):
        """Get the logger for the application."""
        logger = logging.getLogger(name)
        logger.setLevel(logging.DEBUG)

        # Add a QueueHandler to send logs to the main listener process
        queue_handler = handlers.QueueHandler(cls.get_log_queue())
        logger.addHandler(queue_handler)

        return logger

    @classmethod
    def stop_listener(cls):
        """Stop the listener process gracefully."""
        if cls._log_queue:
            cls._log_queue.put("STOP")
        if cls._listener_process:
            cls._listener_process.join()



**automation.py**
import concurrent.futures
import multiprocessing as mp
import sys
import time
from logger_factory import LoggingManager

def thread_process_client_query_exec(log_q):
    """Simulate thread work and log messages."""
    logger = LoggingManager.get_logger("thread_process_client_query_exec")
    logger.info("Started thread for query execution.")
    print(f"Queue size in thread after logging: {log_q.qsize()}")
    time.sleep(1)  # Simulate work
    logger.info("Completed thread for query execution.")

def process_fun(log_q):
    """Simulate the work done by the main function in each process."""
    logger = LoggingManager.get_logger("process_fun")
    logger.info("Started process_fun.")
    print(f"Queue size in process_fun after logging: {log_q.qsize()}")
    sys.stdout.flush()
    future_to_query = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for query in range(5):
            res = executor.submit(thread_process_client_query_exec, log_q)
            future_to_query.append(res)

        for future in concurrent.futures.as_completed(future_to_query):
            try:
                future.result()
            except Exception as exc:
                logger.error(f"Exception in thread: {exc}")

def do_run_process_automation_test(log_q):
    """Simulate a multi-process test using multiprocessing."""
    logger = LoggingManager.get_logger(__name__)
    logger.info("Started do_run_automation_test.")
    
    with mp.Pool(2) as pool:
        results = []
        for i in range(5):
            p = pool.apply_async(process_fun, (log_q,logger,))
            results.append(p)

        pool.close()
        pool.join()  # Wait for all processes to complete
    logger.info("Completed do_run_automation_test.")


**__main__.py**
import time
from logger_factory import LoggingManager
import automation

def main():
    """Main function to initialize logging and run automation."""
    # Create log queue and configure logger with a log file
    log_q = LoggingManager.get_log_queue()

    # Get main logger and log some messages
    logger = LoggingManager.get_logger(__name__)
    logger.info("Logging from the main process")
    logger.debug("Debug message from main process")
    logger.error("Error message from main process")

    # Run the automation test
    automation.do_run_process_automation_test(log_q)

    # Log queue status for debugging purposes
    print(f"Queue size after all tasks: {log_q.qsize()}")

    # Wait for some time to ensure all logs are processed before shutdown
    time.sleep(2)

    # Stop the listener
    LoggingManager.stop_listener()

if __name__ == "__main__":
    main()

Upvotes: 0

Views: 39

Answers (1)

Suramuthu R

Reputation: 1898

Change get_log_queue() so that the listener process is started only after the queue has been created, and only if it is not already running:

    @classmethod
    def get_log_queue(cls):
        """Retrieve or create the log queue."""
        if cls._log_queue is None:
            cls._log_queue = mp.Queue()

            # ✅ Start the listener process only if it's not already running
            if cls._listener_process is None or not cls._listener_process.is_alive():
                cls._start_listener_process()
        
        return cls._log_queue
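
A related pitfall, separate from the fix above: get_logger() attaches a fresh QueueHandler on every call, and logging.getLogger(name) returns the same logger object for a given name, so calling get_logger() twice for the same name duplicates every record. A minimal guard, as a sketch:

@classmethod
def get_logger(cls, name):
    """Variant of get_logger that avoids stacking duplicate QueueHandlers."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)

    # logging.getLogger returns the same object for a given name, so a
    # second call would otherwise attach a second QueueHandler and every
    # record would be emitted twice.
    if not any(isinstance(h, handlers.QueueHandler) for h in logger.handlers):
        logger.addHandler(handlers.QueueHandler(cls.get_log_queue()))

    return logger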

Make sure _listener_process_target calls listener_configurer() before entering the loop, so the file and console handlers are set up inside the listener process:

@classmethod
def _listener_process_target(cls, log_queue):
    """Listener process responsible for consuming log records from the queue."""
    print("Listener process started.")
    
    # Call listener_configurer before processing logs
    cls.listener_configurer()

    while True:
        try:
            record = log_queue.get()
            if record == "STOP":
                print("Stopping listener process.")
                break

            logger = logging.getLogger(record.name)
            logger.handle(record)

        except Exception as e:
            print(f"Error in listener process: {e}")
            break
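
For reference, the standard library already ships this loop: logging.handlers.QueueListener drains the queue in a background thread of the current process, which sidesteps a separate listener process entirely. Not part of the original answer, just an alternative; a minimal sketch with a console handler:

import logging
import logging.handlers

def start_queue_listener(log_queue):
    """Start a QueueListener that consumes records from log_queue in a
    background thread and routes them to a console handler."""
    console = logging.StreamHandler()
    console.setFormatter(logging.Formatter(
        '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s'))
    listener = logging.handlers.QueueListener(
        log_queue, console, respect_handler_level=True)
    listener.start()  # call listener.stop() at shutdown to flush and join
    return listener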

Update do_run_process_automation_test to pass only log_q to the worker processes; the original call also passed logger, but process_fun accepts a single argument:

def do_run_process_automation_test(log_q):
    """Simulate a multi-process test using multiprocessing."""
    logger = LoggingManager.get_logger(__name__)
    logger.info("Started do_run_automation_test.")
    
    with mp.Pool(2) as pool:
        results = []
        for i in range(5):
            p = pool.apply_async(process_fun, (log_q,))
            results.append(p)

        pool.close()
        pool.join()
    
    logger.info("Completed do_run_automation_test.")

Keep process_fun obtaining its own logger inside the child process; the QueueHandler must be attached in each worker:

def process_fun(log_q):
    """Simulate the work done by the main function in each process."""
    logger = LoggingManager.get_logger("process_fun")
    
    logger.info("Started process_fun.")
    print(f"Queue size in process_fun after logging: {log_q.qsize()}")
    sys.stdout.flush()
    
    future_to_query = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for query in range(5):
            res = executor.submit(thread_process_client_query_exec, log_q)
            future_to_query.append(res)

        for future in concurrent.futures.as_completed(future_to_query):
            try:
                future.result()
            except Exception as exc:
                logger.error(f"Exception in thread: {exc}")

Modify stop_listener to ensure a proper shutdown:

@classmethod
def stop_listener(cls):
    """Stop the listener process gracefully."""
    if cls._log_queue:
        cls._log_queue.put("STOP")
    if cls._listener_process:
        cls._listener_process.join()
        cls._listener_process = None  # Reset process after stopping
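
If the manager might be restarted later in the same run, it can also help to release the queue and reset the class attributes; a hedged extension of the same method:

@classmethod
def stop_listener(cls):
    """Shutdown variant that also releases the queue's feeder thread."""
    if cls._log_queue:
        cls._log_queue.put("STOP")
    if cls._listener_process:
        cls._listener_process.join()
        cls._listener_process = None
    if cls._log_queue:
        cls._log_queue.close()        # no further puts from this process
        cls._log_queue.join_thread()  # let the feeder thread flush
        cls._log_queue = None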

Edit:

  1. Prevent recursive calls in get_log_queue(). The original _start_listener_process() called get_log_queue() again while the queue was still being created; the edited get_log_queue() above starts the listener only after cls._log_queue has been assigned.

  2. Ensure _start_listener_process() runs only once. For this, edit the corresponding portion of your code to check is_alive() and to pass cls._log_queue directly instead of calling get_log_queue():

    if cls._listener_process is None or not cls._listener_process.is_alive():
        cls._listener_process = mp.Process(target=cls._listener_process_target, args=(cls._log_queue,))
        cls._listener_process.start()
    

Example usage:

if __name__ == "__main__":
    LoggingManager.get_log_queue()
    logger = LoggingManager.get_logger(__name__)
    logger.info("Test log message")
    LoggingManager.stop_listener()

(The listener expects LogRecord objects, so messages should go through a logger rather than being put on the queue directly.)

Output expected, via the listener's console handler:

<timestamp> MainProcess __main__ INFO     Test log message

Briefly: the is_alive() check prevents spawning redundant listener processes.

Upvotes: 0
