Reputation: 3483
I'm using Python 2.7's multiprocessing.Pool to manage a pool of 3 workers. Each worker is fairly complicated, and there's a resource leak (presumably) in some third-party code that causes problems after 6-8 hours of continuous runtime, so I'd like to use maxtasksperchild to have workers refreshed periodically.
I'd also like each worker to write to its own separate log file. Without maxtasksperchild, I use a shared multiprocessing.Value to assign an integer (0, 1, or 2) to each worker, then use the integer to name the log file. Roughly, it looks like the sketch below.
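(Simplified sketch; init_worker and the log-file name are placeholders, not my actual code.)

import logging
from multiprocessing import Pool, Value

def init_worker(counter):
    # Claim the next integer (0, 1, or 2) and use it to name this worker's log file.
    with counter.get_lock():
        worker_id = counter.value
        counter.value += 1
    logging.basicConfig(filename='worker-{}.log'.format(worker_id),
                        level=logging.INFO)

if __name__ == '__main__':
    counter = Value('i', 0)
    pool = Pool(3, initializer=init_worker, initargs=(counter,))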
With maxtasksperchild, I'd like to reuse log files once a worker is done. So if this whole thing runs for a month, I only want three log files, not one log file for each worker that was spawned.
If I could pass a callback (e.g. a finalizer to go along with the initializer currently supported), this would be straightforward. Without that, I can't see a robust and simple way to do it.
Upvotes: 1
Views: 1454
Reputation: 21694
That's AFAIK undocumented, but multiprocessing has a Finalize class (in multiprocessing.util), "which supports object finalization using weakrefs". You could use it to register a finalizer within your initializer.
I don't see multiprocessing.Value as a helpful synchronization choice in this case, though. Multiple workers could exit simultaneously, and signaling which file-integers are free is more than a (locked) counter can provide. I would suggest using multiple bare multiprocessing.Lock instances, one for each file, instead:
from multiprocessing import Pool, Lock, current_process
from multiprocessing.util import Finalize


def f(n):
    global fileno
    for _ in range(int(n)):  # xrange for Python 2
        pass
    return fileno


def init_fileno(file_locks):
    for i, lock in enumerate(file_locks):
        if lock.acquire(False):  # non-blocking attempt
            globals()['fileno'] = i
            print("{} using fileno: {}".format(current_process().name, i))
            Finalize(lock, lock.release, exitpriority=15)
            break


if __name__ == '__main__':
    n_proc = 3
    file_locks = [Lock() for _ in range(n_proc)]
    pool = Pool(
        n_proc, initializer=init_fileno, initargs=(file_locks,),
        maxtasksperchild=2
    )
    print(pool.map(func=f, iterable=[50e6] * 18))
    pool.close()
    pool.join()
    # all locks should be available if all finalizers did run
    assert all(lock.acquire(False) for lock in file_locks)
Output:
ForkPoolWorker-1 using fileno: 0
ForkPoolWorker-2 using fileno: 1
ForkPoolWorker-3 using fileno: 2
ForkPoolWorker-4 using fileno: 0
ForkPoolWorker-5 using fileno: 1
ForkPoolWorker-6 using fileno: 2
[0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2]
Process finished with exit code 0
Note that with Python 3 you can't reliably use Pool's context manager instead of the old way of doing it shown above. Pool's context manager (unfortunately) calls terminate(), which might kill worker processes before the finalizer has had a chance to run.
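To illustrate (a sketch reusing f, init_fileno, and file_locks from the example above):

# Risky in Python 3: __exit__ calls pool.terminate(), which can kill
# workers before their Finalize callbacks release the locks.
with Pool(n_proc, initializer=init_fileno, initargs=(file_locks,)) as pool:
    pool.map(f, [50e6] * 18)

# Safe: close() stops new tasks and join() waits for workers to exit
# normally, so the finalizers get a chance to run.
pool = Pool(n_proc, initializer=init_fileno, initargs=(file_locks,))
pool.map(f, [50e6] * 18)
pool.close()
pool.join()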
Upvotes: 2
Reputation: 3483
I ended up going with the following. It assumes that PIDs aren't recycled very quickly (true on Ubuntu for me, but not in general on Unix). I don't think it makes any other assumptions, but I'm really only interested in Ubuntu, so I didn't look carefully at other platforms such as Windows.

The code uses an array to keep track of which PIDs have claimed which index. When a new worker is started, it looks to see whether any of the recorded PIDs are no longer in use. If it finds one, it assumes this is because that worker has completed its work (or been terminated for another reason). If it doesn't find one, then we're out of luck! So this isn't perfect, but I think it's simpler than anything I've seen so far or considered.
import os
from multiprocessing import Pool, Array


def run_pool():
    child_pids = Array('i', 3)
    pool = Pool(3, initializer=init_worker, initargs=(child_pids,), maxtasksperchild=1000)
    # ... submit work to the pool as usual ...


def init_worker(child_pids):
    with child_pids.get_lock():
        available_index = None
        for index, pid in enumerate(child_pids):
            # PID 0 means unallocated (this happens when our pool is started). We reclaim PIDs
            # which are no longer in use. We also reclaim the lucky case where a PID was recycled
            # but assigned to one of our workers again, so we know we can take it over.
            if not pid or not _is_pid_in_use(pid) or pid == os.getpid():
                available_index = index
                break
        if available_index is not None:
            child_pids[available_index] = os.getpid()
        else:
            # This is unexpected - it means all of the PIDs are in use, so we have a logical error
            # or a PID was recycled before we could notice and reclaim its index
            pass


def _is_pid_in_use(pid):
    try:
        os.kill(pid, 0)  # signal 0 only checks whether the process exists
        return True
    except OSError:
        return False
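A worker can then derive its log-file name from the slot its PID occupies in the array, e.g. with a hypothetical helper like this (not in the code above; it assumes the worker kept a reference to child_pids from init_worker):

def my_log_index(child_pids):
    # Find the slot this worker claimed in init_worker; the index
    # (0, 1, or 2) picks which of the three log files to reuse.
    with child_pids.get_lock():
        for index, pid in enumerate(child_pids):
            if pid == os.getpid():
                return index
    return None  # shouldn't happen if init_worker found a free slot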
Upvotes: 0