leipsohul
leipsohul

Reputation: 1

Multiprocessing nested DictProxy object won't permit use of .items()

As implied, by this post, Manager().dict() permits only only one instance of Multiprocessing object to run at time. This is why I'm trying to convert the nested dictionary, v, from for k,v in sharedDict.items() to a dict object. But any time I try to call the Manager DictProxy object with .items or .get, the program crashes, in my case, the latter, .get, is 1 in 5 times/chance when I click 'Run' program.
The purpose of the Dictionary, is to keep track of what commands have been executed, what queue they're currently being executed on once they've started running (typically 1 of 4 multiprocessing queues). To keep things simple, for now, the value of the nested key is just a char, instead of a list, to diagnose the culprit of this problem.
Typical error is File "<string>", line 2, in keys and BrokenPipeError: [Errno 32] Broken pipe Error starts at the method, def traverseRootAndNestedKey(self, nestedKey):, which helps update the value of command status. I've pasted the full program.

import multiprocessing
import multiprocessing.managers
import os, sys
import pprint

class ProgressDictionary:
    def __init__(self):
        self.dictionary = {}
        # E.g.    k[0] = {'Root Key A': Manager().dict({'Nested key A_1': [1, 0],
        #                                               'Nested key A_2': [1, 0],
        #                                               'Nested key A_3': [1, 0],
        #                                               'Nested key A_4': [1, 0]})}
        self.mprocDictionary = multiprocessing.Manager().dict()
        # Will need to be rebuilt each time self.dictionary is .updated()
        # E.g. mprocDictionary.update(<......>))
        #         = Manager().dict{'Root Key A': Manager().dict({'Nested key A_1': [1, 0],
        #                                                                   ...
        #                                                        'Nested key A_4': [1, 0]}),
        #                          'Root Key B': Manager().dict({'Nested key B_1': [1, 0],
        #                                                                   ...
        #                                                        'Nested key B_4': [1, 0]})}
        self.container = multiprocessing.Manager().dict()   # Not used

    def buildMprocDict(self, rootKey, listOfNestedKeys):
        self.dictionary.update({rootKey: self.addNest(listOfNestedKeys)})
        # Debug
        # pprint.PrettyPrinter(width=128, sort_dicts=False).pprint(self.dictionary)
        # No braces as intended   -> 'Root Key A': <DictProxy object, typeid 'dict' at 0x72335a1908e0>
        self.mprocDictionary.update(self.dictionary)
        # Debug
        # print(type(self.mprocDictionary))
        # {'Root Key A': <DictProxy object, typeid 'dict' at 0x79b399d989a0>,
        #  'Root Key B': <DictProxy object, typeid 'dict' at 0x79b399d98eb0>}
        # type(self.mprocDictionary): <class 'multiprocessing.managers.DictProxy'>

        # Not used
        self.container.update({i : multiprocessing.Manager().dict()
                                    for i in self.mprocDictionary.keys()})
        # Debug
        # for k, v in self.mprocDictionary.items():
        #     print(f'{k} ---:--- {v}')
        # print(f'K43: {[i for i in self.mprocDictionary.keys()]}')
        # ['Root Key A', 'Root Key B']
        # print('L26: class function exit')
    def addNest(self, listOfNestedKeys):
        # Encapsulate nested dicts
        return multiprocessing.Manager().dict({i: 'A' for i in listOfNestedKeys})
    def traverseNestedKeyOnly(self, rootKey, nestedKey):
        for k, v in dict(self.mprocDictionary.get(rootKey, {}).items()):
            if k == nestedKey:
                return k
    def traverseRootAndNestedKey(self, nestedKey):

        for k in self.mprocDictionary.keys():
            print(f'L56 {self.mprocDictionary.get(k).items()}')        # Works
            print(f'L57 {type(dict(self.mprocDictionary.get(k)))}')    #  Will occasionally generate error (1 in 5 chance)
                                                                    # multiprocessing.managers.DictProxy'> w/out(dict())
            print(f'L58 {(dict(self.mprocDictionary.get(k))).items()}') # File "<string>", line 2, in keys
            sys.exit()
            for k2, v in dict(self.mprocDictionary.get(k)).items():     # crashes, File "<string>", line 2, in keys
                print(f'L64 {k2} -:- {v}')
        sys.exit()
        # temp_dict = {k: dict(v) if isinstance(v, multiprocessing.managers.DictProxy) else v
        #              for k, v in self.mprocDictionary.items()}
        #pprint.PrettyPrinter(width=96, sort_dicts=False).pprint(temp_dict)
        sys.exit()
        for k, v in self.mprocDictionary.items():  #File "<string>", line 2, in __getitem__
            for k2, v2 in v.items():    # We only know value, but not key
                # In pyCharm, v.items() gives warning: Unresolved attribute reference 'items' for class 'str'
                if k2 == nestedKey:
                    return [k, k2]
        #if nested key is not found due to being deleted earlier for whatever reason
        return None
    def setMemAddress(self, nestedKey, pid, debug_helper):
        # *args used, incase root key is never supplied.
        print(f'L64 argument show: {debug_helper}')   # No show
        k, k2 = self.traverseRootAndNestedKey(nestedKey)      # for k, v in self.mprocDictionary.items():
                                                              # File "<string>", line 2, in items
        # Sometimes will cut off here, and won't show the following statements
        # Converting dictProxy to dict:  for k, v in dict(self.mprocDictionary).items(),
        #                                         ...In traverseRootAndNestedKey() method .did not resolve this
        print(f'L65 key: {k}, val: {k2}')                  # display correct values
        #print(f'L68 {self.mprocDictionary[k]}')  # <DictProxy object, typeid 'dict' at 0x796feca0fd60; '__str__()' failed>
        #print(f'L69 {dict(self.mprocDictionary[k])}')   # File "<string>", line 2, in __getitem__
        if k is not None:
            # error here, nothing is shown
            temp = self.mprocDictionary[k][k2]
            print(f'L93 {temp}')                     # Will sometimes show None, other times 'A' w/Multiproc
            temp = pid
            # Alternative method
            self.mprocDictionary[k][k2].update(temp)
    def getMemAddress(self, rootKey, nestedKey):
        k = self.traverseNestedKeyOnly(rootKey, nestedKey)
        try:
            return self.mprocDictionary[rootKey][k]
        except IndexError:
            return None
    def printDictionary(self):
        pprint.PrettyPrinter(width=96, sort_dicts=False).pprint(self.dictionary)
    def printMprocDictionary(self):
        for k, v in self.mprocDictionary.items():
            for k2, v2 in v.items():
                print(f'L78 {k2}  :  {v2}')

