Christopher Spears

Using Pool with Queue in the Python multiprocessing module

I want to use the multiprocessing module to speed up traversing a directory structure. First I did some research and found this Stack Overflow thread:

How do I run os.walk in parallel in Python?

However, when I tried to adapt the code in that thread, I kept running into a problem. Here is a small script I wrote just to test Pool and figure out how it works:

import os

from multiprocessing.pool import Pool
from multiprocessing import Process
from multiprocessing import JoinableQueue as Queue

def scan():
    print "Hi!"
    while True:
        print "Inside loop"
        directory = unsearched.get()
        print "Got directory"
        unsearched.task_done()
        print "{0}".format(directory)

if __name__ == '__main__':

    # Put those directories on the queue
    unsearched = Queue()
    top_dirs = ['a', 'b', 'c']
    for d in top_dirs:
        unsearched.put(d)
    print unsearched

    # Scan the directories
    processes = 1
    pool = Pool(processes)
    for i in range(processes):
        print "Process {0}".format(i)
        pool.apply_async(scan)

    # Block until all the tasks are done
    unsearched.join()
    print 'Done'

What happens is that the script enters the loop in the scan function and then just sits there:

PS C:\Test> python .\multiprocessing_test.py
<multiprocessing.queues.JoinableQueue object at 0x000000000272F630>
Process 0
Hi!
Inside loop

I'm sure I'm missing something simple here.

Answers (1)

dano

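This actually runs fine for me on Linux, but it does hang on Windows. Windows has no fork(), so multiprocessing starts each child as a fresh Python process that re-imports your main module under a different name. As a result, nothing inside the if __name__ == '__main__': guard runs in the child, and that includes the line that defines unsearched. (On Linux the pool's workers are forked, so they inherit a copy of the parent's globals, unsearched included.) That means scan raises a NameError when it tries to use unsearched. apply_async stores that exception in the AsyncResult it returns, and the exception is only re-raised if you call get() on that result. Your script never does, so no Traceback shows up in the CLI. Meanwhile unsearched.join() waits for task_done() to be called once for each item you put on the queue. Because no worker ever gets that far, the script just hangs.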
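
A minimal sketch of that failure, written for Python 3 rather than the Python 2 of the question. It assumes multiprocessing.get_context("spawn"), which forces the Windows-style start method on any OS, so the problem can be reproduced on Linux too. The helper name surface_worker_error is hypothetical. The sketch shows that a global assigned only under the __main__ guard is missing in a spawned worker. It also shows that the resulting NameError reaches the parent only once get() is called on the AsyncResult.

```python
import multiprocessing as mp


def scan():
    # Looks up the module-level name 'unsearched', as the question's scan() does.
    return unsearched.get()


def surface_worker_error(method="spawn"):
    """Hypothetical helper: run scan() once in a pool and report the outcome."""
    ctx = mp.get_context(method)
    with ctx.Pool(1) as pool:
        async_result = pool.apply_async(scan)  # a worker exception is stored here...
        try:
            async_result.get(timeout=30)       # ...and only re-raised by get()
        except NameError as exc:
            return "NameError: {0}".format(exc)
    return "no error"


if __name__ == "__main__":
    # Assigned only in the parent, like the question's queue. A spawned child
    # re-imports this file under another name and skips this whole block.
    unsearched = mp.Queue()
    unsearched.put("a")
    print(surface_worker_error("spawn"))
```

On Linux, surface_worker_error("fork") should return "no error" instead, because a forked worker inherits the parent's globals. That difference is why the original script only hangs on Windows.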

To make this work on both Windows and Linux, use the initializer and initargs keyword arguments when you create the Pool. Each worker process calls initializer(*initargs) once as it starts, so the initializer can put unsearched into the child's global scope:

def initializer(q):
    global unsearched
    unsearched = q

...

Then replace your old Pool call with this:

pool = Pool(processes, initializer=initializer, initargs=(unsearched,))
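
Putting the fix together, here is a minimal Python 3 sketch of the parallel directory walk the question is aiming for. It is an illustration under stated assumptions, not the linked thread's code. The function name scan_tree and the None sentinel used to stop workers are hypothetical choices. The queue reaches every worker through initializer/initargs. Passing it to apply_async as an argument is not an option, because multiprocessing only lets queues be shared between processes through inheritance. Workers push subdirectories back onto the JoinableQueue before calling task_done(). Each scan() call returns its file list, so calling get() on its AsyncResult also re-raises any worker exception instead of hiding it.

```python
import multiprocessing as mp
import os

# Filled in inside each worker by init_worker(); stays None in the parent.
unsearched = None


def init_worker(queue):
    # Called once per worker process as it starts. The queue arrives through
    # initargs (process inheritance), which works with both fork and spawn.
    global unsearched
    unsearched = queue


def scan():
    """Pull directories until a None sentinel arrives; return the files found."""
    found = []
    while True:
        directory = unsearched.get()
        if directory is None:
            unsearched.task_done()
            return found
        try:
            with os.scandir(directory) as entries:
                for entry in entries:
                    if entry.is_dir(follow_symlinks=False):
                        # Queued before task_done() below, so the parent's
                        # join() cannot return while work is still pending.
                        unsearched.put(entry.path)
                    else:
                        found.append(entry.path)
        except OSError:
            pass  # unreadable or vanished directory: skip it
        finally:
            unsearched.task_done()


def scan_tree(top_dirs, processes=2, method=None):
    ctx = mp.get_context(method)  # None selects the platform's default start method
    queue = ctx.JoinableQueue()
    with ctx.Pool(processes, initializer=init_worker, initargs=(queue,)) as pool:
        async_results = [pool.apply_async(scan) for _ in range(processes)]
        for d in top_dirs:
            queue.put(d)
        queue.join()              # returns once task_done() has matched every put()
        for _ in range(processes):
            queue.put(None)       # one sentinel per scan() call
        # get() returns each call's file list, or re-raises a worker's exception.
        files = [path for r in async_results for path in r.get(timeout=60)]
    return sorted(files)


if __name__ == "__main__":
    print(len(scan_tree(["."])), "files found under the current directory")
```

Leaving the with-block calls pool.terminate(). By then every scan() call has returned, so no work is cut short. With the question's endless loop and no sentinel, that terminate() would be the only thing stopping the workers.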
