Reputation: 135
I am doing an analysis in which we process a large number of files, using multiprocessing.Pool to speed things up:
with multiprocessing.Pool(processes=num_cores) as p:
    output_mp = p.map(clean_file, doc_mp_list_xlsx)
where clean_file is the function that reads and cleans a file and doc_mp_list_xlsx is a list of the complete paths.
Logging is done with the logging module, configured so that each log record shows which worker process emitted it:
import logging          # for logging
import multiprocessing  # for getting the CPU count
from utils import read_config  # for reading the config

config = read_config.config
num_cores = multiprocessing.cpu_count()

# CONFIGURE LOGGING FOR EACH POOL WORKER
for process in range(1, num_cores + 1):
    handler = logging.FileHandler(config['logging_path'] +
                                  '\\' +
                                  str(process) + '.log')
    handler.setFormatter(logging.Formatter(
        '%(asctime)s*|*%(levelname)s*|*%(message)s*|*%(module)s*|*%(funcName)s*|*%(lineno)d*|*%(name)s'))
    logger = logging.getLogger("SpawnPoolWorker-" + str(process))
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
def getLogger(name):
    """
    Return a logger for the current process.

    Arguments:
        name: ignored argument for backwards compatibility
    Returns:
        logger: logger for the current process
    """
    return logging.getLogger(str(multiprocessing.current_process().name))
And this works. However, logging produces duplicate entries:
date and time | type | message | module | function | line | worker
---|---|---|---|---|---|---
2022-12-05 16:42:31,199 | INFO | Beginning to clean file x.pdf | clean_pdf | clean_pdf | 22 | SpawnPoolWorker-3
2022-12-05 16:42:30,400 | INFO | Beginning to clean file x.pdf | clean_pdf | clean_pdf | 22 | SpawnPoolWorker-4
I do not understand why this happens. It does not produce multiple outputs; it just seems to be a duplicate message (but with a different worker attached to it).
Does anybody have a clue why this happens? Is the logging configuration for multiprocessing incorrect, or is it the result of something else?
Thanks in advance.
Upvotes: 0
Views: 553
Reputation: 4368
I think the problem comes from a misunderstanding of how logging works with multiprocessing, and in general.
Each process runs its own Python interpreter, which includes its own logging state. You configured logging in the main process, but not in the others; their logging never got configured. What you actually did was configure the main process' logging many times: each loop iteration attached a new handler under a different logger name, so when a message was handled in the main process it could be handled several times, written under different names. Meanwhile, a log message emitted in the worker processes was not handled at all.
And you sometimes use logging.getLogger("SpawnPoolWorker-" + str(process)), other times logging.getLogger(str(multiprocessing.current_process().name)), so you may not even be using the same logger objects.
I am not surprised it does not work. Multi-process code is harder than usually expected.
What you need to do instead is set up logging in each process (for example at the start of clean_file), in a multiprocess-compatible way.
The code I used to reproduce the issue (it does not include the solution):
import logging
import multiprocessing

num_cores = multiprocessing.cpu_count()

for process in range(1, num_cores + 1):
    handler = logging.FileHandler('log.log')  # modified the file path
    handler.setFormatter(logging.Formatter(
        '%(asctime)s*|*%(levelname)s*|*%(message)s*|*%(module)s*|*%(funcName)s*|*%(lineno)d*|*%(name)s'))
    logger = logging.getLogger("SpawnPoolWorker-" + str(process))
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
def getLogger():
    """
    Return a logger for the current process.
    """
    return logging.getLogger(str(multiprocessing.current_process().name))
def clean_file(something):  # FAKE
    getLogger().debug(f"calling clean_file with {something=!r}")

doc_mp_list_xlsx = [1, 2, 3, 4, 5]  # FAKE

with multiprocessing.Pool(processes=num_cores) as p:
    output_mp = p.map(clean_file, doc_mp_list_xlsx)
Upvotes: 1