splitCMDqueue = multiprocessing.Queue()

qList = [multiprocessing.Queue() for i in range(2)]
listCMDs = []
listCMDs.append(['''system cmd a_1''',
                 '''system cmd a_2''',
                 '''system cmd a_3''',
                 '''system cmd a_4'''])
listCMDs.append(['''system cmd b_1''',
                 '''system cmd b_2''',
                 '''system cmd b_3''',
                 '''system cmd b_4'''])
listCMDcondition = ['''condition_system_cmd_1''',
                    '''condition_system_cmd_2''']

# {'Root Key A': {
#      'Nested key A_1': [1, 0],
#      'Nested key A_2': [1, 0],
#      'Nested key A_3': [1, 0],
#      'Nested key A_4': [1, 0]},
#  'Root Key B': {
#      'Nested key B_1': [1, 0],
#      'Nested key B_2': [1, 0],
#      'Nested key B_3': [1, 0],
#      'Nested key B_4': [1, 0]}}
def worker(sharedClass, nK):
    sharedClass.setMemAddress(nK, '0x12345FA', 'Call from Line 110')
    #print("L48") # Shows when previous statement is commented out
    print(f'L130, {sharedClass.getMemAddress(listCMDcondition[0], nK)}')

jobs = []
runtimeCommandProgress =  ProgressDictionary()

if __name__ == '__main__':

    for idx, i in enumerate(listCMDs):
        runtimeCommandProgress.buildMprocDict(listCMDcondition[idx], i)  # works/success
        for j in i:
            qList[idx].put(j)

    example_cmd = listCMDs[0][1]  # Nested key A_2, system cmd a_2
    # Both work
    # runtimeCommandProgress.setMemAddress(example_cmd, '0x12345FA', 'L136')
    # print(f'L133 {runtimeCommandProgress.getMemAddress(listCMDcondition[0], example_cmd)}')

    p = multiprocessing.Process(target=worker, args=(runtimeCommandProgress, example_cmd))

    jobs.append(p)
    for p in jobs:
        p.start()

Upvotes: 0

Views: 61

Answers (2)

vht981230
vht981230

Reputation: 4946

