knite

Reputation: 6171

Python multiprocessing - watchdog process?

I have a set of long-running processes in a typical "pub/sub" setup with queues for communication.

I would like to do two things, and I can't figure out how to accomplish both simultaneously:

  1. Addition/removal of workers. For example, I want to be able to add extra consumers if I see that my pending queue size has grown too large (see the sketch after this list).
  2. Watchdog for my processes - I want to be notified if any of my producers or consumers crashes.
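
Here's a rough sketch of what I mean by (1); make_consumer is a placeholder factory for my real consumer processes:

def scale_consumers(pending, consumers, make_consumer, max_pending=100):
    # Spawn an extra consumer whenever the pending queue grows too large.
    # Note: multiprocessing.Queue.qsize() is approximate, and raises
    # NotImplementedError on macOS.
    if pending.qsize() > max_pending:
        consumers.append(make_consumer())  # returns a started Process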

I can do (2) in isolation:

from time import sleep

try:
    while True:
        for process in workers + consumers:
            if not process.is_alive():
                logger.critical("%-8s%s died!", process.pid, process.name)
        sleep(3)
except KeyboardInterrupt:
    # Python propagates CTRL+C to all workers, no need to terminate them
    logger.warning('Received CTRL+C, shutting down')

The loop above blocks, which prevents me from doing (1).

So I decided to move the monitoring code into its own process.

This doesn't work, because process.is_alive() only works for a parent checking the status of its children. Here, the processes I want to check would be siblings of the monitor, not its children.

I'm a bit stumped on how to proceed. How can my main process support changes to subprocesses while also monitoring subprocesses?

Upvotes: 2

Views: 4340

Answers (1)

dano

Reputation: 94871

multiprocessing.Pool actually has a watchdog built in already. It runs a thread that checks every 0.1 seconds to see if a worker has died, and starts a new one to take its place if so. Here is the relevant code from multiprocessing/pool.py:

def _handle_workers(pool):
    thread = threading.current_thread()

    # Keep maintaining workers until the cache gets drained, unless the pool
    # is terminated.
    while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
        pool._maintain_pool()
        time.sleep(0.1)
    # send sentinel to stop workers
    pool._taskqueue.put(None)
    debug('worker handler exiting')

def _maintain_pool(self):
    """Clean up any exited workers and start replacements for them.
    """
    if self._join_exited_workers():
        self._repopulate_pool()

This machinery primarily exists to implement the maxtasksperchild keyword argument, and it's actually problematic in some cases: if a process dies while a map or apply call is outstanding, and that process was in the middle of handling a task associated with that call, the call will never finish. See this question for more information about that behavior.
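
You can see the replacement behavior for yourself with a small demo (a sketch of mine, not from the docs): maxtasksperchild=1 retires each worker after a single task, so the watchdog has to start a fresh process and the pids visibly change between tasks:

import os
import multiprocessing

def show_pid(i):
    # Each task reports the pid of the worker that ran it.
    return i, os.getpid()

if __name__ == '__main__':
    # maxtasksperchild=1 makes the watchdog retire each worker after a
    # single task and start a replacement, so pids change between tasks.
    pool = multiprocessing.Pool(processes=2, maxtasksperchild=1)
    for i, pid in pool.map(show_pid, range(4)):
        print("task %d ran in pid %d" % (i, pid))
    pool.close()
    pool.join()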

That said, if you just want to know that a process has died, you can just create a thread (not a process) that monitors the pids of all the processes in the pool, and if the pids in the list ever change, you know a process has crashed:

import time

def monitor_pids(pool):
    # pool._pool is the Pool's internal list of worker Process objects;
    # when a worker dies, the watchdog replaces it, changing the pid list.
    pids = [p.pid for p in pool._pool]
    while True:
        new_pids = [p.pid for p in pool._pool]
        if new_pids != pids:
            print("A worker died")
            pids = new_pids
        time.sleep(3)
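
To wire that up, you could start it as a daemon thread next to the pool, something like this (the pool size is arbitrary, and monitor_pids is the function above):

import threading
import multiprocessing

pool = multiprocessing.Pool(processes=4)
t = threading.Thread(target=monitor_pids, args=(pool,))
t.daemon = True  # don't let the monitor keep the program alive on exit
t.start()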

Edit:

If you're rolling your own Pool implementation, you can just take a cue from multiprocessing.Pool and run your monitoring code in a background thread in the parent process. The checks to see if the processes are still running are quick, so the time lost to the background thread taking the GIL should be negligible. Consider that the multiprocessing.Pool watchdog runs every 0.1 seconds; running yours every 3 seconds shouldn't cause any problems.
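
Putting both pieces together, here's a minimal sketch (my code, not a drop-in implementation) of that approach: a watchdog thread in the parent restarts dead workers, while the main thread stays free to add or remove them. consume and the thresholds are placeholders, and clean shutdown is omitted (the consumers are daemonic, so they die with the parent):

import threading
import time
import multiprocessing

def consume(queue):
    # Placeholder worker loop; the real consumer goes here.
    for item in iter(queue.get, None):
        pass  # process item

def start_consumer(queue):
    proc = multiprocessing.Process(target=consume, args=(queue,))
    proc.daemon = True  # die with the parent; a real setup would shut down cleanly
    proc.start()
    return proc

def watchdog(workers, queue, lock):
    # Runs as a thread in the parent, so is_alive() works here.
    while True:
        with lock:
            for i, proc in enumerate(workers):
                if not proc.is_alive():
                    print("%s died, restarting" % proc.name)
                    workers[i] = start_consumer(queue)
        time.sleep(3)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    lock = threading.Lock()
    workers = [start_consumer(queue) for _ in range(4)]

    t = threading.Thread(target=watchdog, args=(workers, queue, lock))
    t.daemon = True
    t.start()

    # The main thread stays free to grow or shrink the worker list, e.g.
    # add a consumer when the backlog grows (qsize() is unavailable on macOS):
    if queue.qsize() > 100:
        with lock:
            workers.append(start_consumer(queue))

    time.sleep(10)  # let everything run for a bit before exiting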

Upvotes: 1
