Reputation: 1
As implied, by this post, Manager().dict() permits only only one instance of Multiprocessing object to run at time. This is why I'm trying to convert the nested dictionary, v, from for k,v in sharedDict.items()
to a dict object. But any time I try to call the Manager DictProxy object with .items or .get, the program crashes, in my case, the latter, .get, is 1 in 5 times/chance when I click 'Run' program.
The purpose of the Dictionary, is to keep track of what commands have been executed, what queue they're currently being executed on once they've started running (typically 1 of 4 multiprocessing queues). To keep things simple, for now, the value of the nested key is just a char, instead of a list, to diagnose the culprit of this problem.
Typical error is File "<string>", line 2, in keys
and BrokenPipeError: [Errno 32] Broken pipe
Error starts at the method, def traverseRootAndNestedKey(self, nestedKey):
, which helps update the value of command status. I've pasted the full program.
import multiprocessing
import multiprocessing.managers
import os, sys
import pprint
class ProgressDictionary:
def __init__(self):
self.dictionary = {}
# E.g. k[0] = {'Root Key A': Manager().dict({'Nested key A_1': [1, 0],
# 'Nested key A_2': [1, 0],
# 'Nested key A_3': [1, 0],
# 'Nested key A_4': [1, 0]})}
self.mprocDictionary = multiprocessing.Manager().dict()
# Will need to be rebuilt each time self.dictionary is .updated()
# E.g. mprocDictionary.update(<......>))
# = Manager().dict{'Root Key A': Manager().dict({'Nested key A_1': [1, 0],
# ...
# 'Nested key A_4': [1, 0]}),
# 'Root Key B': Manager().dict({'Nested key B_1': [1, 0],
# ...
# 'Nested key B_4': [1, 0]})}
self.container = multiprocessing.Manager().dict() # Not used
def buildMprocDict(self, rootKey, listOfNestedKeys):
self.dictionary.update({rootKey: self.addNest(listOfNestedKeys)})
# Debug
# pprint.PrettyPrinter(width=128, sort_dicts=False).pprint(self.dictionary)
# No braces as intended -> 'Root Key A': <DictProxy object, typeid 'dict' at 0x72335a1908e0>
self.mprocDictionary.update(self.dictionary)
# Debug
# print(type(self.mprocDictionary))
# {'Root Key A': <DictProxy object, typeid 'dict' at 0x79b399d989a0>,
# 'Root Key B': <DictProxy object, typeid 'dict' at 0x79b399d98eb0>}
# type(self.mprocDictionary): <class 'multiprocessing.managers.DictProxy'>
# Not used
self.container.update({i : multiprocessing.Manager().dict()
for i in self.mprocDictionary.keys()})
# Debug
# for k, v in self.mprocDictionary.items():
# print(f'{k} ---:--- {v}')
# print(f'K43: {[i for i in self.mprocDictionary.keys()]}')
# ['Root Key A', 'Root Key B']
# print('L26: class function exit')
def addNest(self, listOfNestedKeys):
# Encapsulate nested dicts
return multiprocessing.Manager().dict({i: 'A' for i in listOfNestedKeys})
def traverseNestedKeyOnly(self, rootKey, nestedKey):
for k, v in dict(self.mprocDictionary.get(rootKey, {}).items()):
if k == nestedKey:
return k
def traverseRootAndNestedKey(self, nestedKey):
for k in self.mprocDictionary.keys():
print(f'L56 {self.mprocDictionary.get(k).items()}') # Works
print(f'L57 {type(dict(self.mprocDictionary.get(k)))}') # Will occasionally generate error (1 in 5 chance)
# multiprocessing.managers.DictProxy'> w/out(dict())
print(f'L58 {(dict(self.mprocDictionary.get(k))).items()}') # File "<string>", line 2, in keys
sys.exit()
for k2, v in dict(self.mprocDictionary.get(k)).items(): # crashes, File "<string>", line 2, in keys
print(f'L64 {k2} -:- {v}')
sys.exit()
# temp_dict = {k: dict(v) if isinstance(v, multiprocessing.managers.DictProxy) else v
# for k, v in self.mprocDictionary.items()}
#pprint.PrettyPrinter(width=96, sort_dicts=False).pprint(temp_dict)
sys.exit()
for k, v in self.mprocDictionary.items(): #File "<string>", line 2, in __getitem__
for k2, v2 in v.items(): # We only know value, but not key
# In pyCharm, v.items() gives warning: Unresolved attribute reference 'items' for class 'str'
if k2 == nestedKey:
return [k, k2]
#if nested key is not found due to being deleted earlier for whatever reason
return None
def setMemAddress(self, nestedKey, pid, debug_helper):
# *args used, incase root key is never supplied.
print(f'L64 argument show: {debug_helper}') # No show
k, k2 = self.traverseRootAndNestedKey(nestedKey) # for k, v in self.mprocDictionary.items():
# File "<string>", line 2, in items
# Sometimes will cut off here, and won't show the following statements
# Converting dictProxy to dict: for k, v in dict(self.mprocDictionary).items(),
# ...In traverseRootAndNestedKey() method .did not resolve this
print(f'L65 key: {k}, val: {k2}') # display correct values
#print(f'L68 {self.mprocDictionary[k]}') # <DictProxy object, typeid 'dict' at 0x796feca0fd60; '__str__()' failed>
#print(f'L69 {dict(self.mprocDictionary[k])}') # File "<string>", line 2, in __getitem__
if k is not None:
# error here, nothing is shown
temp = self.mprocDictionary[k][k2]
print(f'L93 {temp}') # Will sometimes show None, other times 'A' w/Multiproc
temp = pid
# Alternative method
self.mprocDictionary[k][k2].update(temp)
def getMemAddress(self, rootKey, nestedKey):
k = self.traverseNestedKeyOnly(rootKey, nestedKey)
try:
return self.mprocDictionary[rootKey][k]
except IndexError:
return None
def printDictionary(self):
pprint.PrettyPrinter(width=96, sort_dicts=False).pprint(self.dictionary)
def printMprocDictionary(self):
for k, v in self.mprocDictionary.items():
for k2, v2 in v.items():
print(f'L78 {k2} : {v2}')
splitCMDqueue = multiprocessing.Queue()
qList = [multiprocessing.Queue() for i in range(2)]
listCMDs = []
listCMDs.append(['''system cmd a_1''',
'''system cmd a_2''',
'''system cmd a_3''',
'''system cmd a_4'''])
listCMDs.append(['''system cmd b_1''',
'''system cmd b_2''',
'''system cmd b_3''',
'''system cmd b_4'''])
listCMDcondition = ['''condition_system_cmd_1''',
'''condition_system_cmd_2''']
# {'Root Key A': {
# 'Nested key A_1': [1, 0],
# 'Nested key A_2': [1, 0],
# 'Nested key A_3': [1, 0],
# 'Nested key A_4': [1, 0]},
# 'Root Key B': {
# 'Nested key B_1': [1, 0],
# 'Nested key B_2': [1, 0],
# 'Nested key B_3': [1, 0],
# 'Nested key B_4': [1, 0]}}
def worker(sharedClass, nK):
sharedClass.setMemAddress(nK, '0x12345FA', 'Call from Line 110')
#print("L48") # Shows when previous statement is commented out
print(f'L130, {sharedClass.getMemAddress(listCMDcondition[0], nK)}')
jobs = []
runtimeCommandProgress = ProgressDictionary()
if __name__ == '__main__':
for idx, i in enumerate(listCMDs):
runtimeCommandProgress.buildMprocDict(listCMDcondition[idx], i) # works/success
for j in i:
qList[idx].put(j)
example_cmd = listCMDs[0][1] # Nested key A_2, system cmd a_2
# Both work
# runtimeCommandProgress.setMemAddress(example_cmd, '0x12345FA', 'L136')
# print(f'L133 {runtimeCommandProgress.getMemAddress(listCMDcondition[0], example_cmd)}')
p = multiprocessing.Process(target=worker, args=(runtimeCommandProgress, example_cmd))
jobs.append(p)
for p in jobs:
p.start()
Upvotes: 0
Views: 61
Reputation: 4946
Your code didn't work when I ran locally, the issue seems to be that there are ProgressDictionary()
created per process which should only be intended for main process. You can avoid this by adding runtimeCommandProgress = ProgressDictionary()
inside if __name__ == '__main__'
and also make sure to call .join()
on all the subprocesses to avoid 'ForkAwareLocal' object has no attribute 'connection'
error
import multiprocessing
import multiprocessing.managers
import os, sys
import pprint
class ProgressDictionary:
def __init__(self):
self.dictionary = {}
# E.g. k[0] = {'Root Key A': Manager().dict({'Nested key A_1': [1, 0],
# 'Nested key A_2': [1, 0],
# 'Nested key A_3': [1, 0],
# 'Nested key A_4': [1, 0]})}
self.mprocDictionary = multiprocessing.Manager().dict()
# Will need to be rebuilt each time self.dictionary is .updated()
# E.g. mprocDictionary.update(<......>))
# = Manager().dict{'Root Key A': Manager().dict({'Nested key A_1': [1, 0],
# ...
# 'Nested key A_4': [1, 0]}),
# 'Root Key B': Manager().dict({'Nested key B_1': [1, 0],
# ...
# 'Nested key B_4': [1, 0]})}
self.container = multiprocessing.Manager().dict() # Not used
def buildMprocDict(self, rootKey, listOfNestedKeys):
self.dictionary.update({rootKey: self.addNest(listOfNestedKeys)})
# Debug
# pprint.PrettyPrinter(width=128, sort_dicts=False).pprint(self.dictionary)
# No braces as intended -> 'Root Key A': <DictProxy object, typeid 'dict' at 0x72335a1908e0>
self.mprocDictionary.update(self.dictionary)
# Debug
# print(type(self.mprocDictionary))
# {'Root Key A': <DictProxy object, typeid 'dict' at 0x79b399d989a0>,
# 'Root Key B': <DictProxy object, typeid 'dict' at 0x79b399d98eb0>}
# type(self.mprocDictionary): <class 'multiprocessing.managers.DictProxy'>
# Not used
self.container.update({i : multiprocessing.Manager().dict()
for i in self.mprocDictionary.keys()})
# Debug
for k, v in self.mprocDictionary.items():
print(f'{k} ---:--- {v}')
print(f'K43: {[i for i in self.mprocDictionary.keys()]}')
#['Root Key A', 'Root Key B']
print('L26: class function exit')
def addNest(self, listOfNestedKeys):
# Encapsulate nested dicts
return multiprocessing.Manager().dict({i: 'A' for i in listOfNestedKeys})
def traverseNestedKeyOnly(self, rootKey, nestedKey):
for k, v in dict(self.mprocDictionary.get(rootKey, {}).items()):
if k == nestedKey:
return k
def traverseRootAndNestedKey(self, nestedKey):
for k in self.mprocDictionary.keys():
print(f'L56 {self.mprocDictionary.get(k).items()}') # Works
print(f'L57 {type(dict(self.mprocDictionary.get(k)))}') # Will occasionally generate error (1 in 5 chance)
# multiprocessing.managers.DictProxy'> w/out(dict())
print(f'L58 {(dict(self.mprocDictionary.get(k))).items()}') # File "<string>", line 2, in keys
sys.exit()
for k2, v in dict(self.mprocDictionary.get(k)).items(): # crashes, File "<string>", line 2, in keys
print(f'L64 {k2} -:- {v}')
sys.exit()
# temp_dict = {k: dict(v) if isinstance(v, multiprocessing.managers.DictProxy) else v
# for k, v in self.mprocDictionary.items()}
#pprint.PrettyPrinter(width=96, sort_dicts=False).pprint(temp_dict)
sys.exit()
for k, v in self.mprocDictionary.items(): #File "<string>", line 2, in __getitem__
for k2, v2 in v.items(): # We only know value, but not key
# In pyCharm, v.items() gives warning: Unresolved attribute reference 'items' for class 'str'
if k2 == nestedKey:
return [k, k2]
#if nested key is not found due to being deleted earlier for whatever reason
return None
def setMemAddress(self, nestedKey, pid, debug_helper):
# *args used, incase root key is never supplied.
print(f'L64 argument show: {debug_helper}') # No show
k, k2 = self.traverseRootAndNestedKey(nestedKey) # for k, v in self.mprocDictionary.items():
# File "<string>", line 2, in items
# Sometimes will cut off here, and won't show the following statements
# Converting dictProxy to dict: for k, v in dict(self.mprocDictionary).items(),
# ...In traverseRootAndNestedKey() method .did not resolve this
print(f'L65 key: {k}, val: {k2}') # display correct values
#print(f'L68 {self.mprocDictionary[k]}') # <DictProxy object, typeid 'dict' at 0x796feca0fd60; '__str__()' failed>
#print(f'L69 {dict(self.mprocDictionary[k])}') # File "<string>", line 2, in __getitem__
if k is not None:
# error here, nothing is shown
temp = self.mprocDictionary[k][k2]
print(f'L93 {temp}') # Will sometimes show None, other times 'A' w/Multiproc
temp = pid
# Alternative method
self.mprocDictionary[k][k2].update(temp)
def getMemAddress(self, rootKey, nestedKey):
k = self.traverseNestedKeyOnly(rootKey, nestedKey)
try:
return self.mprocDictionary[rootKey][k]
except IndexError:
return None
def printDictionary(self):
pprint.PrettyPrinter(width=96, sort_dicts=False).pprint(self.dictionary)
def printMprocDictionary(self):
for k, v in self.mprocDictionary.items():
for k2, v2 in v.items():
print(f'L78 {k2} : {v2}')
splitCMDqueue = multiprocessing.Queue()
qList = [multiprocessing.Queue() for i in range(2)]
listCMDs = []
listCMDs.append(['''system cmd a_1''',
'''system cmd a_2''',
'''system cmd a_3''',
'''system cmd a_4'''])
listCMDs.append(['''system cmd b_1''',
'''system cmd b_2''',
'''system cmd b_3''',
'''system cmd b_4'''])
listCMDcondition = ['''condition_system_cmd_1''',
'''condition_system_cmd_2''']
# {'Root Key A': {
# 'Nested key A_1': [1, 0],
# 'Nested key A_2': [1, 0],
# 'Nested key A_3': [1, 0],
# 'Nested key A_4': [1, 0]},
# 'Root Key B': {
# 'Nested key B_1': [1, 0],
# 'Nested key B_2': [1, 0],
# 'Nested key B_3': [1, 0],
# 'Nested key B_4': [1, 0]}}
def worker(sharedClass, nK):
sharedClass.setMemAddress(nK, '0x12345FA', 'Call from Line 110')
#print("L48") # Shows when previous statement is commented out
print(f'L130, {sharedClass.getMemAddress(listCMDcondition[0], nK)}')
if __name__ == '__main__':
jobs = []
runtimeCommandProgress = ProgressDictionary()
for idx, i in enumerate(listCMDs):
runtimeCommandProgress.buildMprocDict(listCMDcondition[idx], i) # works/success
for j in i:
qList[idx].put(j)
example_cmd = listCMDs[0][1] # Nested key A_2, system cmd a_2
# Both work
# runtimeCommandProgress.setMemAddress(example_cmd, '0x12345FA', 'L136')
# print(f'L133 {runtimeCommandProgress.getMemAddress(listCMDcondition[0], example_cmd)}')
p = multiprocessing.Process(target=worker, args=(runtimeCommandProgress, example_cmd))
jobs.append(p)
for p in jobs:
p.start()
for p in jobs:
p.join()
Upvotes: 0
Reputation: 5871
The answer you linked has the solution, although there are a few extra steps to get there. Your class should not call multiprocessing.Manager() but rather use the one instance created in your main.
if __name__ == '__main__':
# here you need a manager instance
with mp.Manager() as manager:
# the rest of your code goes here, using the manager instance
# for example
runtimeCommandProgress = ProgressDictionary(manager)
Here you use it in the class
class ProgressDictionary:
def __init__(self, manager):
self.dictionary = {}
self.mprocDictionary = manager.dict()
self.container = manager.dict() # Not used
Upvotes: 0