Reputation: 26326
I'm using pyserial to acquire data with multiprocessing. The way I share data is very simple:
I have member objects in my class:
self.mpManager = mp.Manager()                     # manager that owns the shared list
self.shared_return_list = self.mpManager.list()   # proxy list shared with the worker process
self.shared_result_lock = mp.Lock()               # guards access to the shared list
I create the worker process this way:
process = mp.Process(
    target=do_my_stuff,
    args=(self.shared_stopped, self.shared_return_list, self.shared_result_lock),
)
where do_my_stuff is a global function.
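For context, here is a minimal, self-contained sketch of how these pieces fit together. The worker body, the Acquisition class name, the sleep interval, and the type of shared_stopped (an mp.Value boolean flag) are assumptions for illustration, not the actual acquisition code:

import multiprocessing as mp
import time

def do_my_stuff(shared_stopped, shared_return_list, shared_lock):
    # Placeholder worker: pretend to read from the device, then hand the
    # local buffer over to the shared list under the lock.
    acqBuffer = []
    while not shared_stopped.value:
        acqBuffer.extend(range(10))        # stand-in for pyserial reads
        shared_lock.acquire()
        try:
            shared_return_list.extend(acqBuffer)
            del acqBuffer[:]
        finally:
            shared_lock.release()
        time.sleep(0.01)

class Acquisition:
    def __init__(self):
        self.mpManager = mp.Manager()
        self.shared_return_list = self.mpManager.list()
        self.shared_result_lock = mp.Lock()
        self.shared_stopped = mp.Value('b', False)   # assumed boolean stop flag

    def start(self):
        self.process = mp.Process(
            target=do_my_stuff,
            args=(self.shared_stopped, self.shared_return_list, self.shared_result_lock),
        )
        self.process.start()

if __name__ == "__main__":
    acq = Acquisition()
    acq.start()
    time.sleep(0.1)
    acq.shared_stopped.value = True
    acq.process.join()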
Now, the part that fills the list in the process function is:
if len(acqBuffer) > acquisitionSpecs["LengthToPass"]:
    shared_lock.acquire()
    shared_return_list.extend(acqBuffer)
    del acqBuffer[:]
    shared_lock.release()
And the part that pulls the data into the local thread for use is:
while len(self.acqBuffer) <= 0 and (not self.stopped):
    # copy list from shared buffer and empty it
    self.shared_result_lock.acquire()
    self.acqBuffer.extend(self.shared_return_list)
    del self.shared_return_list[:]
    self.shared_result_lock.release()
The problem:
Although there's only one lock, my program occasionally ends up in what looks like a deadlock: after running for some time, it freezes. After adding prints before and after the lock calls, I found that it hangs on acquiring the lock.
If I use a recursive lock, RLock(), it works with no problems, but I'm not sure whether I should do that.
How is this possible? Am I doing something wrong? I expect that if both processes try to acquire the lock, the second one should simply block until the other process releases it.
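For reference, this is the behaviour I'm counting on, shown in a standalone toy example (not my actual code): two processes share one lock, and the second acquirer simply blocks until the holder releases it.

import multiprocessing as mp
import time

def hold_briefly(lock, name):
    # Each process repeatedly takes the single shared lock, holds it for a
    # moment, and releases it; neither should ever block forever.
    for _ in range(5):
        lock.acquire()
        try:
            print(name, "has the lock")
            time.sleep(0.1)
        finally:
            lock.release()

if __name__ == "__main__":
    lock = mp.Lock()
    procs = [mp.Process(target=hold_briefly, args=(lock, n)) for n in ("A", "B")]
    for p in procs:
        p.start()
    for p in procs:
        p.join()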
Upvotes: 0
Views: 1427
Reputation: 26326
It turned out it's not a deadlock. My fault! The problem was that the data acquired from the device is sometimes so huge that copying it through
shared_return_list.extend(acqBuffer)
del acqBuffer[:]
takes so long that the program appears to freeze. I solved this by moving the data in chunks and by limiting the amount of data pulled from the device.
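For anyone hitting the same thing, this is roughly what the chunked hand-over can look like. The hand_over_chunk helper and the chunk size are made up for illustration; the other names are the ones from the question:

CHUNK = 4096  # arbitrary chunk size; tune it to your data rate

def hand_over_chunk(acqBuffer, shared_return_list, shared_lock):
    # Move at most CHUNK items per lock acquisition so each critical
    # section stays short and the consumer never waits long for the lock.
    chunk = acqBuffer[:CHUNK]
    del acqBuffer[:CHUNK]
    if chunk:
        shared_lock.acquire()
        try:
            shared_return_list.extend(chunk)
        finally:
            shared_lock.release()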
Upvotes: 1
Reputation: 29710
Without an SSCCE, it's difficult to know whether there's something else going on in your code.
One possibility is that an exception is thrown after the lock is acquired. Try wrapping each of your locked sections in a try/finally block, e.g.:
try:
    shared_lock.acquire()
    shared_return_list.extend(acqBuffer)
    del acqBuffer[:]
finally:
    shared_lock.release()
and:
try:
    self.shared_result_lock.acquire()
    self.acqBuffer.extend(self.shared_return_list)
    del self.shared_return_list[:]
finally:
    self.shared_result_lock.release()
You could even add except clauses and log any exceptions raised, to see whether this turns out to be the issue.
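Something along these lines, using the standard logging module (drain_buffer is just an illustrative wrapper around the same critical section):

import logging

logger = logging.getLogger(__name__)

def drain_buffer(acqBuffer, shared_return_list, shared_lock):
    try:
        shared_lock.acquire()
        shared_return_list.extend(acqBuffer)
        del acqBuffer[:]
    except Exception:
        # If an exception inside the critical section is the real culprit,
        # this makes it visible instead of disappearing silently.
        logger.exception("error while copying the acquisition buffer")
        raise
    finally:
        shared_lock.release()

Note that with shared_lock: is equivalent to the acquire/release-in-finally pattern and is harder to get wrong.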
Upvotes: 1