SorinV

Reputation: 624

multiprocessing - pool allocation

I've noticed the following behavior in Python's pool allocation. Even though I have 20 processes in the pool, when I call map_async with, say, 8 tasks, instead of dispatching all of them at once it only runs 4. When those 4 finish, it sends two more, and when those two finish it sends one.

When I throw more than 20 at it, it runs all 20, until the queue drops below 20 items, at which point the behavior above repeats.

I assume this is done on purpose, but it looks weird. My goal is to have the requests processed as soon as they come in, and this behavior obviously does not fit that.

Using Python 2.6 with billiard for maxtasksperchild support.

Any ideas how I can improve this?

Code:

import time

from billiard import pool  # billiard's Pool, for maxtasksperchild support

mypool = pool.Pool(processes=settings['num-processes'], initializer=StartChild, maxtasksperchild=10)

while True:
    lines = DbData.GetAll()
    if len(lines) > 0:
        print 'Starting to process: ', len(lines), ' urls'
        Res = mypool.map_async(RunChild, lines)
        Returns = Res.get(None)
        print 'Pool returns: ', Returns
    else:
        time.sleep(0.5)
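
For reference, here is an untested sketch of the same loop pushing each line into the pool individually with apply_async (using the same RunChild and DbData.GetAll as above); I don't know whether it avoids the batching:

# Untested sketch: submit each line individually instead of as one map_async batch
pending = []
while True:
    lines = DbData.GetAll()
    if len(lines) > 0:
        for line in lines:
            pending.append(mypool.apply_async(RunChild, (line,)))
        pending = [r for r in pending if not r.ready()]  # drop finished results
    else:
        time.sleep(0.5)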

Upvotes: 1

Views: 1290

Answers (1)

oliverXoX

Reputation: 63

One way I deal with multiprocessing in Python is the following:

I have data on which I want to apply a function function().
First I create a multiprocessing.Process subclass:

import multiprocessing

class ProcessThread(multiprocessing.Process):
    def __init__(self, id_t, inputqueue, idqueue, function, resultqueue):
        self.id_t = id_t
        self.inputqueue = inputqueue
        self.idqueue = idqueue
        self.function = function
        self.resultqueue = resultqueue

        multiprocessing.Process.__init__(self)

    def run(self):
        print "process number: " + str(self.id_t) + " starting"

        while self.inputqueue.qsize() > 0:
            try:
                inp = self.inputqueue.get()
            except Exception:
                continue
            result = self.function(inp)
            while 1:
                try:
                    self.resultqueue.put([self.id_t, result])
                except Exception:
                    pass
                else:
                    break

        # signal that this worker has drained the input queue
        self.idqueue.put(self.id_t)
        return

and the main function:

inputqueue = multiprocessing.Queue()
resultqueue = multiprocessing.Queue()
idqueue = multiprocessing.Queue()

def function(data):
    print data  # or whatever work you want done per item

# 'data' is your iterable of work items, 'nbprocess' the number of workers
for datum in data:
    inputqueue.put(datum)

for i in xrange(nbprocess):
    ProcessThread(i, inputqueue, idqueue, function, resultqueue).start()

and finally get results:

results = []
# wait until every worker has reported itself done via idqueue
while idqueue.qsize() < nbprocess:
    pass
while resultqueue.qsize() > 0:
    results.append(resultqueue.get())

In this way you control exactly what each process does. Using a multiprocessing input queue is an efficient technique only if the computation on each datum is fairly slow (on the order of a second or more), because of the concurrent access of the different processes to the queues (which is why I use try/except). If your function computes very quickly, consider splitting your data only once at the beginning and handing each process its chunk of the dataset when it starts.
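
A minimal sketch of that chunked alternative (the chunk size math and the worker function here are mine, not part of the code above, and reuse the data, nbprocess and function names): each process gets its own slice of the data up front, so the only queue traffic is one result per worker.

import multiprocessing

def worker(id_t, chunk, function, resultqueue):
    # process the whole pre-assigned chunk, then report once
    resultqueue.put([id_t, [function(datum) for datum in chunk]])

resultqueue = multiprocessing.Queue()
chunksize = (len(data) + nbprocess - 1) // nbprocess  # ceiling division
workers = []
for i in xrange(nbprocess):
    chunk = data[i * chunksize:(i + 1) * chunksize]
    p = multiprocessing.Process(target=worker, args=(i, chunk, function, resultqueue))
    p.start()
    workers.append(p)

# drain the result queue before joining, to avoid blocking on a full queue
results = [resultqueue.get() for _ in xrange(len(workers))]
for p in workers:
    p.join()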

Upvotes: 3
