Reputation: 469
Given the MWE below:
import multiprocessing

def thread_function(shared_resource : dict, val : int) -> None:
    if val not in shared_resource.keys():
        print(f"Value {val} is not in keys!")
        shared_resource[val] = 1
    shared_resource[val] += 1

def main():
    no_of_threads = 5
    manager = multiprocessing.Manager()
    shared_resource = manager.dict()
    values = [1, 1, 2, 1, 3, 3, 3, 4, 5]
    with multiprocessing.Pool(no_of_threads) as pool:
        pool.starmap(thread_function,
                     [(shared_resource, val) for val in values],
                     chunksize=1)
    print(shared_resource)

if __name__ == "__main__":
    main()
I am having a readers/writers issue that I don't know how to resolve. The dictionary is a shared resource amongst the threads and I wish for the access to it to be atomic, i.e., to avoid the scenario where two threads try to write the same value to it. For instance, here is an incorrect output:
Value 1 is not in keys!
Value 1 is not in keys! # <-- Nope!
Value 2 is not in keys!
Value 3 is not in keys!
Value 4 is not in keys!
Value 5 is not in keys!
On the other hand, there is the possibility that the threads will access the resource in the correct order and the output will be the correct one, i.e.,
Value 1 is not in keys!
Value 2 is not in keys!
Value 3 is not in keys!
Value 4 is not in keys!
Value 5 is not in keys!
{1: 4, 2: 2, 3: 4, 4: 2, 5: 2}
But how can I avoid such scenarios and always have them behave as expected? Thank you in advance.
Upvotes: 1
Views: 357
Reputation: 44128
Your code in thread_function constitutes a critical section whose execution needs to be serialized so that only a single process can be executing it at a time. Even what appears to be a single statement, shared_resource[val] += 1, consists of multiple bytecode instructions (on a managed dictionary, a read request followed by a separate write request to the manager process), so two processes could read the same initial value of shared_resource[val] and store the same updated value. Likewise, multiple processes running in parallel could each find that a given key is not yet in the dictionary, and each would then print the message and store the same key.
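To make the read-then-write nature of += on a dictionary item visible, here is a minimal single-process sketch. LoggingDict is a hypothetical helper written only for this illustration (it is not part of multiprocessing); it records every item read and write so the two separate steps show up in the log.

```python
class LoggingDict(dict):
    """Hypothetical helper: a dict that records each item read and write."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.calls = []

    def __getitem__(self, key):
        # Record the read before delegating to the normal dict lookup.
        self.calls.append(("get", key))
        return super().__getitem__(key)

    def __setitem__(self, key, value):
        # Record the write before delegating to the normal dict store.
        self.calls.append(("set", key, value))
        super().__setitem__(key, value)


d = LoggingDict()
d[1] = 1   # one write
d[1] += 1  # a read followed by a separate write
print(d.calls)  # [('set', 1, 1), ('get', 1), ('set', 1, 2)]
```

Between the 'get' and the following 'set', nothing stops another process from performing its own 'get' on the same key, which is how an update can be lost. Holding a lock around the whole check-and-update, as in the code that follows, closes that window.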
import multiprocessing

def init_processes(the_lock):
    global lock
    lock = the_lock

def thread_function(shared_resource : dict, val : int) -> None:
    with lock:
        if val not in shared_resource.keys():
            print(f"Value {val} is not in keys!")
            shared_resource[val] = 1
        shared_resource[val] += 1

def main():
    no_of_threads = 5
    manager = multiprocessing.Manager()
    shared_resource = manager.dict()
    values = [1, 1, 2, 1, 3, 3, 3, 4, 5]
    lock = multiprocessing.Lock()
    with multiprocessing.Pool(no_of_threads, initializer=init_processes, initargs=(lock,)) as pool:
        pool.starmap(thread_function,
                     [(shared_resource, val) for val in values],
                     chunksize=1)
    print(shared_resource)

if __name__ == "__main__":
    main()
Prints:
Value 1 is not in keys!
Value 2 is not in keys!
Value 3 is not in keys!
Value 5 is not in keys!
Value 4 is not in keys!
{1: 4, 2: 2, 3: 4, 5: 2, 4: 2}
NOTE: You have no control, however, over the dispatching of the tasks and therefore cannot control the order in which the keys will be stored.
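If a stable display order matters, one option is to sort the keys after the pool has finished. The sketch below assumes a plain dict named result standing in for the contents of the manager dictionary (the values are copied from the printed output above); with the real proxy, the same idea should work as dict(sorted(shared_resource.items())).

```python
# Stand-in for the manager dict's contents once all tasks have completed;
# the key order reflects whichever task happened to store its key first.
result = {1: 4, 2: 2, 3: 4, 5: 2, 4: 2}

# Dicts preserve insertion order, so rebuilding from the sorted items
# yields the same counts in a deterministic key order.
ordered = dict(sorted(result.items()))
print(ordered)  # {1: 4, 2: 2, 3: 4, 4: 2, 5: 2}
```

The counts themselves are the same on every run once the lock is in place; only the insertion order varies.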
Upvotes: 1