gosom
gosom

Reputation: 1319

Python multiproccessing memory increase

I have a program that should run forever. Here is what I am doing:

from myfuncs import do, process

class Worker(multiprocessing.Process):

    def __init__(self, lock):
        multiprocesing.Process.__init__(self)
        self.lock = lock
        self.queue = Redis(..) # this is a redis based queue
        self.res_queue = Redis(...)

     def run():
         while True:
             job = self.queue.get(block=True)
             job.results = process(job)
             with self.lock:
                 post_process(self.res_queue, job)


def main():
    lock = multiprocessing.Semaphore(1)
    ps = [Worker(lock) for _ in xrange(4)]
    [p.start() for p in ps]
    [p.join() for p in ps]

self.queue and self.res_queue are two objects that work similar like python stdlib Queue but they use Redis database as a backend.

Function process does some processing to the data that job carries( mostly html parsing ) and returns a dictionary.

Function post_process writes the job to another redis queue by checking some criteria (only one process at a time can check for the criteria that's why the lock). It returns True/False.

The memory used by the program everyday is increasing. Can somebody figure out what is going on?

Memory should be free when job get outs of scope in the run method correct?

Upvotes: 3

Views: 1855

Answers (2)

dano
dano

Reputation: 94981

If you can't find the source of the leak, you can work around it by having each of your workers only process a limited number of tasks. Once they've hit the task limit, you can then have them exit, and replace them with a new worker process. The built-in multiprocessing.Pool object supports this via the maxtasksperchild keyword argument. You could do something similar:

import multiprocessing
import threading

class WorkerPool(object):
    def __init__(self, workers=multiprocessing.cpu_count(),
                 maxtasksperchild=None, lock=multiprocessing.Semaphore(1)):
        self._lock = multiprocessing.Semaphore(1)
        self._max_tasks = maxtasksperchild
        self._workers = workers
        self._pool = []
        self._repopulate_pool()
        self._pool_monitor = threading.Thread(self._monitor_pool)
        self._pool_monitor.daemon = True
        self._pool_monitor.start()

    def _monitor_pool(self):
        """ This runs in its own thread and monitors the pool. """
        while True:
            self._maintain_pool()
            time.sleep(0.1)

    def _maintain_pool(self):
        """ If any workers have exited, start a new one in its place. """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _join_exited_workers(self):
        """ Find exited workers and join them. """
        cleaned = False
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                worker.join()
                cleaned = True
                del self._pool[i]
        return cleaned

    def _repopulate_pool(self):
        """ Start new workers if any have exited. """
        for i in range(self._workers - len(self._pool)):
            w = Worker(self._lock, self._max_tasks)
            self._pool.append(w)
            w.start()    


class Worker(multiprocessing.Process):

    def __init__(self, lock, max_tasks):
        multiprocesing.Process.__init__(self)
        self.lock = lock
        self.queue = Redis(..) # this is a redis based queue
        self.res_queue = Redis(...)
        self.max_tasks = max_tasks

     def run():
         runs = 0
         while self.max_tasks and runs < self.max_tasks:
             job = self.queue.get(block=True)
             job.results = process(job)
             with self.lock:
                 post_process(self.res_queue, job)
            if self.max_tasks:
                 runs += 1


def main():
    pool = WorkerPool(workers=4, maxtasksperchild=1000)
    # The program will block here since none of the workers are daemons.
    # It's not clear how/when you want to shut things down, but the Pool
    # can be enhanced to support that pretty easily.

Note that the pool monitoring code above is almost exactly the same as the code that's used in multiprocessing.Pool for the same purpose.

Upvotes: 4

abarnert
abarnert

Reputation: 366213

Memory should be free when job get outs of scope in the run method correct?

First, the scope is the entire run method, which loops forever, so that never happens. (Besides, when you exit the run method, the process shuts down and its memory is freed anyway…)

But even if it did go out of scope, that wouldn't mean what you seem to think it means. Python isn't like C++, where there are variables whose storage is on the stack. All objects live on the heap, and they stay alive until there are no more references to them. A variable falling out of scope means that variable is no longer referring to whatever object it used to be referring to. If that variable was the only reference to the object, then it will be freed*, but if there are other references that you've made elsewhere, the object can't be freed until those other references go away.

Meanwhile, there's nothing magical about going out of scope. Any way a variable stops referring to an object has the same effect—whether it's the variable going out of scope, you calling del on it, or you assigning a new value to it. So, each time through the loop, when you do job =, you're dropping the previous reference to job even though nothing ever went out of scope. (But keep in mind that you will have two jobs alive at peak, not one, because the new one is pulled off the queue before the old one is released. If that's an issue, you can always do job = None before blocking on the queue.)

So, assuming the problem actually is the job object (or something it owns), the problem is that some of the code you haven't shown us is keeping a reference to it around somewhere.

Without knowing what you're doing, it's hard to suggest a fix. It may just be "don't store that there". Or it may be "store a weakref instead of the object itself". Or "add an LRU algorithm". Or "add some flow control so if you get too backed up you don't keep piling on work until you run out of memory".


* In CPython, this happens immediately, because the garbage collector is based on refcounting. In Jython and IronPython, on the other hand, the garbage collector just relies on the underlying VM's garbage collector, so the object isn't freed until the JVM or CLR notices that it's no longer being referenced, which is generally not immediate, and nondeterministic.

Upvotes: 5

Related Questions