Tarek Fadel

Reputation: 1959

Python join a process without blocking parent

I'm writing a program that will watch a particular directory for new files containing download URLs. Once a new file is detected, it will create a new process to do the actual download while the parent continues to watch the directory. I'm using the Process interface from multiprocessing. The problem I have is that unless I call process.join() the child process is still running, but process.join() is a blocking function which defeats the purpose of creating the child to handle the actual download.

My question is, is there a way to join the child process in a non-blocking manner which will allow the parent to keep doing its thing?

Partial code:

def main(argv):
  # parse command line args
  ...
  # set up variables
  ...
  watch_dir(watch_dir, download_dir)


def watch_dir(wDir, dDir):
  # Grab the current watch directory listing
  before = dict([(f, None) for f in os.listdir (wDir)])

  # Loop FOREVER
  while 1:
    # sleep for 10 secs
    time.sleep(10)

    # Grab the current dir listing
    after = dict([(f, None) for f in os.listdir (wDir)])

    # Get the list of new files
    added = [f for f in after if not f in before]
    # Get the list of deleted files
    removed = [f for f in before if not f in after]

    if added:
      # We have new files, do your stuff
      print "Added: ", ", ".join(added)

      # Call the new process for downloading
      p = Process(target=child, args=(added, wDir, dDir))
      p.start()
      p.join()

    if removed:
      # tell the user the file was deleted
      print "Removed: ", ", ".join(removed)

    # Set before to the current
    before = after

def child(filename, wDir, dDir):
  # Open filename and extract the url
  ...
  # Download the file to the dDir directory
  ...
  # Delete filename from the watch directory
  ...
  # exit cleanly
  os._exit(0)

The parent waits for the child to finish execution before continuing after p.join(), which is (as far as I can tell) correct. But that defeats the whole purpose of creating the child. If I leave off p.join() then the child remains active and a ps ax | grep python gives me 'python <defunct>'.

I'd like the child to finish up what it's doing and go away without holding up the parent. Is there a way to do it?

Upvotes: 24

Views: 18343

Answers (5)

Tomasz Bartkowiak

Reputation: 15058

You can also use multiprocessing.Process with daemon=True (a daemonic process); the process.start() method does not block, so your parent process can continue working without waiting for its child to finish.

The only caveat is that daemonic processes are not allowed to spawn children.

from multiprocessing import Process

child_process = Process(
    target=my_func,
    daemon=True
)
child_process.start()
# Keep doing your stuff

Upvotes: 1

skrrgwasme

Reputation: 9633

Instead of trying to shoehorn multiprocessing.Process() into working for you, perhaps you should use a different tool, like apply_async() with a multiprocessing.Pool():

def main(argv):
    # parse command line args
    ...
    # set up variables
    ...

    # set up multiprocessing Pool
    pool = multiprocessing.Pool()

    try:
        watch_dir(watch_dir, download_dir, pool)

    # catch whatever kind of exception you expect to end your infinite loop
    # you can omit this try/except if you really think your script will 
    # run "forever" and you're okay with zombies should it crash
    except KeyboardInterrupt:
        pool.close()
        pool.join()

def watch_dir(wDir, dDir, pool):
    # Grab the current watch directory listing
    before = dict([(f, None) for f in os.listdir (wDir)])

    # Loop FOREVER
    while 1:
        # sleep for 10 secs
        time.sleep(10)

        # Grab the current dir listing
        after = dict([(f, None) for f in os.listdir (wDir)])

        # Get the list of new files
        added = [f for f in after if not f in before]
        # Get the list of deleted files
        removed = [f for f in before if not f in after]

        if added:
            # We have new files, do your stuff
            print "Added: ", ", ".join(added)

            # launch the function in a subprocess - this is NON-BLOCKING
            pool.apply_async(child, (added, wDir, dDir))

        if removed:
            # tell the user the file was deleted
            print "Removed: ", ", ".join(removed)

        # Set before to the current
        before = after

def child(filename, wDir, dDir):
    # Open filename and extract the url
    ...
    # Download the file to the dDir directory
    ...
    # Delete filename from the watch directory
    ...
    # simply return to "exit cleanly"
    return

The multiprocessing.Pool() is a pool of worker subprocesses that you can submit "jobs" to. The pool.apply_async() function call causes one of the subprocesses to run your function with the arguments provided, asynchronously, and doesn't need to be joined until your script is done with all of its work and closes the whole pool. The library manages the details for you.

I think this will serve you better than the current accepted answer for the following reasons:
1. It removes the unnecessary complexity of launching extra threads and queues just to manage subprocesses.
2. It uses library routines that are made specifically for this purpose, so you get the benefit of future library improvements.
3. IMHO, it is much more maintainable.
4. It is more flexible. If you one day decide that you want to actually see a return value from your subprocesses, you can store the return value from the apply_async() call (a result object) and check it whenever you want (see the sketch after this list). You could store a bunch of them in a list and process them as a batch when your list gets above a certain size. You can move the creation of the pool into the watch_dir() function and do away with the try/except if you don't really care what happens if the "infinite" loop is interrupted. If you put some kind of break condition in the (presently) infinite loop, you can simply add pool.close() and pool.join() after the loop and everything is cleaned up.
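
A minimal sketch of point 4, assuming a results list and a batch size of 10 (both illustrative, not part of the original code):

results = []

if added:
    print "Added: ", ", ".join(added)
    # keep the AsyncResult returned by apply_async() for later inspection
    results.append(pool.apply_async(child, (added, wDir, dDir)))

# once enough jobs have accumulated, check them as a batch
if len(results) > 10:
    for r in results:
        r.wait()  # or r.get() if child() returns something you care about
    results = []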

Upvotes: 3

Chris Powell

Reputation: 195

In your while loop, call

multiprocessing.active_children()

It returns a list of all live children of the current process, and calling it has the side effect of "joining" any processes which have already finished.
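
For instance, dropped into the question's loop (a minimal sketch; the surrounding code is abridged):

import multiprocessing

while 1:
    time.sleep(10)

    # reaps ("joins") any children that have already exited, without blocking
    multiprocessing.active_children()

    # ... directory diffing and Process spawning as before ...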

Upvotes: 9

Fred Foo

Reputation: 363828

You can set up a separate thread which does the joining. Have it listen on a queue into which you push the subprocess handles:

from threading import Thread

class Joiner(Thread):
    def __init__(self, q):
        Thread.__init__(self)
        self.__q = q
    def run(self):
        while True:
            child = self.__q.get()
            if child is None:
                return
            child.join()

Then, instead of p.join(), do joinq.put(p) and do a joinq.put(None) to signal the thread to stop. Make sure you use a FIFO queue.
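
A minimal usage sketch, assuming the watcher owns the queue and the Joiner thread (the name joinq is illustrative; Queue is the Python 2 module, queue on Python 3):

from multiprocessing import Process
from Queue import Queue  # FIFO queue

joinq = Queue()
joiner = Joiner(joinq)
joiner.start()

# inside the watch loop, instead of p.join():
p = Process(target=child, args=(added, wDir, dDir))
p.start()
joinq.put(p)       # the Joiner thread joins it once it finishes

# at shutdown:
joinq.put(None)    # tells the Joiner thread to stop
joiner.join()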

Upvotes: 16

jchl

Reputation: 6542

If you don't care about when and whether the child terminates, and you just want to avoid the child ending up as a zombie process, then you can do a double-fork, so that the grandchild ends up being a child of init. In code:

def child(*args):
  p = Process(target=grandchild, args=args)
  p.start()
  os._exit(0)

def grandchild(filename, wDir, dDir):
  # Open filename and extract the url
  ...
  # Download the file to the dDir directory
  ...
  # Delete filename from the watch directory
  ...
  # exit cleanly
  os._exit(0)

Upvotes: 2
