Reputation: 466
I'm running a multiprocessing script that is supposed to launch 2,000,000 jobs of about 0.01 seconds each. Each job puts its result in a queue imported from the Queue module, because the queue from the multiprocessing module couldn't handle more than 517 results in my case.
My program freezes before getting the results back from the queue. Here is the core of my multiprocess function:
while argslist != []:
    p = mp.Process(target=function, args=(result_queue, argslist.pop(),))
    jobs.append(p)
    p.start()
for p in jobs:
    p.join()
print 'over'
res = [result_queue.get() for p in jobs]
print 'got it'
output: "over" but never "got it"
when I replace
result_queue.get()
by
result_queue.get_nowait()
I got the raise Empty error saying my queue is Empty...
but if I do the queue.get() just after the queue.put() in my inner function, then it works, showing me that the queue is well filed by my function..
Upvotes: 2
Views: 3812
Reputation: 466
Thank you mata, I switched back to multiprocessing.Queue(), but I don't want to use a pool because I want to keep track of how many jobs have run. I finally added an if statement to regularly empty my queue.
def multiprocess(function, argslist, ncpu):
    total = len(argslist)
    done = 0
    result_queue = mp.Queue(0)
    jobs = []
    res = []
    while argslist != []:
        if len(mp.active_children()) < ncpu:
            p = mp.Process(target=function, args=(result_queue, argslist.pop(),))
            jobs.append(p)
            p.start()
            done += 1
            print "\r", float(done) / total * 100, "%",  # here is to keep track
        # here comes my emptying step
        if len(jobs) == 500:
            tmp = [result_queue.get() for p in jobs]
            for r in tmp:
                res.append(r)
            result_queue = mp.Queue(0)
            jobs = []
    tmp = [result_queue.get() for p in jobs]
    for r in tmp:
        res.append(r)
    return res
Then this question comes to my mind:
Is 500 jobs the limit because of Python, because of my machine, or because of my system?
Will this threshold be buggy if my multiprocessing function is used under other conditions?
Upvotes: 0
Reputation: 69012
queue.Queue is not shared between processes, so it won't work for this; you must use multiprocessing.Queue.
To avoid a deadlock you should not join your processes before getting the results from the queue. A multiprocessing.Queue is effectively limited by its underlying pipe's buffer: if that buffer fills up, no more items can be flushed to the pipe, and queue.put() will block until a consumer calls queue.get(). But if that consumer is busy joining a process that is blocked on put(), you have a deadlock.
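Applied to the loop from the question, a minimal reordering sketch (Python 2 syntax to match the question; the worker body and the sample arguments are assumptions, not taken from the original code) would drain the queue before joining:
import multiprocessing as mp

def function(result_queue, arg):
    # hypothetical worker: the real job body is not shown in the question
    result_queue.put(arg * 2)

if __name__ == '__main__':
    argslist = range(100)  # assumed sample arguments
    result_queue = mp.Queue()
    jobs = []
    while argslist != []:
        p = mp.Process(target=function, args=(result_queue, argslist.pop(),))
        jobs.append(p)
        p.start()
    # get all results first, so the children can flush their pipes and exit...
    res = [result_queue.get() for p in jobs]
    # ...and only then join them
    for p in jobs:
        p.join()
    print 'got it'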
You can avoid all of this by using a multiprocessing.Pool and its map() method instead.
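A minimal sketch of that approach (the worker, the pool size, and the sample arguments are assumptions for illustration, not part of the original answer):
import multiprocessing as mp

def function(arg):
    # hypothetical worker: returns its result instead of putting it in a queue
    return arg * 2

if __name__ == '__main__':
    argslist = range(2000000)    # the two million arguments from the question
    pool = mp.Pool(processes=4)  # assumed number of worker processes
    # map() takes care of chunking the work, collecting results and reusing workers
    res = pool.map(function, argslist)
    pool.close()
    pool.join()
    print len(res)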
Upvotes: 2