Reputation: 624
I notice this behavior in Python's pool allocation. Even though I have 20 processes in the pool, when I call map_async with, say, 8 tasks, instead of dispatching all of them at once, only 4 execute. When those 4 finish, it sends two more, and when those two finish it sends one.
When I throw more than 20 at it, it runs all 20 until fewer than 20 remain in the queue, at which point the behavior above repeats.
I assume this is done on purpose, but it looks weird. My goal is to have requests processed as soon as they come in, and this behavior obviously does not fit that.
Using Python 2.6 with billiard for maxtasksperchild support.
Any ideas how I can improve it?
Code:
import time
from billiard import pool   # billiard fork of multiprocessing, for maxtasksperchild

mypool = pool.Pool(processes=settings['num-processes'],
                   initializer=StartChild,
                   maxtasksperchild=10)

while True:
    lines = DbData.GetAll()
    if len(lines) > 0:
        print 'Starting to process: ', len(lines), ' urls'
        Res = mypool.map_async(RunChild, lines)
        Returns = Res.get(None)
        print 'Pool returns: ', Returns
    else:
        time.sleep(0.5)
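One thing worth trying (an assumption on my part, since I haven't traced billiard's internals): map_async batches the iterable into chunks before handing them to the workers, which can produce exactly this tapering pattern on small batches. A minimal sketch of two ways to force per-item dispatch, assuming billiard mirrors the standard multiprocessing.Pool API:
# Ask the pool not to batch the iterable into chunks:
Res = mypool.map_async(RunChild, lines, chunksize=1)
Returns = Res.get(None)

# Or submit every line as its own task, so each one is
# picked up by the next free worker as soon as it arrives:
async_results = [mypool.apply_async(RunChild, (line,)) for line in lines]
Returns = [r.get() for r in async_results]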
Upvotes: 1
Views: 1290
Reputation: 63
One way I deal with multiprocessing in Python is the following:
I have data on which I want to run a function function().
First, I create a multiprocessing.Process subclass:
import multiprocessing

class ProcessThread(multiprocessing.Process):
    def __init__(self, id_t, inputqueue, idqueue, function, resultqueue):
        self.id_t = id_t
        self.inputqueue = inputqueue
        self.idqueue = idqueue
        self.function = function
        self.resultqueue = resultqueue
        multiprocessing.Process.__init__(self)

    def run(self):
        print "process number: " + str(self.id_t) + " starting"
        while self.inputqueue.qsize() > 0:
            try:
                # Non-blocking get: raises if another process drained
                # the queue between the qsize() check and here.
                inp = self.inputqueue.get(False)
            except Exception:
                continue
            result = self.function(inp)
            while 1:
                try:
                    self.resultqueue.put([self.id_t, result])
                except Exception:
                    pass    # retry until the put succeeds
                else:
                    break
        self.idqueue.put(self.id_t)   # signal that this process is done
        return
and the main function:
nbprocess = 4       # number of worker processes; pick what suits your machine
data = range(100)   # example work items; replace with your own data

inputqueue = multiprocessing.Queue()
resultqueue = multiprocessing.Queue()
idqueue = multiprocessing.Queue()

def function(datum):
    print datum     # or whatever work you want done on one item
    return datum

for datum in data:
    inputqueue.put(datum)

for i in xrange(nbprocess):
    ProcessThread(i, inputqueue, idqueue, function, resultqueue).start()
and finally get results:
results = []
while idqueue.qsize() < nbprocess:
    pass    # busy-wait until every process has signalled completion
while resultqueue.qsize() > 0:
    results.append(resultqueue.get())
This way you have full control over what each process appends to the result queue, among other things.
Using a multiprocessing inputqueue
is an efficient technique only if the computation on each datum is fairly slow (say, a second or more per item), because of the concurrent access by the different processes to the queues (that is why I use the exceptions). If your function computes very quickly, consider splitting up your data only once at the beginning and giving every process its chunk of the dataset from the start.
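For the fast-function case, here is a minimal sketch of that up-front split (the helper name and even-slicing policy are my own, not from the answer above); each queue item is then a whole chunk, so function() receives a list rather than a single datum:
def split_evenly(data, nbprocess):
    # One contiguous slice per process, so each worker touches the
    # input queue once instead of once per datum.
    chunksize = (len(data) + nbprocess - 1) // nbprocess
    return [data[i:i + chunksize] for i in xrange(0, len(data), chunksize)]

for chunk in split_evenly(data, nbprocess):
    inputqueue.put(chunk)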
Upvotes: 3