Amanda

Reputation: 526

Python multiprocessing pool map_async freezes

I have a list of 80,000 strings that I am running through a discourse parser, and to speed this up I have been trying to use Python's multiprocessing package.

The parser code requires Python 2.7, and I am currently running it on a 2-core Ubuntu machine using a subset of the strings. For short lists (e.g. 20 strings) the process runs without issue on both cores. However, if I run a list of about 100 strings, both workers freeze at different points (in some cases worker 1 won't stop until a few minutes after worker 2). This happens before all the strings are finished and before anything is returned. The workers stop at the same point every time as long as the same mapping function is used, but the points differ if I try a different mapping function, i.e. map vs. map_async vs. imap.

I have tried removing the strings at those indices, which had no effect, and those strings run fine in a shorter list. Based on print statements I included, when the process appears to freeze, the iteration for the current string seems to finish and the worker simply does not move on to the next string. It takes about an hour of run time to reach the point where both workers have frozen, and I have not been able to reproduce the issue in less time. The code involving the multiprocessing commands is:

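import csv
import multiprocessing
import sys

import pandas as pd

# discourse_process (the parser call for a single string) is defined elsewhere in the script

def main(initial_file, chunksize = 2):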
    entered_file = pd.read_csv(initial_file)
    entered_file = entered_file.ix[:, 0].tolist()

    pool = multiprocessing.Pool()

    result = pool.map_async(discourse_process, entered_file, chunksize = chunksize)

    pool.close()
    pool.join()

    with open("final_results.csv", 'w') as file:
        writer = csv.writer(file)
        for listitem in result.get():
            writer.writerow([listitem[0], listitem[1]])

if __name__ == '__main__':
    main(sys.argv[1])

When I stop the process with Ctrl-C (which does not always work), the error message I receive is:

^CTraceback (most recent call last):
  File "Combined_Script.py", line 94, in <module>
    main(sys.argv[1])
  File "Combined_Script.py", line 85, in main
    pool.join()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 474, in join
    p.join()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 145, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python2.7/multiprocessing/forking.py", line 154, in wait
    return self.poll(0)
  File "/usr/lib/python2.7/multiprocessing/forking.py", line 135, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Process PoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 117, in worker
    put((job, i, result))
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 390, in put
    wacquire()
KeyboardInterrupt
^CProcess PoolWorker-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 117, in worker
    put((job, i, result))
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 392, in put
    return send(obj)
KeyboardInterrupt
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 305, in _exit_function
    _run_finalizers(0)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 274, in _run_finalizers
    finalizer()
  File "/usr/lib/python2.7/multiprocessing/util.py", line 207, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 500, in _terminate_pool
    outqueue.put(None)                  # sentinel
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 390, in put
    wacquire()
KeyboardInterrupt
Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 305, in _exit_function
    _run_finalizers(0)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 274, in _run_finalizers
    finalizer()
  File "/usr/lib/python2.7/multiprocessing/util.py", line 207, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 500, in _terminate_pool
    outqueue.put(None)                  # sentinel
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 390, in put
    wacquire()
KeyboardInterrupt

When I watch memory usage with htop in another terminal, it is below 3% once the workers freeze. This is my first attempt at parallel processing, and I am not sure what else I might be missing.

Upvotes: 4

Views: 1586

Answers (2)

Miguel Trejo

Reputation: 6667

You could set a timeout when retrieving the result. If the result does not arrive within that time, get() raises multiprocessing.TimeoutError instead of blocking forever:

try:
    result.get(timeout = 1)
except multiprocessing.TimeoutError:
    print("Error while retrieving the result")
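To find out which inputs stall, the same timeout idea can be applied per item rather than to the whole map. The following is only a minimal sketch, not the asker's code: `run_with_timeouts` is a hypothetical helper, it is written for Python 3, and it assumes a Unix system where the "fork" start method is available (on Python 2.7, `multiprocessing.Pool(processes)` would take the place of the context call).

```python
import multiprocessing
import time


def run_with_timeouts(func, items, timeout, processes=2):
    """Submit each item separately and collect the items whose result timed out.

    Hypothetical helper for illustration only.
    """
    # Assumption: "fork" matches the Linux setup described in the question.
    ctx = multiprocessing.get_context("fork")
    pool = ctx.Pool(processes)
    try:
        # One apply_async call per item, so each item has its own AsyncResult.
        pending = [(item, pool.apply_async(func, (item,))) for item in items]
        finished, timed_out = [], []
        for item, async_result in pending:
            try:
                finished.append((item, async_result.get(timeout=timeout)))
            except multiprocessing.TimeoutError:
                timed_out.append(item)
        return finished, timed_out
    finally:
        # terminate() stops workers that are still stuck instead of waiting for them.
        pool.terminate()
        pool.join()
```

With the parser, a call such as `run_with_timeouts(discourse_process, entered_file, timeout=600)` would return the completed pairs plus the list of strings that did not finish in time; the 600-second limit is an arbitrary placeholder. Note that the timeout is measured while waiting on each result in turn, so it is an upper bound on waiting, not an exact per-task run time.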

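You could also check whether the call completed without raising an exception by using successful(). Note that successful() itself raises if the result is not ready yet, and that this check has to run before pool.join(), which blocks until the workers exit:

import time
while True:
    try:
        # successful() raises while the result is not ready
        # (AssertionError on Python 2.7, ValueError on Python 3.7+)
        if result.successful():
            print("Result is successful")
        else:
            print("A task raised an exception")
        break
    except (AssertionError, ValueError):
        print("Result is not yet ready")
    time.sleep(1)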
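The polling idea above can also be combined with an overall deadline, so that a frozen pool is shut down rather than waited on indefinitely. This is a sketch under assumptions: `map_with_deadline` is a hypothetical helper, it targets Python 3, and it assumes the "fork" start method is available (as on the Ubuntu machine in the question). It uses `AsyncResult.wait()` and `ready()` in place of a manual sleep loop.

```python
import multiprocessing
import time


def map_with_deadline(func, items, deadline, chunksize=1, processes=2):
    """Run map_async and return its results, or None if the deadline passes.

    Hypothetical helper for illustration only.
    """
    # Assumption: "fork" matches the Linux setup described in the question.
    ctx = multiprocessing.get_context("fork")
    pool = ctx.Pool(processes)
    try:
        async_result = pool.map_async(func, items, chunksize=chunksize)
        # wait() blocks until the result is ready or `deadline` seconds elapse.
        async_result.wait(deadline)
        if not async_result.ready():
            return None
        # get() re-raises any exception that func raised in a worker.
        return async_result.get()
    finally:
        # terminate() kills workers that are still busy or stuck.
        pool.terminate()
        pool.join()
```

A `None` return value here only says that the map did not finish in time; it does not explain why the workers stopped.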

Finally, the multiprocessing documentation at https://docs.python.org/2/library/multiprocessing.html is helpful.

Upvotes: 1

Amanda

Reputation: 526

I was not able to solve the issue with multiprocessing.Pool. However, I came across the loky package and was able to run my code with the following lines:

import loky

executor = loky.get_reusable_executor(timeout = 200, kill_workers = True)
results = executor.map(discourse_process, entered_file)

Upvotes: 1
