Reputation: 184
When the array (data) contains more than 10,000 elements, not all processes finish (the last line, print('complete'), is never reached). With arrays of up to 2,000 elements this code works fine. I think the problem is with the queue: without result_queue.put([i,j]) all processes complete properly. Can anybody help me with this part of the code?
import multiprocessing
from difflib import SequenceMatcher

def finder(start, end, proc, result_queue, lock):
    global data
    i = start
    while i <= end:
        el = data[i]
        j = -1
        for el1 in data:
            j = j + 1
            s1 = SequenceMatcher(None, el, el1)
            s1_val = s1.ratio()
            if s1_val > 0.9:
                result_queue.put([i, j])
        i = i + 1
    print('end')
if __name__ == '__main__':
    multiprocessing.freeze_support()
    result_queue = multiprocessing.Queue()
    allProcesses = []
    data = r.keys()  # r is defined elsewhere
    print(len(data))
    parts = 8
    part = int(len(data) / parts)
    i = 0
    lock = multiprocessing.Lock()
    while i < parts:
        p = multiprocessing.Process(target=finder,
                                    args=(part * i, part * i + part, i, result_queue, lock))
        print('init', part * i, part * i + part, i)
        allProcesses.append(p)
        p.daemon = True
        p.start()
        i = i + 1
        print('started process', i)
    i = 0
    for p in allProcesses:
        p.join()
    print('complete')
Upvotes: 0
Views: 696
Reputation: 8041
Short answer: Use multiprocessing.Manager to create the Queue
m = multiprocessing.Manager()
result_queue = m.Queue()
A bit more detailed answer: multiprocessing.Manager().Queue() returns an
<class 'multiprocessing.managers.AutoProxy[Queue]'>
instance, which can be shared safely among workers.
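You can verify this yourself with a quick check (the exact repr may vary between Python versions):

import multiprocessing

if __name__ == '__main__':
    m = multiprocessing.Manager()
    q = m.Queue()
    # Prints something like <class 'multiprocessing.managers.AutoProxy[Queue]'>
    print(type(q))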
Here is a complete runnable example:
import multiprocessing

def finder(start, end, proc, result_queue, lock):
    # global data
    for i in range(start, end + 1):
        # print(type(result_queue))
        result_queue.put((i,))
    print('end %s' % proc)

r = {i: i for i in range(100000)}

def main():
    multiprocessing.freeze_support()
    allProcesses = []
    data = r.keys()
    print(len(data))
    parts = 8
    part = int(len(data) / parts)
    i = 0
    lock = multiprocessing.Lock()
    m = multiprocessing.Manager()
    result_queue = m.Queue()
    while i < parts:
        p = multiprocessing.Process(target=finder,
                                    args=(part * i, part * i + part, i, result_queue, lock))
        print('init', part * i, part * i + part, i)
        p.daemon = False
        p.start()
        i = i + 1
        print('started process', i)
        allProcesses.append(p)
    for p in allProcesses:
        print("join", p)
        print(p.join())
    print('complete')

if __name__ == '__main__':
    main()
If you change m.Queue() back to multiprocessing.Queue() you will see the old behavior again.
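The underlying cause is a documented pitfall of multiprocessing.Queue ("Joining processes that use queues" in the Python docs): a child process that has put items on the queue will not terminate until its feeder thread has flushed all buffered items into the underlying pipe. With small inputs the pipe buffer never fills, but once it does, the children block in put() while the parent blocks in join(), and the program hangs before print('complete'). An alternative to the Manager is to keep the plain multiprocessing.Queue but drain it before joining. Below is a minimal sketch of that pattern (not your exact code: it drops the unused lock, adjusts the chunk boundaries so ranges don't overlap, and has each worker put a None sentinel so the parent knows when to stop reading):

import multiprocessing

SENTINEL = None

def finder(start, end, proc, result_queue):
    for i in range(start, end + 1):
        result_queue.put((i,))
    result_queue.put(SENTINEL)  # tell the parent this worker is done
    print('end %s' % proc)

def main():
    result_queue = multiprocessing.Queue()
    n, parts = 100000, 8
    part = n // parts
    procs = [multiprocessing.Process(target=finder,
                                     args=(part * i, part * i + part - 1, i, result_queue))
             for i in range(parts)]
    for p in procs:
        p.start()
    # Drain the queue *before* joining: joining first can deadlock once
    # the pipe buffer behind multiprocessing.Queue fills up.
    results, done = [], 0
    while done < parts:
        item = result_queue.get()
        if item is SENTINEL:
            done += 1
        else:
            results.append(item)
    for p in procs:
        p.join()
    print('complete,', len(results), 'results')

if __name__ == '__main__':
    main()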
Upvotes: 3