I need to convert 86,000 TeX files to XML using the LaTeXML library from the command line. I tried to write a Python script to automate this with the subprocess module, utilizing all 4 cores.
import multiprocessing
import os
import pathlib
import subprocess
import time

# 'preprints' is the list of input .tex paths (defined elsewhere).

def get_outpath(tex_path):
    path_parts = pathlib.Path(tex_path).parts
    arxiv_id = path_parts[2]
    outpath = 'xml/' + arxiv_id + '.xml'
    return outpath

def convert_to_xml(inpath):
    outpath = get_outpath(inpath)
    if os.path.isfile(outpath):
        message = '{}: Already converted.'.format(inpath)
        print(message)
        return
    try:
        process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
                                   stderr=subprocess.PIPE,
                                   stdout=subprocess.PIPE)
    except Exception as error:
        process.kill()
        message = 'error: {}: {}'.format(inpath, error)
        print(message)
    message = '{}: Converted!'.format(inpath)
    print(message)

def start():
    start_time = time.time()
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(),
                                maxtasksperchild=1)
    print('Initialized {} threads'.format(multiprocessing.cpu_count()))
    print('Beginning conversion...')
    for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5):
        pass
    pool.close()
    pool.join()
    total_time = time.time() - start_time
    print('TIME: {}'.format(total_time))

start()
The script results in Too many open files and slows down my computer. From looking at Activity Monitor, it looks like this script is trying to create 86,000 conversion subprocesses at once, and each process is trying to open a file. Maybe this is the result of pool.imap_unordered(convert_to_xml, preprints) -- maybe I shouldn't use map in conjunction with subprocess.Popen, since I just have too many commands to call? What would be an alternative?
I've spent all day trying to figure out the right way to approach bulk subprocessing. I'm new to this part of Python, so any tips for heading in the right direction would be much appreciated. Thanks!
In convert_to_xml, the process = subprocess.Popen(...) statement spawns a latexml subprocess. Without a blocking call such as process.communicate(), convert_to_xml returns even while latexml continues to run in the background.

Since convert_to_xml ends, the Pool sends the associated worker process another task to run, and so convert_to_xml is called again. Once again another latexml process is spawned in the background. Pretty soon, you are up to your eyeballs in latexml processes, and the resource limit on the number of open files is reached.

The fix is easy: add process.communicate() to tell convert_to_xml to wait until the latexml process has finished:
try:
    process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
                               stderr=subprocess.PIPE,
                               stdout=subprocess.PIPE)
    process.communicate()
except Exception as error:
    message = 'error: {}: {}'.format(inpath, error)
    print(message)
else:  # use else so that this won't run if there is an Exception
    message = '{}: Converted!'.format(inpath)
    print(message)
Regarding if __name__ == '__main__':

As martineau pointed out, there is a warning in the multiprocessing docs that code that spawns new processes should not be called at the top level of a module. Instead, the code should be contained inside an if __name__ == '__main__' block.
On Linux, nothing terrible happens if you disregard this warning. But on Windows, the code "fork-bombs". Or more accurately, the code causes an unmitigated chain of subprocesses to be spawned, because on Windows fork is simulated by spawning a new Python process which then imports the calling script. Every import spawns a new Python process. Every Python process tries to import the calling script. The cycle is not broken until all resources are consumed.
So to be nice to our Windows-fork-bereft brethren, use

if __name__ == '__main__':
    start()
Sometimes processes require a lot of memory. The only reliable way to free memory is to terminate the process. maxtasksperchild=1 tells the pool to terminate each worker process after it completes 1 task. It then spawns a new worker process to handle another task (if there are any). This frees the (memory) resources the original worker may have allocated, which could not otherwise have been freed.

In your situation it does not look like the worker process is going to require much memory, so you probably don't need maxtasksperchild=1.
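If you want to see the turnover, here is a minimal sketch (my own illustration, not part of your script) that prints each worker's PID; with maxtasksperchild=1 every task should run in a fresh worker, so the PIDs should all differ:

import multiprocessing as mp
import os

def task(x):
    # Report which worker process handled this task.
    return x, os.getpid()

if __name__ == '__main__':
    # With maxtasksperchild=1, each worker exits after one task,
    # so the Pool spawns a new process (new PID) for every task.
    with mp.Pool(processes=2, maxtasksperchild=1) as pool:
        for x, pid in pool.map(task, range(6)):
            print('task {} ran in worker {}'.format(x, pid))

Dropping maxtasksperchild=1 and rerunning should show only two distinct PIDs.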
The chunksize affects how many tasks a worker performs before sending the result back to the main process. Sometimes this can affect performance, especially if interprocess communication is a significant portion of overall runtime.

In your situation, convert_to_xml takes a relatively long time (assuming we wait until latexml finishes) and it simply returns None. So interprocess communication probably isn't a significant portion of overall runtime. Therefore, I don't expect you would find a significant change in performance in this case (though it never hurts to experiment!).
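If you do want to experiment, chunksize is just a keyword argument, so comparing a few values is easy. Here is a rough timing sketch (the time.sleep is a stand-in for a long-running conversion; the values chosen are arbitrary):

import multiprocessing as mp
import time

def work(x):
    time.sleep(0.01)  # stand-in for a long-running conversion
    return None

if __name__ == '__main__':
    for chunksize in (1, 5, 50):
        started = time.time()
        with mp.Pool() as pool:
            for _ in pool.imap_unordered(work, range(1000), chunksize=chunksize):
                pass
        print('chunksize={}: {:.2f} seconds'.format(
            chunksize, time.time() - started))

For long tasks like yours, expect the differences to be lost in the noise.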
In plain Python, map should not be used just to call a function multiple times. For a similar stylistic reason, I would reserve the pool.*map* methods for situations where I cared about the return values.
So instead of

for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5):
    pass

you might consider using

for preprint in preprints:
    pool.apply_async(convert_to_xml, args=(preprint, ))

instead.
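One caveat with fire-and-forget apply_async calls: if the worker function raises, the exception is silently discarded unless you call get() on the returned AsyncResult. Your convert_to_xml already catches and prints its own errors, so this matters less here, but if you ever remove that try/except, collect the results and check them:

results = [pool.apply_async(convert_to_xml, args=(preprint, ))
           for preprint in preprints]
pool.close()
pool.join()
for result in results:
    result.get()  # re-raises any exception that occurred in the worker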
The iterable passed to any of the pool.*map* functions is consumed immediately. It doesn't matter if the iterable is an iterator. There is no special memory benefit to using an iterator here. imap_unordered returns an iterator, but it does not handle its input in any especially iterator-friendly way.

No matter what type of iterable you pass, upon calling the pool.*map* function the iterable is consumed and turned into tasks which are put into a task queue.
Here is code which corroborates this claim:
version1.py:
import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x * x

def gen():
    for x in range(1000):
        if x % 100 == 0:
            print('Got here')
        yield x

def start():
    pool = mp.Pool()
    for item in pool.imap_unordered(foo, gen()):
        pass
    pool.close()
    pool.join()

if __name__ == '__main__':
    start()
version2.py:
import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x * x

def gen():
    for x in range(1000):
        if x % 100 == 0:
            print('Got here')
        yield x

def start():
    pool = mp.Pool()
    for item in gen():
        result = pool.apply_async(foo, args=(item, ))
    pool.close()
    pool.join()

if __name__ == '__main__':
    start()
Running version1.py and version2.py both produce the same result:
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Crucially, you will notice that Got here is printed 10 times very quickly at the beginning of the run, and then there is a long pause (while the calculation is done) before the program ends.
If the generator gen() were somehow consumed slowly by pool.imap_unordered, we should expect Got here to be printed slowly as well. Since Got here is printed 10 times and quickly, we can see that the iterable gen() is being completely consumed well before the tasks are completed.
Running these programs should hopefully give you confidence that pool.imap_unordered and pool.apply_async put tasks in the queue in essentially the same way: immediately after the call is made.