Reputation: 370
I have the following code that causes Nuke to hang. Basically, I'm trying to get a list of files and folders from the file system, and I'm using parallel processing to speed it up. This works perfectly outside of Nuke, but running it inside Nuke causes the application to hang. Is there a better way to do this that won't hang Nuke? Preferably, I'd like to fix this through Python's standard library or through platform-agnostic packages, but if there's no way to do that, then so be it. Worst case, I'll go back to not using parallel processing and look for other optimizations.
Also, when I run this code in Nuke, I get the following error in the console:
Unknown units in -c from multiprocessing.forking import main; main()
#!/bin/env python
import multiprocessing
import os

CPU_COUNT = multiprocessing.cpu_count()

def _threaded_master(root):
    in_queue = multiprocessing.JoinableQueue()
    folder_queue = multiprocessing.JoinableQueue()
    file_queue = multiprocessing.JoinableQueue()

    in_queue.put(root)

    for _ in xrange(CPU_COUNT):
        multiprocessing.Process(target=_threaded_slave, args=(in_queue, folder_queue, file_queue)).start()

    in_queue.join()

    return {"folders": folder_queue, "files": file_queue}

def _threaded_slave(in_queue, folder_queue, file_queue):
    while True:
        path_item = in_queue.get()

        if os.path.isdir(path_item):
            for item in os.listdir(path_item):
                path = os.path.join(path_item, item)
                in_queue.put(path)

        in_queue.task_done()

if __name__ == "__main__":
    print _threaded_master(r"/path/to/root")
Upvotes: 2
Views: 1641
Reputation: 11
Here's a link to refer to: https://learn.foundry.com/nuke/developers/63/pythondevguide/threading.html
What's notable is the warning mentioned there: "nuke.executeInMainThread and nuke.executeInMainThreadWithResult should always be run from a child thread. If run from within the main thread, they freeze NUKE."
So spawn a new child thread and do your work there, as in the sketch below.
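Here's a minimal sketch of that pattern (the scan function and the final message call are just placeholders for your own work; nuke.executeInMainThread is used to hand the result back to the main thread, per the guide linked above):

import os
import threading
import nuke

def _scan(root):
    # Runs in a child thread, so the Nuke UI stays responsive.
    paths = []
    for dirpath, dirnames, filenames in os.walk(root):
        for name in dirnames + filenames:
            paths.append(os.path.join(dirpath, name))
    # Anything that touches Nuke itself must be marshalled back to the
    # main thread -- and per the warning above, this call must be made
    # from a child thread, never from the main thread.
    nuke.executeInMainThread(nuke.message, args=('found %d paths' % len(paths),))

threading.Thread(target=_scan, args=('/path/to/root',)).start()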
Upvotes: 1
Reputation: 15170
Here's my code to scan through a large tree of directories using several threads.
I'd originally written the code to use good old multiprocessing.Pool(), because it's very easy and it gives you the results of the functions; no input and output queues are needed. Another difference is that it uses processes rather than threads, which comes with some tradeoffs. But Pool has a big drawback: it assumes you have a static list of items to process.
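For reference, here's a minimal sketch of what that Pool() version might look like (a reconstruction under those assumptions, not my original code; the root path is a placeholder): each worker walks one subtree from a list that is fixed up front.

import multiprocessing, os

def scan_tree(topdir):
    # Runs in a worker process; walks one subtree and returns all paths.
    paths = []
    for dirpath, dirnames, filenames in os.walk(topdir):
        for name in dirnames + filenames:
            paths.append(os.path.join(dirpath, name))
    return paths

if __name__ == "__main__":
    root = os.path.expanduser('~')  # placeholder root
    # The list of subtrees must be complete before map() runs; directories
    # discovered along the way can't be fed back in as new work items --
    # that's the static-list drawback.
    tops = [os.path.join(root, name) for name in os.listdir(root)
            if os.path.isdir(os.path.join(root, name))]
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    results = pool.map(scan_tree, tops)
    pool.close()
    pool.join()
    print sum(len(r) for r in results), 'paths'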
So, I rewrote the code following your original example: an input queue of directories to process and an output queue of results. The caller has to explicitly pull items from the output queue.
For grins I ran a timing comparison against good old os.walk(), and... at least on my machine, the traditional solution was faster. The two solutions produced quite different numbers of files, which I can't explain.
Have fun!
#!/bin/env python
import multiprocessing, threading, time
import logging, os, Queue, sys

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)-4s %(levelname)s %(threadName)s %(message)s",
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)

def scan_dir(topdir):
    # Yield (path, isdir) for every entry in one directory.
    try:
        for name in os.listdir(topdir):
            path = os.path.join(topdir, name)
            yield (path, os.path.isdir(path))
    except OSError:
        logging.error('uhoh: %s', topdir)

def scan_dir_queue(inqueue, outqueue):
    # Worker: pull directories off inqueue, push every entry to outqueue,
    # and feed newly found subdirectories back into inqueue. A worker
    # exits as soon as it finds the input queue momentarily empty.
    logging.info('start')
    while True:
        try:
            dir_item = inqueue.get_nowait()
        except Queue.Empty:
            break
        res = list( scan_dir(dir_item) )
        logging.debug('- %d paths', len(res))
        for path, isdir in res:
            outqueue.put( (path, isdir) )
            if isdir:
                inqueue.put(path)
    logging.info('done')

def thread_master(root):
    dir_queue = Queue.Queue() # pylint: disable=E1101
    dir_queue.put(root)
    result_queue = Queue.Queue()
    threads = [
        threading.Thread(
            target=scan_dir_queue, args=[dir_queue, result_queue]
        )
        for _ in range(multiprocessing.cpu_count())
    ]
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return result_queue.queue

if __name__ == "__main__":
    topdir = os.path.expanduser('~')

    start = time.time()
    res = thread_master(topdir)
    print 'threaded:', time.time() - start
    print len(res), 'paths'

    def mywalk(topdir):
        for (dirpath, _dirnames, filenames) in os.walk(topdir):
            for name in filenames:
                yield os.path.join(dirpath, name)

    start = time.time()
    res = list(mywalk(topdir))
    print 'os.walk:', time.time() - start
    print len(res), 'paths'
11:56:35 INFO Thread-1 start
11:56:35 INFO Thread-2 start
11:56:35 INFO Thread-3 start
11:56:35 INFO Thread-4 start
11:56:35 INFO Thread-2 done
11:56:35 INFO Thread-3 done
11:56:35 INFO Thread-4 done
11:56:42 INFO Thread-1 done
threaded: 6.49218010902
299230 paths
os.walk: 1.6940600872
175741 paths
Upvotes: 2