Your code didn't work when I ran locally, the issue seems to be that there are ProgressDictionary() created per process which should only be intended for main process. You can avoid this by adding runtimeCommandProgress = ProgressDictionary() inside if __name__ == '__main__' and also make sure to call .join() on all the subprocesses to avoid 'ForkAwareLocal' object has no attribute 'connection' error

import multiprocessing
import multiprocessing.managers
import os, sys
import pprint

class ProgressDictionary:
    def __init__(self):
        self.dictionary = {}
        # E.g.    k[0] = {'Root Key A': Manager().dict({'Nested key A_1': [1, 0],
        #                                               'Nested key A_2': [1, 0],
        #                                               'Nested key A_3': [1, 0],
        #                                               'Nested key A_4': [1, 0]})}
        self.mprocDictionary = multiprocessing.Manager().dict()
        # Will need to be rebuilt each time self.dictionary is .updated()
        # E.g. mprocDictionary.update(<......>))
        #         = Manager().dict{'Root Key A': Manager().dict({'Nested key A_1': [1, 0],
        #                                                                   ...
        #                                                        'Nested key A_4': [1, 0]}),
        #                          'Root Key B': Manager().dict({'Nested key B_1': [1, 0],
        #                                                                   ...
        #                                                        'Nested key B_4': [1, 0]})}
        self.container = multiprocessing.Manager().dict()   # Not used

    def buildMprocDict(self, rootKey, listOfNestedKeys):
        self.dictionary.update({rootKey: self.addNest(listOfNestedKeys)})
        # Debug
        # pprint.PrettyPrinter(width=128, sort_dicts=False).pprint(self.dictionary)
        # No braces as intended   -> 'Root Key A': <DictProxy object, typeid 'dict' at 0x72335a1908e0>
        self.mprocDictionary.update(self.dictionary)
        # Debug
        # print(type(self.mprocDictionary))
        # {'Root Key A': <DictProxy object, typeid 'dict' at 0x79b399d989a0>,
        #  'Root Key B': <DictProxy object, typeid 'dict' at 0x79b399d98eb0>}
        # type(self.mprocDictionary): <class 'multiprocessing.managers.DictProxy'>

        # Not used
        self.container.update({i : multiprocessing.Manager().dict()
                                    for i in self.mprocDictionary.keys()})
        # Debug
        for k, v in self.mprocDictionary.items():
            print(f'{k} ---:--- {v}')
        print(f'K43: {[i for i in self.mprocDictionary.keys()]}')
        #['Root Key A', 'Root Key B']
        print('L26: class function exit')
    def addNest(self, listOfNestedKeys):
        # Encapsulate nested dicts
        return multiprocessing.Manager().dict({i: 'A' for i in listOfNestedKeys})
    def traverseNestedKeyOnly(self, rootKey, nestedKey):
        for k, v in dict(self.mprocDictionary.get(rootKey, {}).items()):
            if k == nestedKey:
                return k
    def traverseRootAndNestedKey(self, nestedKey):

        for k in self.mprocDictionary.keys():
            print(f'L56 {self.mprocDictionary.get(k).items()}')        # Works
            print(f'L57 {type(dict(self.mprocDictionary.get(k)))}')    #  Will occasionally generate error (1 in 5 chance)
                                                                    # multiprocessing.managers.DictProxy'> w/out(dict())
            print(f'L58 {(dict(self.mprocDictionary.get(k))).items()}') # File "<string>", line 2, in keys
            sys.exit()
            for k2, v in dict(self.mprocDictionary.get(k)).items():     # crashes, File "<string>", line 2, in keys
                print(f'L64 {k2} -:- {v}')
        sys.exit()
        # temp_dict = {k: dict(v) if isinstance(v, multiprocessing.managers.DictProxy) else v
        #              for k, v in self.mprocDictionary.items()}
        #pprint.PrettyPrinter(width=96, sort_dicts=False).pprint(temp_dict)
        sys.exit()
        for k, v in self.mprocDictionary.items():  #File "<string>", line 2, in __getitem__
            for k2, v2 in v.items():    # We only know value, but not key
                # In pyCharm, v.items() gives warning: Unresolved attribute reference 'items' for class 'str'
                if k2 == nestedKey:
                    return [k, k2]
        #if nested key is not found due to being deleted earlier for whatever reason
        return None
    def setMemAddress(self, nestedKey, pid, debug_helper):
        # *args used, incase root key is never supplied.
        print(f'L64 argument show: {debug_helper}')   # No show
        k, k2 = self.traverseRootAndNestedKey(nestedKey)      # for k, v in self.mprocDictionary.items():
                                                              # File "<string>", line 2, in items
        # Sometimes will cut off here, and won't show the following statements
        # Converting dictProxy to dict:  for k, v in dict(self.mprocDictionary).items(),
        #                                         ...In traverseRootAndNestedKey() method .did not resolve this
        print(f'L65 key: {k}, val: {k2}')                  # display correct values
        #print(f'L68 {self.mprocDictionary[k]}')  # <DictProxy object, typeid 'dict' at 0x796feca0fd60; '__str__()' failed>
        #print(f'L69 {dict(self.mprocDictionary[k])}')   # File "<string>", line 2, in __getitem__
        if k is not None:
            # error here, nothing is shown
            temp = self.mprocDictionary[k][k2]
            print(f'L93 {temp}')                     # Will sometimes show None, other times 'A' w/Multiproc
            temp = pid
            # Alternative method
            self.mprocDictionary[k][k2].update(temp)
    def getMemAddress(self, rootKey, nestedKey):
        k = self.traverseNestedKeyOnly(rootKey, nestedKey)
        try:
            return self.mprocDictionary[rootKey][k]
        except IndexError:
            return None
    def printDictionary(self):
        pprint.PrettyPrinter(width=96, sort_dicts=False).pprint(self.dictionary)
    def printMprocDictionary(self):
        for k, v in self.mprocDictionary.items():
            for k2, v2 in v.items():
                print(f'L78 {k2}  :  {v2}')

