brienna

Reputation: 1604

How to subprocess a big list of files using all CPUs?

I need to convert 86,000 TEX files to XML using the LaTeXML library in the command line. I tried to write a Python script to automate this with the subprocess module, utilizing all 4 cores.

def get_outpath(tex_path):
    path_parts = pathlib.Path(tex_path).parts
    arxiv_id = path_parts[2]
    outpath = 'xml/' + arxiv_id + '.xml'
    return outpath

def convert_to_xml(inpath):
    outpath = get_outpath(inpath)

    if os.path.isfile(outpath):
        message = '{}: Already converted.'.format(inpath)
        print(message)
        return

    try:
        process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath], 
                                   stderr=subprocess.PIPE, 
                                   stdout=subprocess.PIPE)
    except Exception as error:
        process.kill()
        message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
        print(message)

    message = '{}: Converted!'.format(inpath)
    print(message)

def start():
    start_time = time.time()
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(),
                               maxtasksperchild=1)
    print('Initialized {} threads'.format(multiprocessing.cpu_count()))
    print('Beginning conversion...')
    for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5): 
        pass
    pool.close()
    pool.join()
    print("TIME: {}".format(time.time() - start_time))

start()

The script results in Too many open files and slows down my computer. From looking at Activity Monitor, it looks like this script is trying to create 86,000 conversion subprocesses at once, and each process is trying to open a file. Maybe this is the result of pool.imap_unordered(convert_to_xml, preprints) -- maybe I need to not use map in conjunction with subprocess.Popen, since I just have too many commands to call? What would be an alternative?

I've spent all day trying to figure out the right way to approach bulk subprocessing. I'm new to this part of Python, so any tips for heading in the right direction would be much appreciated. Thanks!

Upvotes: 2

Views: 519

Answers (1)

unutbu

Reputation: 879501

In convert_to_xml, the process = subprocess.Popen(...) statement spawns a latexml subprocess. Without a blocking call such as process.communicate(), convert_to_xml returns while latexml is still running in the background.

Since convert_to_xml ends, the Pool sends the associated worker process another task to run and so convert_to_xml is called again. Once again another latexml process is spawned in the background. Pretty soon, you are up to your eyeballs in latexml processes and the resource limit on the number of open files is reached.

The fix is easy: add process.communicate() to tell convert_to_xml to wait until the latexml process has finished.

process = None
try:
    process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
                               stderr=subprocess.PIPE,
                               stdout=subprocess.PIPE)
    process.communicate()  # block until latexml has finished
except Exception as error:
    if process is not None:
        process.kill()  # Popen succeeded but communicate() failed
    message = 'error: {}: {}'.format(inpath, error)
    print(message)
else: # use else so that this won't run if there is an Exception
    message = '{}: Converted!'.format(inpath)
    print(message)
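
To see the effect of a blocking call on its own, here is a minimal sketch. It makes a few assumptions that are not in your script: build_command is a hypothetical stand-in that runs the Python interpreter instead of latexml (so it runs without LaTeXML installed), subprocess.run is used in place of Popen plus communicate (it also waits for the child to exit), and multiprocessing.pool.ThreadPool replaces Pool, since the real work happens in the child process anyway. With workers=2, at most two child processes should exist at any moment.

```python
import subprocess
import sys
from multiprocessing.pool import ThreadPool


def build_command(inpath):
    # Hypothetical stand-in for ['latexml', '--dest=' + outpath, inpath].
    return [sys.executable, '-c', 'import sys; print(sys.argv[1])', inpath]


def convert(inpath):
    # subprocess.run blocks until the child process has exited, so each
    # worker has at most one child running at a time.
    completed = subprocess.run(build_command(inpath),
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    return inpath, completed.returncode


def convert_all(inpaths, workers=4):
    # The pool size caps the number of simultaneous child processes.
    with ThreadPool(processes=workers) as pool:
        return sorted(pool.imap_unordered(convert, inpaths))


if __name__ == '__main__':
    for inpath, returncode in convert_all(['a.tex', 'b.tex', 'c.tex'], workers=2):
        print(inpath, returncode)
```

Checking the return code this way also lets you tell failed conversions apart from successful ones, instead of printing Converted! unconditionally.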

Regarding if __name__ == '__main__':

As martineau pointed out, there is a warning in the multiprocessing docs that code that spawns new processes should not be called at the top level of a module. Instead, the code should be contained inside an if __name__ == '__main__' statement.

In Linux, nothing terrible happens if you disregard this warning. But in Windows, the code "fork-bombs". Or more accurately, the code causes an unmitigated chain of subprocesses to be spawned, because on Windows fork is simulated by spawning a new Python process which then imports the calling script. Every import spawns a new Python process. Every Python process tries to import the calling script. The cycle is not broken until all resources are consumed.

So to be nice to our Windows-fork-bereft brethren, use

if __name__ == '__main__':
    start()

Sometimes processes require a lot of memory. The only reliable way to free memory is to terminate the process. maxtasksperchild=1 tells the pool to terminate each worker process after it completes 1 task. It then spawns a new worker process to handle another task (if there are any). This frees the (memory) resources the original worker may have allocated which could not otherwise have been freed.

In your situation it does not look like the worker process is going to require much memory, so you probably don't need maxtasksperchild=1.

The chunksize affects how many tasks a worker performs before sending the result back to the main process. Sometimes this can affect performance, especially if interprocess communication is a significant portion of overall runtime.

In your situation, convert_to_xml takes a relatively long time (assuming we wait until latexml finishes) and it simply returns None. So interprocess communication probably isn't a significant portion of overall runtime. Therefore, I don't expect you would find a significant change in performance in this case (though it never hurts to experiment!).
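
To make the effect of chunksize more concrete, here is a small illustration of how an iterable gets cut into chunks that are handed out as single tasks. This is not the Pool's own code, just a sketch of the idea; split_into_chunks and the file names are made up for the example.

```python
from itertools import islice


def split_into_chunks(iterable, chunksize):
    """Yield successive lists of at most `chunksize` items."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, chunksize))
        if not chunk:
            return
        yield chunk


# Hypothetical file names, standing in for the real list of preprints.
preprints = ['paper{}.tex'.format(i) for i in range(12)]
chunks = list(split_into_chunks(preprints, chunksize=5))
print([len(chunk) for chunk in chunks])  # prints [5, 5, 2]
```

With chunksize=5, twelve inputs become three tasks rather than twelve, so there are fewer round trips between the main process and the workers.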


In plain Python, map should not be used just to call a function multiple times.

For a similar stylistic reason, I would reserve using the pool.*map* methods for situations where I cared about the return values.

So instead of

for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5): 
    pass

you might consider using

for preprint in preprints:
    pool.apply_async(convert_to_xml, args=(preprint, ))

instead.
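
One thing to keep in mind with apply_async: an exception raised inside the worker is only re-raised when you call get() on the AsyncResult it returns. If you want to see failures, a sketch along these lines may help. The convert_to_xml here is a hypothetical stand-in that fails on names ending in .bad, and ThreadPool is used only so the example runs anywhere; multiprocessing.Pool has the same apply_async method.

```python
from multiprocessing.pool import ThreadPool


def convert_to_xml(preprint):
    # Hypothetical stand-in for the real conversion function.
    if preprint.endswith('.bad'):
        raise ValueError('cannot convert ' + preprint)
    return preprint + ': Converted!'


def run_all(preprints):
    messages = []
    with ThreadPool(processes=2) as pool:
        # Submit everything first, keeping each AsyncResult.
        pending = [pool.apply_async(convert_to_xml, args=(preprint, ))
                   for preprint in preprints]
        for result in pending:
            try:
                # get() blocks until the task is done and re-raises
                # any exception that occurred in the worker.
                messages.append(result.get())
            except ValueError as error:
                messages.append('error: {}'.format(error))
    return messages


if __name__ == '__main__':
    for message in run_all(['a.tex', 'b.bad', 'c.tex']):
        print(message)
```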


The iterable passed to any of the pool.*map* functions is consumed immediately. It doesn't matter if the iterable is an iterator. There is no special memory benefit to using an iterator here. imap_unordered returns an iterator, but it does not handle its input in any especially iterator-friendly way.

No matter what type of iterable you pass, upon calling the pool.*map* function the iterable is consumed and turned into tasks which are put into a task queue.

Here is code which corroborates this claim:

version1.py:

import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x * x


def gen():
    for x in range(1000):
        if x % 100 == 0:
            print('Got here')
        yield x


def start():
    pool = mp.Pool()
    for item in pool.imap_unordered(foo, gen()):
        pass

    pool.close()
    pool.join()

if __name__ == '__main__':
    start()

version2.py:

import multiprocessing as mp
import time
def foo(x):
    time.sleep(0.1)
    return x * x


def gen():
    for x in range(1000):
        if x % 100 == 0:
            print('Got here')
        yield x


def start():
    pool = mp.Pool()

    for item in gen():
        result = pool.apply_async(foo, args=(item, ))

    pool.close()
    pool.join()

if __name__ == '__main__':
    start()

version1.py and version2.py both produce the same output:

Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here

Crucially, you will notice that Got here is printed 10 times very quickly at the beginning of the run, and then there is a long pause (while the calculation is done) before the program ends.

If the generator gen() were somehow consumed slowly by pool.imap_unordered, we should expect Got here to be printed slowly as well. Since Got here is printed 10 times and quickly, we can see that the iterable gen() is being completely consumed well before the tasks are completed.

Running these programs should hopefully give you confidence that pool.imap_unordered and pool.apply_async put tasks in the queue in essentially the same way: immediately after the call is made.

Upvotes: 3
