pete

Reputation: 61

Dictionary multiprocessing

I want to parallelize the processing of a dictionary using the multiprocessing library.

My problem can be reduced to this code:

from multiprocessing import Manager,Pool

def modify_dictionary(dictionary):
    if((3,3) not in dictionary):
        dictionary[(3,3)]=0.
    for i in range(100):
        dictionary[(3,3)] = dictionary[(3,3)]+1
    return 0

if __name__ == "__main__":

    manager = Manager()

    dictionary = manager.dict(lock=True)
    jobargs = [(dictionary) for i in range(5)]

    p = Pool(5)
    t = p.map(modify_dictionary,jobargs)
    p.close()
    p.join()

    print(dictionary[(3,3)])

I create a pool of 5 workers, and each worker should increment dictionary[(3,3)] 100 times. So, if the locking process works correctly, I expect dictionary[(3,3)] to be 500 at the end of the script.

However, something in my code must be wrong, because this is not what I get: the locking does not seem to be "activated", and dictionary[(3,3)] always has a value < 500 at the end of the script.

Could you help me?

Upvotes: 5

Views: 5078

Answers (3)

Jerry

Reputation: 55

In the OP's code, the lock is held for the entire iteration. In general, you should hold a lock only for the shortest time that is still effective. The following code is much more efficient; acquire the lock only to make the update atomic:

def f(dictionary, l, k):
    for i in range(100):
        with l:
            dictionary[3] += 1

Note that dictionary[3] += 1 is not atomic, so it must be locked.
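For completeness, here is a minimal driver for the snippet above. It is only a sketch, assuming the dict and lock come from a Manager and are bound to f with functools.partial; the unused k argument just receives the mapped job index:

from multiprocessing import Manager, Pool
from functools import partial

def f(dictionary, l, k):
    for i in range(100):
        with l:  # lock only the non-atomic read-increment-write
            dictionary[3] += 1

if __name__ == "__main__":
    manager = Manager()
    d = manager.dict()     # shared dict proxy
    lock = manager.Lock()  # shared lock proxy
    d[3] = 0

    with Pool(5) as pool:
        pool.map(partial(f, d, lock), range(5))

    print(d[3])  # expected: 500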

Upvotes: 0

Stamatis Outsios

Reputation: 26

I've managed many times to find the correct solution to a programming difficulty here, so I would like to contribute a little bit. The code above still has the problem of not updating the dictionary correctly. To get the right result you have to pass the lock and the correct jobargs to f; in the code above, a new dictionary is made in every process. The code I found to work fine:

from multiprocessing import Manager, Pool
from functools import partial

def f(dictionary, l, k):
    with l:                      # hold the lock while updating the shared dict
        for i in range(100):
            dictionary[3] += 1

if __name__ == "__main__":
    manager = Manager()
    dictionary = manager.dict()  # shared dict proxy, visible to every worker
    lock = manager.Lock()        # manager lock can be passed to pool workers

    dictionary[3] = 0
    jobargs = list(range(5))
    pool = Pool()
    func = partial(f, dictionary, lock)  # bind the shared dict and lock to f
    t = pool.map(func, jobargs)

    pool.close()
    pool.join()

    print(dictionary)            # the shared dict should now hold {3: 500}

Upvotes: 0

Will Keeling

Reputation: 22994

The problem is with this line:

dictionary[(3,3)] = dictionary[(3,3)]+1

Three things happen on that line:

  • Read the value of the dictionary key (3,3)
  • Increment the value by 1
  • Write the value back again

But that whole read-increment-write sequence happens outside of any locking.

The whole sequence must be atomic and must be synchronized across all processes; otherwise the processes will interleave, giving you a lower-than-expected total.

Holding a lock whilst incrementing the value ensures that you get the total of 500 you expect:

from multiprocessing import Manager,Pool,Lock

lock = Lock()

def modify_array(dictionary):
    if((3,3) not in dictionary):
        dictionary[(3,3)]=0.
    for i in range(100):
        with lock:
            dictionary[(3,3)] = dictionary[(3,3)]+1
    return 0

if __name__ == "__main__":

    manager = Manager()

    dictionary = manager.dict(lock=True)
    jobargs = [(dictionary) for i in range(5)]

    p = Pool(5)
    t = p.map(modify_array,jobargs)
    p.close()
    p.join()

    print(dictionary[(3,3)])
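One caveat, offered as a sketch rather than a definitive fix: the module-level lock above is shared with the workers when processes are started by fork (the default on Linux). With the spawn start method (Windows, and macOS by default on recent Python versions), each worker re-imports the module and builds its own unrelated Lock, so nothing is synchronized. A common workaround is to hand one lock to every worker through the pool's initializer (init_lock below is just an illustrative name):

from multiprocessing import Manager, Pool, Lock

lock = None  # set in each worker by init_lock

def init_lock(l):
    # store the shared lock in a module-level global inside each worker
    global lock
    lock = l

def modify_array(dictionary):
    if (3, 3) not in dictionary:
        dictionary[(3, 3)] = 0.
    for i in range(100):
        with lock:
            dictionary[(3, 3)] = dictionary[(3, 3)] + 1
    return 0

if __name__ == "__main__":
    manager = Manager()
    dictionary = manager.dict()
    shared_lock = Lock()

    # initargs are delivered to every worker at creation time,
    # so all workers end up with the same underlying lock
    p = Pool(5, initializer=init_lock, initargs=(shared_lock,))
    p.map(modify_array, [dictionary for i in range(5)])
    p.close()
    p.join()

    print(dictionary[(3, 3)])  # expected: 500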

Upvotes: 3
