ex1led

Reputation: 469

In-order access to shared resource in Python 3 multiprocessing

Given the MWE below:

import multiprocessing

def thread_function(shared_resource : dict, val : int) -> None: 

    if val not in shared_resource.keys(): 
        print(f"Value {val} is not in keys!")
        shared_resource[val] = 1

    shared_resource[val] += 1

def main():

    no_of_threads = 5
    manager       = multiprocessing.Manager()
    
    shared_resource = manager.dict()

    values = [1, 1, 2, 1, 3, 3, 3, 4, 5]

    with multiprocessing.Pool(no_of_threads) as pool:

        pool.starmap(thread_function,
                     [ (shared_resource, val) for val in values], 
                     chunksize=1)

    print(shared_resource)


if __name__ == "__main__": main()

I am having a readers/writers problem that I don't know how to resolve. The dictionary is a shared resource amongst the worker processes, and I want access to it to be atomic, i.e., I want to avoid the scenario where two workers try to write the same value to it. For instance, here is an incorrect output:

Value 1 is not in keys!
Value 1 is not in keys! # <-- Nope! 
Value 2 is not in keys!
Value 3 is not in keys!
Value 4 is not in keys!
Value 5 is not in keys!

On the other hand, it is possible that the workers happen to access the resource in the right order, in which case the output is the correct one, i.e.,

Value 1 is not in keys!
Value 2 is not in keys!
Value 3 is not in keys!
Value 4 is not in keys!
Value 5 is not in keys!
{1: 4, 2: 2, 3: 4, 4: 2, 5: 2}

But how can I avoid such scenarios and always get the expected behavior? Thank you in advance.

Upvotes: 1

Views: 357

Answers (1)

Booboo

Reputation: 44128

I wouldn't call it exactly criminal, but I would call it malpractice.

Your code in thread_function constitutes a critical section whose execution needs to be serialized so that only a single process can be executing it at a time. Even what appears to be a single statement, shared_resource[val] += 1, compiles to multiple bytecode instructions, so two processes could read the same initial value of shared_resource[val] and then store the same updated value. And multiple processes running in parallel can clearly each find that a key is not yet in the dictionary and end up initializing the same key, which is what your output shows.
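To see why even the lone increment is unsafe, here is a minimal sketch (my illustration, not part of your code) that disassembles an equivalent function with the standard dis module. The exact opcodes vary by Python version, but there are always separate load and store instructions, and another process can update the dictionary in between:

import dis

def critical(shared_resource, val):
    shared_resource[val] += 1

# Prints the bytecode: a subscript load, an add, and a subscript store.
# A second worker can run between the load and the store.
dis.dis(critical)

The fix is to guard the whole section with a lock that is shared by all of the pool's processes: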

import multiprocessing

def init_processes(the_lock):
    # Runs once in every pool worker; stores the lock in a global
    # so that thread_function can find it.
    global lock
    lock = the_lock

def thread_function(shared_resource : dict, val : int) -> None:

    # Hold the lock for the entire read-modify-write sequence:
    with lock:
        if val not in shared_resource.keys():
            print(f"Value {val} is not in keys!")
            shared_resource[val] = 1

        shared_resource[val] += 1

def main():

    no_of_threads = 5
    manager       = multiprocessing.Manager()

    shared_resource = manager.dict()

    values = [1, 1, 2, 1, 3, 3, 3, 4, 5]

    lock = multiprocessing.Lock()
    with multiprocessing.Pool(no_of_threads, initializer=init_processes, initargs=(lock,)) as pool:

        pool.starmap(thread_function,
                     [ (shared_resource, val) for val in values],
                     chunksize=1)

    print(shared_resource)


if __name__ == "__main__": main()

Prints:

Value 1 is not in keys!
Value 2 is not in keys!
Value 3 is not in keys!
Value 5 is not in keys!
Value 4 is not in keys!
{1: 4, 2: 2, 3: 4, 5: 2, 4: 2}

NOTE: You have no control, however, over how the tasks are dispatched to the workers, and therefore you cannot control the order in which the keys will be stored.
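If the display order of the keys matters, a small sketch (an addition of mine, not in the code above) is to copy the proxy into an ordinary dict sorted by key after the pool has exited; plain dicts preserve insertion order in Python 3.7+. This would go at the end of main():

# Copy the managed dict into an ordinary dict, sorted by key.
ordered = dict(sorted(shared_resource.items()))
print(ordered)  # {1: 4, 2: 2, 3: 4, 4: 2, 5: 2}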

Upvotes: 1
