Reputation: 23
I'm trying to implement logging in a Python application where I use both threads and processes. I have a custom LoggingManager that uses a QueueHandler to push log records from multiple processes to a shared log queue. When I use the main function without spawning new processes, logging works as expected (logs are written to both file and console). However, when I spawn multiple child processes using multiprocessing.Pool, the logs from these processes don't appear in the log file or console.
**logger_factory.py**
import logging
from logging import handlers
import multiprocessing as mp
import datetime
import os
class LoggingManager:
_log_queue = None
_listener_process = None
@classmethod
def get_log_queue(cls):
"""Retrieve or create the log queue."""
if cls._log_queue is None:
cls._log_queue = mp.Queue()
cls._start_listener_process()
return cls._log_queue
@classmethod
def _start_listener_process(cls):
"""Start listener process only once."""
if cls._listener_process is None:
cls._listener_process = mp.Process(target=cls._listener_process_target, args=(cls.get_log_queue(),))
cls._listener_process.start()
@classmethod
def _listener_process_target(cls, log_queue):
"""Listener process responsible for consuming log records from the queue."""
print("Listener process started.")
cls.listener_configurer()
while True:
try:
record = log_queue.get() # Will block until a log record is available
if record == "STOP":
print("Stopping listener process.")
break # Exit the loop if we get a STOP signal
logger = logging.getLogger(record.name)
logger.handle(record)
except Exception as e:
print(f"Error in listener process: {e}")
break
@classmethod
def listener_configurer(cls):
"""Configure the logging handlers (file, console, etc.)."""
log_file_path = 'C:/Logs/test_{0}.log'.format(datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
root = logging.getLogger()
# File handler to log messages with level INFO and higher (exclude DEBUG)
file_handler = handlers.TimedRotatingFileHandler(
log_file_path, when="midnight", interval=1, backupCount=5
)
file_handler.setLevel(logging.INFO) # Set file handler level to INFO
# Console handler to log all messages including DEBUG
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
# Log format
formatter = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add handlers to the root logger
root.addHandler(file_handler)
root.addHandler(console_handler)
root.setLevel(logging.DEBUG)
@classmethod
def get_logger(cls, name):
"""Get the logger for the application."""
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
# Add a QueueHandler to send logs to the main listener process
queue_handler = handlers.QueueHandler(cls.get_log_queue())
logger.addHandler(queue_handler)
return logger
@classmethod
def stop_listener(cls):
"""Stop the listener process gracefully."""
if cls._log_queue:
cls._log_queue.put("STOP")
if cls._listener_process:
cls._listener_process.join()
##**automation.py**
import concurrent.futures
import multiprocessing as mp
import sys
import time
from logger_factory import LoggingManager
def thread_process_client_query_exec(log_q):
"""Simulate thread work and log messages."""
logger = LoggingManager.get_logger("thread_process_client_query_exec")
logger.info("Started thread for query execution.")
print(f"Queue size in thread after logging: {log_q.qsize()}")
time.sleep(1) # Simulate work
logger.info("Completed thread for query execution.")
def process_fun(log_q):
"""Simulate the work done by the main function in each process."""
logger = LoggingManager.get_logger("process_fun")
logger.info("Started process_fun.")
print(f"Queue size in process_fun after logging: {log_q.qsize()}")
sys.stdout.flush()
future_to_query = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for query in range(5):
res = executor.submit(thread_process_client_query_exec, log_q)
future_to_query.append(res)
for future in concurrent.futures.as_completed(future_to_query):
try:
future.result()
except Exception as exc:
logger.error(f"Exception in thread: {exc}")
def do_run_process_automation_test(log_q):
"""Simulate a multi-process test using multiprocessing."""
logger = LoggingManager.get_logger(__name__)
logger.info("Started do_run_automation_test.")
with mp.Pool(2) as pool:
results = []
for i in range(5):
p = pool.apply_async(process_fun, (log_q,logger,))
results.append(p)
pool.close()
pool.join() # Wait for all processes to complete
logger.info("Completed do_run_automation_test.")
__main__.py
import time
from logger_factory import LoggingManager
import automation
def main():
"""Main function to initialize logging and run automation."""
# Create log queue and configure logger with a log file
log_q = LoggingManager.get_log_queue()
# Get main logger and log some messages
logger = LoggingManager.get_logger(__name__)
logger.info("Logging from the main process")
logger.debug("Debug message from main process")
logger.error("Error message from main process")
# Run the automation test
automation.do_run_process_automation_test(log_q)
# Log queue status for debugging purposes
print(f"Queue size after all tasks: {log_q.qsize()}")
# Wait for some time to ensure all logs are processed before shutdown
time.sleep(2)
# Stop the listener
LoggingManager.stop_listener()
if __name__ == "__main__":
main()
Upvotes: 0
Views: 39
Reputation: 1898
Change get_log_queue()
in such a way so that _start_listener_process()
is called before returning the queue:
@classmethod
def get_log_queue(cls):
"""Retrieve or create the log queue."""
if cls._log_queue is None:
cls._log_queue = mp.Queue()
# ✅ Start the listener process only if it's not already running
if cls._listener_process is None or not cls._listener_process.is_alive():
cls._start_listener_process()
return cls._log_queue
Modify _listener_process_target
to call listener_configurer()
before entering the loop:
@classmethod
def _listener_process_target(cls, log_queue):
"""Listener process responsible for consuming log records from the queue."""
print("Listener process started.")
# Call listener_configurer before processing logs
cls.listener_configurer()
while True:
try:
record = log_queue.get()
if record == "STOP":
print("Stopping listener process.")
break
logger = logging.getLogger(record.name)
logger.handle(record)
except Exception as e:
print(f"Error in listener process: {e}")
break
Update do_run_process_automation_test
to specifically pass log_q
to the worker processes:
def do_run_process_automation_test(log_q):
"""Simulate a multi-process test using multiprocessing."""
logger = LoggingManager.get_logger(__name__)
logger.info("Started do_run_automation_test.")
with mp.Pool(2) as pool:
results = []
for i in range(5):
p = pool.apply_async(process_fun, (log_q,))
results.append(p)
pool.close()
pool.join()
logger.info("Completed do_run_automation_test.")
Update process_fun to specifically configure its logger:
def process_fun(log_q):
"""Simulate the work done by the main function in each process."""
logger = LoggingManager.get_logger("process_fun")
logger.info("Started process_fun.")
print(f"Queue size in process_fun after logging: {log_q.qsize()}")
sys.stdout.flush()
future_to_query = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for query in range(5):
res = executor.submit(thread_process_client_query_exec, log_q)
future_to_query.append(res)
for future in concurrent.futures.as_completed(future_to_query):
try:
future.result()
except Exception as exc:
logger.error(f"Exception in thread: {exc}")
Modify stop_listener to ensure a proper shutdown:
@classmethod
def stop_listener(cls):
"""Stop the listener process gracefully."""
if cls._log_queue:
cls._log_queue.put("STOP")
if cls._listener_process:
cls._listener_process.join()
cls._listener_process = None # Reset process after stopping
Edit:
Prevent Recursive Calls in get_log_queue(). For this edited in the function get_log_queue(cls)
in the class logger
. Check my edited code.
Ensure _start_listener_process() Runs Only Once. For this, Kindly edit this portion in your code
if cls._listener_process is None or not cls._listener_process.is_alive():
cls._listener_process = mp.Process(target=cls._listener_process_target, args=(cls._log_queue,))
cls._listener_process.start()
Example Usage:
if __name__ == "__main__":
log_queue = Logger.get_log_queue()
log_queue.put("Test log message")
log_queue.put("STOP")
Output expected:
Log: Test log message
Briefly: is_alive()
is used to prevent redundant listener processes.
Upvotes: 0