Reputation: 9877
I would like to synchronise access to a shared resource across multiple workers in a Python multiprocessing.Pool by passing a semaphore to a function. Here is some pseudocode:
def do_work(payload, semaphore):
    with semaphore:
        access_the_shared_resource(payload)
The function do_work is defined in a library, so I cannot define a semaphore in my local scope for the function to inherit. I also cannot pass a semaphore using functools.partial, because multiprocessing attempts to pickle the semaphore, which is not allowed.
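Here is a minimal sketch of that failing attempt (the payloads and the body of do_work are placeholders); as far as I can tell, pickling the semaphore raises a RuntimeError along the lines of "Semaphore objects should only be shared between processes through inheritance":

import functools
import multiprocessing

def do_work(payload, semaphore):
    with semaphore:
        return payload  # stand-in for access_the_shared_resource(payload)

if __name__ == "__main__":
    payloads = range(10)  # placeholder payloads
    semaphore = multiprocessing.Semaphore()
    with multiprocessing.Pool() as pool:
        # multiprocessing tries to pickle the partial, including the semaphore,
        # and the map call raises RuntimeError
        results = pool.map(functools.partial(do_work, semaphore=semaphore), payloads)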
What seems to work is using a multiprocessing.Manager to create a proxy to a Semaphore:
manager = multiprocessing.Manager()
semaphore = manager.Semaphore()

with multiprocessing.Pool() as pool:
    results = pool.map(functools.partial(do_work, semaphore=semaphore), payloads)
Is this the best approach or am I missing the obvious solution?
Upvotes: 4
Views: 2391
Reputation: 94881
Your only other option is to use initializer and initargs to pass a regular multiprocessing.Semaphore to each worker process at pool creation time, and use it as a global variable:
import multiprocessing

semaphore = None

def do_work(payload):
    with semaphore:
        return payload

def init(sem):
    # Runs once in each worker process; rebinds the module-level global.
    global semaphore
    semaphore = sem

if __name__ == "__main__":
    sem = multiprocessing.Semaphore()
    with multiprocessing.Pool(initializer=init, initargs=(sem,)) as p:
        results = p.map(do_work, payloads)  # payloads as defined in the question
The global semaphore variable you define in the parent will be rebound to the multiprocessing.Semaphore in each child process when init runs.
Using the manager.Semaphore() is fine, though it does require spawning an extra Python process.
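If you want to see that extra process, here is a small illustrative check (not required for either solution):

import multiprocessing

if __name__ == "__main__":
    print(multiprocessing.active_children())  # [] -- no child processes yet
    manager = multiprocessing.Manager()       # starts the manager's server process
    print(multiprocessing.active_children())  # now lists one extra process (the SyncManager)
    manager.shutdown()                        # stop the server process when done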
Upvotes: 1