ScottWilson

Reputation: 370

Python multiprocessing in Nuke causing Nuke to hang

I have the following code that causes Nuke to hang. Basically, I'm trying to get a list of files and folders from the file system, and I'm trying to speed that up with parallel processing. This works perfectly outside of Nuke, but as I said, running it inside Nuke makes Nuke hang. Is there a better way to do this that won't hang Nuke? Preferably I'd like to fix this with Python's standard library or with platform-agnostic packages, but if there's no way to do that, I'm fine with that. Worst case, I'll go back to not using parallel processing and look for other optimizations.
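In case the "other optimizations" route helps: on newer Python (3.6+), os.scandir() returns directory entries that cache their file type, so is_dir() usually avoids a per-path stat() call (os.walk uses it internally these days). A minimal single-threaded sketch — walk_paths is an illustrative name of my own, not from the question:

```python
import os

def walk_paths(root):
    """Yield every path under root, iteratively (no recursion limit)."""
    stack = [root]
    while stack:
        try:
            entries = os.scandir(stack.pop())
        except OSError:
            continue  # unreadable directory; skip it
        with entries:
            for entry in entries:
                yield entry.path
                if entry.is_dir(follow_symlinks=False):
                    # directory entries already know their type, so this
                    # check normally costs no extra system call
                    stack.append(entry.path)
```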

Also, when I run this code in Nuke, I get the following error in the console:

Unknown units in -c from multiprocessing.forking import main; main()
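That error fragment is a clue: on Windows, multiprocessing launches child processes by re-running sys.executable with a "-c ..." command line. Inside an embedded interpreter, sys.executable is the host application (Nuke), whose own -c flag expects a cache size, hence the "Unknown units" complaint. One possible workaround (a sketch, not a confirmed fix — the interpreter path below is a placeholder for your own install) is to point multiprocessing at a standalone Python binary before spawning:

```python
import multiprocessing
import sys

# Inside an embedded interpreter, sys.executable is the host application,
# so children re-launched with "-c ..." are handed to Nuke instead of
# Python. Point multiprocessing at a real interpreter instead; the path
# below is a placeholder, not a known-good Nuke configuration.
if not sys.executable.lower().endswith("python.exe"):
    multiprocessing.set_executable("C:/Python27/python.exe")
```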

The code:

#!/bin/env python

import multiprocessing
import os

CPU_COUNT = multiprocessing.cpu_count()


def _threaded_master(root):
    in_queue = multiprocessing.JoinableQueue()
    folder_queue = multiprocessing.JoinableQueue()
    file_queue = multiprocessing.JoinableQueue()

    in_queue.put(root)

    for _ in xrange(CPU_COUNT):
        multiprocessing.Process(target=_threaded_slave, args=(in_queue, folder_queue, file_queue)).start()

    in_queue.join()

    return {"folders": folder_queue, "files": file_queue}


def _threaded_slave(in_queue, folder_queue, file_queue):
    while True:
        path_item = in_queue.get()

        if os.path.isdir(path_item):
            for item in os.listdir(path_item):
                path = os.path.join(path_item, item)
                in_queue.put(path)

        in_queue.task_done()


if __name__ == "__main__":
    print _threaded_master(r"/path/to/root")

Upvotes: 2

Views: 1641

Answers (2)

Dushyant

Reputation: 11

Here's a link to refer to: https://learn.foundry.com/nuke/developers/63/pythondevguide/threading.html

What's notable is the warning mentioned in there: nuke.executeInMainThread and nuke.executeInMainThreadWithResult should always be run from a child thread. If run from within the main thread, they freeze NUKE.

So, spawn a new child thread, and do your stuff there.
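A sketch of that pattern (the names here are illustrative; the nuke calls are shown in comments because they only exist inside Nuke):

```python
import threading

def scan_worker(root, results):
    # Do the slow filesystem work in a child thread so the UI stays live.
    results.append(root)  # stand-in for the real scanning work
    # Hand any UI updates back to Nuke's main thread from here, e.g.:
    # nuke.executeInMainThread(nuke.message, args=("scan finished",))

results = []
t = threading.Thread(target=scan_worker, args=("/path/to/root", results))
t.start()
t.join()
```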

Upvotes: 1

johntellsall

Reputation: 15170

Here's my code to scan through a large tree of directories using several threads.

I'd originally written the code to use good old multiprocessing.Pool(), because it's very easy and hands you back the functions' results, so no input or output queues are needed. Another difference is that it uses processes rather than threads, which has some tradeoffs.

The Pool has a big drawback: it assumes you have a static list of items to process.
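That said, a Pool can still walk a tree if you feed it one level of directories at a time. A sketch in current Python 3 syntax (walk_with_pool and scan_dir here are my own names, not the answer's code):

```python
import multiprocessing
import os

def scan_dir(topdir):
    """List one directory, returning (path, isdir) pairs."""
    try:
        return [(os.path.join(topdir, n),
                 os.path.isdir(os.path.join(topdir, n)))
                for n in os.listdir(topdir)]
    except OSError:
        return []

def walk_with_pool(root):
    # Breadth-first: each Pool.map call fans one level of directories
    # out across the worker processes.
    results = []
    frontier = [root]
    with multiprocessing.Pool() as pool:
        while frontier:
            level = pool.map(scan_dir, frontier)
            frontier = []
            for entries in level:
                results.extend(entries)
                frontier.extend(p for p, isdir in entries if isdir)
    return results
```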

So, I rewrote the code following your original example: an input queue of directories to process and an output queue of results. The caller has to explicitly grab items from the output queue.

For grins I ran a timing comparison against good old os.walk() and... at least on my machine the traditional solution was faster. Note the two solutions report quite different numbers of paths, largely because the threaded version emits directories as well as files, while the os.walk() comparison below counts files only.

Have fun!

source

#!/bin/env python

import multiprocessing, threading, time
import logging, os, Queue, sys

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)-4s %(levelname)s %(threadName)s %(message)s", 
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)

def scan_dir(topdir):
    try:
        for name in os.listdir(topdir):
            path = os.path.join(topdir, name)
            yield (path, os.path.isdir(path))
    except OSError:
        logging.error('uhoh: %s', topdir)

def scan_dir_queue(inqueue, outqueue):
    logging.info('start')
    while True:
        try:
            dir_item = inqueue.get_nowait()
        except Queue.Empty:
            break

        res = list( scan_dir(dir_item) )
        logging.debug('- %d paths', len(res))
        for path,isdir in res:
            outqueue.put( (path,isdir) )
            if isdir:
                inqueue.put(path)
    logging.info('done')

def thread_master(root):
    dir_queue = Queue.Queue() # pylint: disable=E1101
    dir_queue.put(root)
    result_queue = Queue.Queue()

    threads = [
        threading.Thread(
            target=scan_dir_queue, args=[dir_queue, result_queue]
        )
        for _ in range(multiprocessing.cpu_count())
    ]

    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return result_queue.queue

if __name__ == "__main__":
    topdir = os.path.expanduser('~')

    start = time.time()
    res = thread_master(topdir)
    print 'threaded:', time.time() - start
    print len(res), 'paths'

    def mywalk(topdir):
        for (dirpath, _dirnames, filenames) in os.walk(topdir):
            for name in filenames:
                yield os.path.join(dirpath, name)
    start = time.time()
    res = list(mywalk(topdir))
    print 'os.walk:', time.time() - start
    print len(res), 'paths'

output

11:56:35 INFO Thread-1 start
11:56:35 INFO Thread-2 start
11:56:35 INFO Thread-3 start
11:56:35 INFO Thread-4 start
11:56:35 INFO Thread-2 done
11:56:35 INFO Thread-3 done
11:56:35 INFO Thread-4 done
11:56:42 INFO Thread-1 done
threaded: 6.49218010902
299230 paths
os.walk: 1.6940600872
175741 paths
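One caveat visible in that output: three of the four threads finish almost immediately, because get_nowait() raises Queue.Empty whenever the input queue is momentarily drained, even though more directories are still on the way. A variant that keeps every worker alive — a blocking get() plus None sentinels, written in Python 3 where the module is queue rather than Queue, with names of my own choosing — looks like:

```python
import os
import queue
import threading

def worker(dir_q, out_q):
    # Block on get() so the thread survives a momentarily empty queue;
    # a None sentinel tells it to exit.
    while True:
        d = dir_q.get()
        if d is None:
            dir_q.task_done()
            break
        try:
            for name in os.listdir(d):
                path = os.path.join(d, name)
                isdir = os.path.isdir(path)
                out_q.put((path, isdir))
                if isdir:
                    dir_q.put(path)
        except OSError:
            pass
        dir_q.task_done()

def scan(root, n_threads=4):
    dir_q, out_q = queue.Queue(), queue.Queue()
    dir_q.put(root)
    threads = [threading.Thread(target=worker, args=(dir_q, out_q))
               for _ in range(n_threads)]
    for t in threads:
        t.start()
    dir_q.join()          # wait until every queued directory is processed
    for _ in threads:
        dir_q.put(None)   # release the workers
    for t in threads:
        t.join()
    return list(out_q.queue)
```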

Upvotes: 2
