Trolldejo

Reputation: 466

Python multiprocess get result from queue

I'm running a multiprocessing script that is supposed to launch 2,000,000 jobs of about 0.01 seconds each. Each job puts its result in a queue imported from the Queue module, because the queue from the multiprocessing module couldn't handle more than 517 results in it.

My program freezes before getting the results from the queue. Here is the core of my multiprocessing function:

while argslist != []:
    p = mp.Process(target=function, args=(result_queue, argslist.pop(),))
    jobs.append(p)
    p.start()
for p in jobs:
    p.join()
print 'over'

res = [result_queue.get() for p in jobs]
print 'got it'

The output shows "over" but never "got it".

When I replace

result_queue.get() 

by

result_queue.get_nowait()

I get the Empty exception saying my queue is empty...

But if I do the queue.get() right after the queue.put() in my inner function, then it works, which shows me that the queue is properly filled by my function.

Upvotes: 2

Views: 3812

Answers (2)

Trolldejo

Reputation: 466

Thank you mata, I switched back to multiprocessing.Queue(), but I don't want to use a pool because I want to keep track of how many jobs have run. I finally added an if statement to regularly empty my queue.

import multiprocessing as mp

def multiprocess(function, argslist, ncpu):
    total = len(argslist)
    done = 0
    result_queue = mp.Queue(0)
    jobs = []
    res = []
    while argslist != []:
        # only spawn a new worker when a slot is free
        if len(mp.active_children()) < ncpu:
            p = mp.Process(target=function, args=(result_queue, argslist.pop(),))
            jobs.append(p)
            p.start()
            done += 1
            print "\r", float(done) / total * 100, "%",  # here is to keep track
        # here comes my emptying step: drain the queue every 500 jobs
        # so its underlying pipe buffer never fills up
        if len(jobs) == 500:
            tmp = [result_queue.get() for p in jobs]
            for r in tmp:
                res.append(r)
            result_queue = mp.Queue(0)
            jobs = []

    # collect the results of the last batch
    tmp = [result_queue.get() for p in jobs]
    for r in tmp:
        res.append(r)
    return res
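For reference, a call might look roughly like this; the worker name square and the argument list are made up here, only to illustrate the (queue, arg) signature that multiprocess() expects:

import multiprocessing as mp

def square(result_queue, arg):
    # hypothetical worker: puts exactly one result per job, as multiprocess() expects
    result_queue.put(arg * arg)

if __name__ == '__main__':
    results = multiprocess(square, list(range(10000)), ncpu=4)
    print(len(results))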

Then this question comes to my mind:
Is the 500-job limit due to Python, or to my machine or my system?
Will this threshold break if my multiprocessing function is used under other conditions?

Upvotes: 0

mata

Reputation: 69012

queue.Queue is not shared between processes, so it won't work for this; you must use multiprocessing.Queue.

To avoid a deadlock you should not join your processes before getting the results from the queue. A multiprocessing.Queue is effectively limited by its underlying pipe's buffer: once that fills up, no more items can be flushed to the pipe, and queue.put() will block until a consumer calls queue.get(). But if the consumer is busy joining a blocked process, you have a deadlock.
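A minimal sketch of that order of operations, with a made-up worker and a small job count just for illustration: the parent drains the queue first and only then joins the children.

import multiprocessing as mp

def worker(result_queue, arg):
    # each job puts exactly one result on the queue
    result_queue.put(arg * arg)

if __name__ == '__main__':
    result_queue = mp.Queue()
    jobs = []
    for arg in range(50):
        p = mp.Process(target=worker, args=(result_queue, arg))
        jobs.append(p)
        p.start()

    # drain the queue first, so no child stays blocked on a full pipe ...
    res = [result_queue.get() for _ in jobs]

    # ... and only then join; every child can now exit
    for p in jobs:
        p.join()
    print(len(res))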

You can avoid all of this by using a multiprocessing.Pool and its map() instead.
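A rough sketch of the Pool approach, assuming the job function can simply return its result instead of writing to a queue (the function body and pool size are placeholders):

import multiprocessing as mp

def function(arg):
    # placeholder job: return the result instead of putting it on a queue
    return arg * arg

if __name__ == '__main__':
    argslist = range(2000000)
    pool = mp.Pool(processes=4)
    # chunksize keeps the per-task overhead low for millions of tiny jobs
    res = pool.map(function, argslist, chunksize=1000)
    pool.close()
    pool.join()
    print(len(res))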

Upvotes: 2