splitCMDqueue = multiprocessing.Queue()

qList = [multiprocessing.Queue() for i in range(2)]
listCMDs = []
listCMDs.append(['''system cmd a_1''',
                 '''system cmd a_2''',
                 '''system cmd a_3''',
                 '''system cmd a_4'''])
listCMDs.append(['''system cmd b_1''',
                 '''system cmd b_2''',
                 '''system cmd b_3''',
                 '''system cmd b_4'''])
listCMDcondition = ['''condition_system_cmd_1''',
                    '''condition_system_cmd_2''']

# {'Root Key A': {
#      'Nested key A_1': [1, 0],
#      'Nested key A_2': [1, 0],
#      'Nested key A_3': [1, 0],
#      'Nested key A_4': [1, 0]},
#  'Root Key B': {
#      'Nested key B_1': [1, 0],
#      'Nested key B_2': [1, 0],
#      'Nested key B_3': [1, 0],
#      'Nested key B_4': [1, 0]}}
def worker(sharedClass, nK):
    sharedClass.setMemAddress(nK, '0x12345FA', 'Call from Line 110')
    #print("L48") # Shows when previous statement is commented out
    print(f'L130, {sharedClass.getMemAddress(listCMDcondition[0], nK)}')

if __name__ == '__main__':
    jobs = []
    runtimeCommandProgress =  ProgressDictionary()

    for idx, i in enumerate(listCMDs):
        runtimeCommandProgress.buildMprocDict(listCMDcondition[idx], i)  # works/success
        for j in i:
            qList[idx].put(j)

    example_cmd = listCMDs[0][1]  # Nested key A_2, system cmd a_2
    # Both work
    # runtimeCommandProgress.setMemAddress(example_cmd, '0x12345FA', 'L136')
    # print(f'L133 {runtimeCommandProgress.getMemAddress(listCMDcondition[0], example_cmd)}')

    p = multiprocessing.Process(target=worker, args=(runtimeCommandProgress, example_cmd))

    jobs.append(p)
    for p in jobs:
        p.start()

    for p in jobs:
        p.join()

Upvotes: 0

Kenny Ostrom
Kenny Ostrom

Reputation: 5871

The answer you linked has the solution, although there are a few extra steps to get there. Your class should not call multiprocessing.Manager() but rather use the one instance created in your main.

if __name__ == '__main__':
    # here you need a manager instance
    with mp.Manager() as manager:
        # the rest of your code goes here, using the manager instance
        # for example
        runtimeCommandProgress =  ProgressDictionary(manager)

Here you use it in the class

class ProgressDictionary:
    def __init__(self, manager):
        self.dictionary = {}
        self.mprocDictionary = manager.dict()
        self.container = manager.dict()   # Not used

Upvotes: 0

Related Questions