some_programmer

Reputation: 3528

How to update keys in a dictionary using multiprocessing in Python?

This is a simplified version of the problem I am trying to solve.

I have a dictionary whose values I am trying to update using multiprocessing, with two functions, as follows:

from multiprocessing import Process, Manager

d = {'a': [], 'b': []}


def func_1(d: dict):
    d['a'].append('func_1')
    d['b'].append(1)
    
def func_2(d: dict):
    d['a'].append('func_2')
    d['b'].append(2)

p1 = Process(target=func_1, args=(d,))
p2 = Process(target=func_2, args=(d,))
p1.start()
p2.start()
p1.join()
p2.join()
print(d)

When I run this, {'a': [], 'b': []} is printed; the values are not updated. After reading a bit about the issue, I learned that this happens because each child process receives its own copy of the dictionary rather than the original, so changes made in the children never reach the parent.
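A minimal, self-contained sketch of that isolation: the child mutates its own copy of the dictionary, so the parent's copy is unchanged after join().

from multiprocessing import Process

d = {'a': []}

def worker(d: dict):
    d['a'].append('child')
    print('inside child:', d)   # prints {'a': ['child']}

if __name__ == '__main__':
    p = Process(target=worker, args=(d,))
    p.start()
    p.join()
    print('in parent:', d)      # prints {'a': []} -- the child's change never arrives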

How can I modify the above code so that, after both functions have run, the dictionary d is updated and ends up looking as follows:

d = {'a': ['func_1', 'func_2'], 'b': [1, 2]}

The order in which the values are appended in the lists is not important.

Update 1:

I tried using Manager as follows:

from multiprocessing import Process, Manager

manager = Manager()

d = manager.dict({'a': [], 'b': []})


def func_1(d: dict):
    d['a'] = manager.dict({'a': ['func_1']})
    d['b'] = manager.dict({'b': [1]})


def func_2(d: dict):
    d['a'] = manager.dict({'a': ['func_2']})
    d['b'] = manager.dict({'b': [2]})


p1 = Process(target=func_1, args=(d,))
p2 = Process(target=func_2, args=(d,))
p1.start()
p2.start()
p1.join()
p2.join()
print(d)

And when I print the value of d, I get the following:

{'a': <DictProxy object, typeid 'dict' at 0x7f59b042d3d0>, 'b': <DictProxy object, typeid 'dict' at 0x7f59b0433550>}

When I do str(d.items()[0][1]) I see "{'a': ['func_2']}", and for str(d.items()[1][1]) I see "{'b': [2]}". Clearly, only the values from func_2 are being saved. What mistake am I making?
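For reference, the nested proxies can be materialised into plain dicts before printing, which makes the overwriting visible at a glance (a small sketch, assuming d is the manager dict from the snippet above):

# Convert each nested DictProxy into a plain dict for printing.
print({k: dict(v) for k, v in d.items()})
# prints e.g. {'a': {'a': ['func_2']}, 'b': {'b': [2]}}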

Upvotes: 0

Views: 536

Answers (1)

quamrana

Reputation: 39354

I think you need the lists to be created by the manager as well, since they are appended to asynchronously:

from multiprocessing import Process, Manager
import time


def func_1(d: dict):
    time.sleep(0.1)
    d['a'].append('func_1')
    d['b'].append(1)
    print('func1', d)


def func_2(d: dict):
    time.sleep(0.2)
    d['a'].append('func_2')
    d['b'].append(2)
    print('func2', d)


def main():
    manager = Manager()
    # Create the lists through the manager as well, so that appends made
    # in the child processes reach the real, shared lists.
    l1 = manager.list()
    l2 = manager.list()
    d = manager.dict({'a': l1, 'b': l2})
    p1 = Process(target=func_1, args=(d,))
    p2 = Process(target=func_2, args=(d,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(d['a'], d['b'])


if __name__ == '__main__':
    main()

Output:

func1 {'a': <ListProxy object, typeid 'list' at 0x1d6f7fcdbd0>, 'b': <ListProxy object, typeid 'list' at 0x1d6f7fcdcc0>}
func2 {'a': <ListProxy object, typeid 'list' at 0x1d6f7fcdbd0>, 'b': <ListProxy object, typeid 'list' at 0x1d6f7fcdcc0>}
['func_1', 'func_2'] [1, 2]

The reason for using a manager to create the lists is that both of your functions want to call .append() on the same lists, potentially at the same time, from different processes. The manager version of a list (or dict) is a proxy for a real object that lives in the manager's server process, and that proxy can be used from other process spaces. Since you start your functions through a Process, they run in separate process spaces and share no memory. The proxies serialise each operation and its data and send them to the manager process, where the operations are actually performed on the real lists.

In both of your snippets, the lists were plain built-in lists that existed only in the parent process. Each child process worked on its own copy of them, living in its own process space with no connection back to the parent.
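As a side note, an alternative sketch that keeps plain lists as the values is to reassign the whole value through the proxy instead of mutating it in place (my own sketch, not part of the answer above). Be aware that this read-modify-write is not atomic, so two processes updating the same key at the same moment can lose an update; the manager-list approach above avoids that race.

from multiprocessing import Process, Manager

def func_1(d):
    # Reading d['a'] returns a copy of the list; assigning the key back
    # pushes the new value through the proxy to the manager process.
    d['a'] = d['a'] + ['func_1']
    d['b'] = d['b'] + [1]

def func_2(d):
    d['a'] = d['a'] + ['func_2']
    d['b'] = d['b'] + [2]

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict({'a': [], 'b': []})
        p1 = Process(target=func_1, args=(d,))
        p2 = Process(target=func_2, args=(d,))
        p1.start(); p2.start()
        p1.join(); p2.join()
        print(dict(d))  # typically {'a': ['func_1', 'func_2'], 'b': [1, 2]}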

Upvotes: 1
