Alexander Vedmed'

Reputation: 184

Process won't terminate when I use Queue

When the array (data) has more than 10,000 elements, not all processes finish (see the last line, print('complete')). With an array of up to 2,000 elements this code works fine. I think the problem is with the queue: without result_queue.put([i,j]) all processes complete properly. Can anybody help me with this part of the code?

    from difflib import SequenceMatcher
    import multiprocessing

    def finder(start, end, proc, result_queue, lock):
        global data
        i = start
        while i <= end:
            el = data[i]
            j = -1
            for el1 in data:
                j = j + 1
                s1 = SequenceMatcher(None, el, el1)
                s1_val = s1.ratio()
                if s1_val > 0.9:
                    result_queue.put([i, j])
            i = i + 1
        print('end')


    if __name__ == '__main__':
        multiprocessing.freeze_support()
        result_queue = multiprocessing.Queue()
        allProcesses = []
        data = r.keys()  # r is defined elsewhere in the original program
        print(len(data))
        parts = 8
        part = int(len(data) / parts)
        i = 0
        lock = multiprocessing.Lock()

        while i < parts:
            p = multiprocessing.Process(target=finder,
                                        args=(part * i, part * i + part, i, result_queue, lock))
            print('init', part * i, part * i + part, i)
            allProcesses.append(p)
            p.daemon = True
            p.start()
            i = i + 1
            print('started process', i)
        i = 0

        for p in allProcesses:
            p.join()
            print('complete')

Upvotes: 0

Views: 696

Answers (1)

Yoav Glazner

Reputation: 8041

Short answer: Use multiprocessing.Manager to create the Queue

    m = multiprocessing.Manager()
    result_queue = m.Queue()

A bit more detailed answer: m.Queue() returns an

    <class 'multiprocessing.managers.AutoProxy[Queue]'>

instance, a proxy object which can be shared safely among workers.

Here is a complete runnable example:

    import multiprocessing


    def finder(start, end, proc, result_queue, lock):
        # The managed queue proxy can be shared safely among workers.
        for i in range(start, end + 1):
            result_queue.put((i,))
        print('end %s' % proc)


    r = {i: i for i in range(100000)}


    def main():
        multiprocessing.freeze_support()
        allProcesses = []
        data = r.keys()
        print(len(data))
        parts = 8
        part = int(len(data) / parts)
        lock = multiprocessing.Lock()
        m = multiprocessing.Manager()
        result_queue = m.Queue()

        for i in range(parts):
            p = multiprocessing.Process(target=finder,
                                        args=(part * i, part * i + part, i, result_queue, lock))
            print('init', part * i, part * i + part, i)
            p.daemon = False
            p.start()
            print('started process', i + 1)
            allProcesses.append(p)

        for p in allProcesses:
            print('join', p)
            p.join()
            print('complete')


    if __name__ == '__main__':
        main()
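
Note that the example only joins the workers without reading their results back. For completeness, here is a minimal sketch of draining the managed queue after the join loop (my addition, assuming the result_queue from main() above):

    # Hypothetical follow-up at the end of main(), once every worker
    # has been joined: drain whatever the workers produced.
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    print('collected', len(results), 'results')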

If you change the m.Queue() back to multiprocessing.Queue() you will see your old behavior. That behavior is documented under "Joining processes that use queues" in the multiprocessing docs: a process that has put items on a multiprocessing.Queue will not terminate until all buffered items have been flushed to the underlying pipe, so joining a worker before draining the queue can deadlock once enough items pile up. This is why the original code worked for 2,000 elements but hung above 10,000.
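
If you do want to keep a plain multiprocessing.Queue, the other documented fix is to drain the queue before joining. A minimal standalone sketch (my addition, not part of the answer above):

    import multiprocessing

    def worker(n, q):
        # With enough items, the queue's feeder thread blocks once the
        # pipe buffer is full, so the process cannot exit until the
        # parent starts reading from the queue.
        for i in range(n):
            q.put(i)

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=worker, args=(100000, q))
        p.start()
        # Drain the queue *before* joining; joining first would deadlock.
        for _ in range(100000):
            q.get()
        p.join()
        print('complete')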

Upvotes: 3
