user3638629

Reputation: 145

Multiprocessing thousands of files with external command

I want to launch an external command from Python for about 8000 files. Every file is processed independently of the others. The only constraint is that execution must not continue until all files have been processed. I have 4 physical cores, each with 2 logical cores (multiprocessing.cpu_count() returns 8). My idea was to use a pool of four parallel, independent processes running on 4 of the 8 cores. That way my machine should remain usable in the meantime.

Here's what I've been doing:

import multiprocessing
import subprocess
import os
from multiprocessing.pool import ThreadPool


def process_files(input_dir, output_dir, option):
    pool = ThreadPool(multiprocessing.cpu_count()/2)
    for filename in os.listdir(input_dir):  # about 8000 files
        f_in = os.path.join(input_dir, filename)
        f_out = os.path.join(output_dir, filename)
        cmd = ['molconvert', option, f_in, '-o', f_out]
        pool.apply_async(subprocess.Popen, (cmd,))
    pool.close()
    pool.join()


def main():
    process_files('dir1', 'dir2', 'mol:H')
    do_some_stuff('dir2')
    process_files('dir2', 'dir3', 'mol:a')
    do_more_stuff('dir3')

Sequential processing takes 120 s for a batch of 100 files. The multiprocessing version outlined above (the process_files function) takes only 20 s for the same batch. However, when I run process_files on the whole set of 8000 files, my PC hangs and has not unfrozen even after an hour.

My questions are:

1) I thought ThreadPool was supposed to initialize a pool of processes (multiprocessing.cpu_count()/2 of them here, to be exact). However, my computer hanging on 8000 files but not on 100 suggests that maybe the size of the pool is not taken into account. Either that, or I'm doing something wrong. Could you explain?

2) Is this the right way to launch independent processes under Python when each of them must launch an external command, in such a way that the processing does not take up all of the machine's resources?

Upvotes: 3

Views: 1712

Answers (2)

larsks

Reputation: 312500

I think your basic problem is the use of subprocess.Popen. That call does not wait for the command to complete before returning. Since the call returns immediately (even though the command is still running), the task is finished as far as your ThreadPool is concerned and it can spawn another...which means that you end up spawning 8000 or so processes.

You would probably have better luck using subprocess.check_call:

Run command with arguments.  Wait for command to complete.  If
the exit code was zero then return, otherwise raise
CalledProcessError.  The CalledProcessError object will have the
return code in the returncode attribute.

So:

def process_files(input_dir, output_dir, option):
    pool = ThreadPool(multiprocessing.cpu_count()/2)
    for filename in os.listdir(input_dir):  # about 8000 files
        f_in = os.path.join(input_dir, filename)
        f_out = os.path.join(output_dir, filename)
        cmd = ['molconvert', option, f_in, '-o', f_out]
        pool.apply_async(subprocess.check_call, (cmd,))
    pool.close()
    pool.join()

If you really don't care about the exit code, then you may want subprocess.call, which will not raise an exception in the event of a non-zero exit code from the process.
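For example, here is a minimal sketch of that variant, keeping the structure of the code above and only swapping in subprocess.call (and using // so the pool size stays an integer on Python 3):

import multiprocessing
import os
import subprocess
from multiprocessing.pool import ThreadPool


def process_files(input_dir, output_dir, option):
    # Use half the logical cores, as in the question.
    pool = ThreadPool(multiprocessing.cpu_count() // 2)
    for filename in os.listdir(input_dir):
        f_in = os.path.join(input_dir, filename)
        f_out = os.path.join(output_dir, filename)
        cmd = ['molconvert', option, f_in, '-o', f_out]
        # subprocess.call blocks until the command exits and returns its
        # exit code, so each worker runs only one command at a time.
        pool.apply_async(subprocess.call, (cmd,))
    pool.close()
    pool.join()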

Upvotes: 1

Roland Smith

Reputation: 43533

If you are using Python 3, I would consider using the map method of concurrent.futures.ThreadPoolExecutor.
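A minimal sketch of that approach, assuming the same molconvert command as in the question (the convert_all and run_one names are made up for illustration):

import os
import subprocess
from concurrent.futures import ThreadPoolExecutor


def convert_all(input_dir, output_dir, option, workers=4):
    filenames = os.listdir(input_dir)

    def run_one(filename):
        f_in = os.path.join(input_dir, filename)
        f_out = os.path.join(output_dir, filename)
        # check_call blocks until molconvert exits, so at most `workers`
        # commands run at the same time.
        return subprocess.check_call(['molconvert', option, f_in, '-o', f_out])

    with ThreadPoolExecutor(max_workers=workers) as ex:
        # list() forces the iterator, so any CalledProcessError surfaces here.
        list(ex.map(run_one, filenames))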

Alternatively, you can manage a list of subprocesses yourself.

The following example defines a function to start ffmpeg to convert a video file to Theora/Vorbis format. It returns the Popen object of the started subprocess.

def startencoder(iname, oname, offs=None):
    args = ['ffmpeg']
    if offs is not None and offs > 0:
        args += ['-ss', str(offs)]
    args += ['-i', iname, '-c:v', 'libtheora', '-q:v', '6', '-c:a',
            'libvorbis', '-q:a', '3', '-sn', oname]
    with open(os.devnull, 'w') as bb:
        p = subprocess.Popen(args, stdout=bb, stderr=bb)
    return p

In the main program, a list of Popen objects representing running subprocesses is maintained like this.

outbase = tempname()
ogvlist = []
procs = []
maxprocs = cpu_count()
for n, ifile in enumerate(argv):
    # Wait while the list of processes is full.
    while len(procs) == maxprocs:
        manageprocs(procs)
    # Add a new process
    ogvname = outbase + '-{:03d}.ogv'.format(n + 1)
    procs.append(startencoder(ifile, ogvname, offset))
    ogvlist.append(ogvname)
# All jobs have been submitted, wait for them to finish.
while len(procs) > 0:
    manageprocs(procs)

So a new process is started only when there are fewer running subprocesses than cores. Code that is used multiple times is factored out into the manageprocs function.

def manageprocs(proclist):
    # Drop subprocesses that have finished; poll() returns None while
    # a process is still running, and its exit code once it has ended.
    for pr in proclist:
        if pr.poll() is not None:
            proclist.remove(pr)
    sleep(0.5)

The call to sleep is used to prevent the program from spinning in the loop.
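For reference, here is a minimal sketch of the same pattern applied to the molconvert command from the question, with the number of subprocesses capped at half the logical cores as in the original post (the pruning is inlined here rather than factored into manageprocs):

import os
import subprocess
from multiprocessing import cpu_count
from time import sleep


def process_files(input_dir, output_dir, option, maxprocs=cpu_count() // 2):
    procs = []  # Popen objects of the subprocesses still running
    for filename in os.listdir(input_dir):
        # Wait until a slot is free; poll() returns None while a
        # subprocess is still running.
        while len(procs) >= maxprocs:
            procs[:] = [p for p in procs if p.poll() is None]
            sleep(0.5)
        f_in = os.path.join(input_dir, filename)
        f_out = os.path.join(output_dir, filename)
        procs.append(subprocess.Popen(['molconvert', option, f_in, '-o', f_out]))
    # All commands have been started; wait for the remaining ones to finish.
    while procs:
        procs[:] = [p for p in procs if p.poll() is None]
        sleep(0.5)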

Upvotes: 1
