Reputation: 798
I've read the documentation here, and seems that to make sure that the Value does not hang we need to use a lock. I did just that but it still gets stuck:
from multiprocessing import Process, Value, freeze_support, Lock
nb_threads = 3
nbloops = 10
v = Value('i', 0)
def run_process(lock):
global nbloops
i = 0
while i < nbloops:
# do stuff
i += 1
with lock:
v.value += 1
# wait for all the processes to finish doing something
while v.value % nb_threads != 0:
pass
if __name__ == '__main__':
freeze_support()
processes = []
lock = Lock()
for i in range(0, 3):
processes.append( Process( target=run_process, args=(lock,) ) )
for process in processes:
process.start()
for process in processes:
process.join()
I've tried accessing the value using lock but it still blocks:
val = -1
while val % nb_threads != 0:
with lock:
val = v.value
How can I fix this? Thanks
Upvotes: 3
Views: 1589
Reputation: 94871
Your code has a race condition; you do not guarantee that all three processes break free from the while v.value % nb_threads != 0
loop before allowing them to move on. This allows one or two of the processes to move on to the next iteration of the while i < nbloops
loop, increment v.value
, and then prevent the remaining process/processes from ever breaking out of their own while v.value % nb_threads != 0
loop. The kind of synchronization you're trying to do there is best handled by a Barrier
, rather than looping and repeatedly checking the value.
Also, multiprocessing.Value
also has a built-in synchronization by default, and you can explicitly access the Lock
it uses for that by calling Value.get_lock
, so there is no need to explicitly a Lock
of your own to each process. Putting together, you have:
from multiprocessing import Process, Value, freeze_support, Lock, Barrier
nb_threads = 3
nbloops = 10
v = Value('i', 0)
def run_process(barrier):
global nbloops
i = 0
while i < nbloops:
# do stuff
i += 1
with v.get_lock():
v.value += 1
# wait for all the processes to finish doing something
out = barrier.wait()
if __name__ == '__main__':
freeze_support()
processes = []
b = Barrier(nb_threads)
for i in range(0, nb_threads):
processes.append( Process( target=run_process, args=(b,) ) )
for process in processes:
process.start()
for process in processes:
process.join()
The Barrier
guarantees that no process can move on to the next iteration of the loop until all of them have called Barrier.wait()
, at which point all three are simultaneously able to progress. The Barrier
object supports re-use, so it can safely be called on each iteration.
Upvotes: 3