Reputation: 10948
Passing a multiprocessing.Lock (or RLock) to multiprocessing.context.Process fails with None value for the lock
I tried many things over several hours. Nothing worked.
At first I had pickling errors with the ForkingPickler. Then I saw some other SO answers saying that locks need to be passed as arguments to the target function, so I did that. For instance, I tried passing the lock as an argument like here: https://pymotw.com/2/multiprocessing/communication.html#controlling-access-to-resources
However, it seems that the lock is replaced by None once execution is inside the target worker_function: the function's lock argument arrives as None.
See below what I have tried.
See argument 2, shared_lock: Lock:

    from multiprocessing import Queue, Lock
    from neuraxle.base import ExecutionContext
    # ...
    def worker_function(queue_worker: QueueWorker, shared_lock: Lock, context: ExecutionContext, use_savers: bool, additional_worker_arguments):
        try:
            context.flow._lock = shared_lock
            # some more code after...
Here, I also have something similar to this, see the shared_lock variable:

    from multiprocessing import Lock, RLock
    from multiprocessing.context import Process
    from threading import Thread
    use_processes = True  # could be false as well.
    # some more code before...
    thread_safe_context = context.make_thread_safe()
    shared_lock: Lock = context.flow._lock  # trying to save the lock or rlock
    context.flow._lock = None
    parallel_call = Thread
    if use_processes:
        parallel_call = Process
    p = parallel_call(
        target=worker_function,
        args=(thread_safe_self, shared_lock, thread_safe_context, self.use_savers, worker_arguments)
    )
    # some more code after...
I have done several other refactorings of the code to get to this point before simply illustrating the point above with context.flow._lock = None. I am starting to get seriously irritated. I would like to finally find a solution to this.
Note that passing the Queue in a custom QueueWorker class works.
Hint: I've found that a Manager class or something like that could help, but I do not see how I could use this here, and I tried to pass the lock as indicated in the documentation (as linked above). Thank you for the help!
Upvotes: -1
Views: 177
Reputation: 10948
NOTE: This solution turned out to be bad and the issue is more complex than it seems.
Original answer:
It only had to do with how I created the lock passed as an argument.
Do:

    from multiprocessing import Lock, Manager
    lock = Manager().Lock()

Do not do:

    from multiprocessing import Lock, Manager
    lock = Lock()
EDIT: as mentioned by @user2357112, one should use multiprocessing.Process instead of multiprocessing.context.Process, or should use multiprocessing.get_context() first. This seemed to be part of the bug.
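For reference, here is a minimal sketch of the get_context() route, where the lock and the process are both created from the same context object. The names worker and run_workers, and the shared counter, are hypothetical and only for illustration. This does not reproduce the Neuraxle setup from the question; it only shows that a lock passed through args reaches the child as a usable lock.

```python
import multiprocessing as mp


def worker(lock, counter):
    # The lock was handed over through Process(args=...), so the child
    # receives a working lock object that it can acquire and release.
    with lock:
        counter.value += 1


def run_workers(n_workers, method=None):
    # method=None selects the platform's default start method;
    # "fork" or "spawn" can be requested explicitly where available.
    ctx = mp.get_context(method)
    lock = ctx.Lock()  # lock created from the context
    counter = ctx.Value("i", 0, lock=False)  # raw shared int, guarded by our own lock
    procs = [
        ctx.Process(target=worker, args=(lock, counter))  # process from the same context
        for _ in range(n_workers)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value
```

When running this as a script, call run_workers(...) from under an `if __name__ == "__main__":` guard, since the spawn start method re-imports the main module in each child.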
EDIT 2: My above solution was wrong and was just a lucky workaround.
EDIT 3: The fix for me was also to make sure that anything put in the queue was serializable before attempting to put it there. Even exceptions passed to the queue need to have their stack trace serialized to a string first. See: https://github.com/python/cpython/issues/79423
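A rough sketch of what I mean by serializing the stack trace before it goes into the queue. The helper names exception_to_text and run_task are made up for this example and are not part of multiprocessing or Neuraxle; the idea is just to turn the exception into a plain string in the worker.

```python
import traceback
from multiprocessing import Queue


def exception_to_text(exc):
    # Render the exception together with its stack trace as a plain string.
    # A string can always be pickled, unlike some exception objects.
    return "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )


def run_task(task, results):
    # Hypothetical worker body: put either the result or the error text
    # in the queue, never the exception object itself.
    try:
        results.put(("ok", task()))
    except Exception as exc:
        results.put(("error", exception_to_text(exc)))
```

The consumer then checks the first element of the tuple and logs or re-raises based on the text. The return value of task() still has to be picklable on its own.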
Upvotes: -1