Elliott Slaughter

Reputation: 1555

Nested parallelism in Python multiprocessing

I know this sounds like something that has been asked before, but wait, I'll explain why the other options don't work.

I'm currently using multiprocessing.Pool to implement parallelism in an application, and would like to extend this to be able to exploit nested parallelism. The naive approach of just passing the Pool object as an argument to apply_async doesn't work as noted in other answers, because Pool cannot be pickled.

Here are my requirements:

  1. I need some sort of pool to limit the number of concurrently executing tasks. E.g. multiprocessing.Pool serves this purpose, except that it can't be passed to other processes.

  2. I need nested parallelism. In my application, I need to perform I/O in order to identify what the nested work is, so I absolutely don't want to do this from a single thread. I think that rules out all the answers to this question.

  3. It needs to be in the standard library; I can't add dependencies. That rules out this answer.

  4. I'd really like it to work with both Python 2 and 3. However, if it could be shown that moving to Python 3 would solve my problem, I would consider it.

I don't need this to use multiple processes specifically; it would be OK to use threads, because most of the work is I/O or waiting on subprocesses to complete.

I have tried using multiprocessing.dummy, which is the same interface but implemented on top of threading. However, when I try to call get() to retrieve the results of my tests, I get the following error, so I think this is out.

  File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
ValueError: signal only works in main thread

I am aware of the concurrent.futures library in Python 3, but this appears to have some severe limitations. For example, the second example in this section would seem to be a show stopper in my case:

https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor

I don't see how you could avoid hitting that with basically any straightforwardly written nested parallel algorithm. So even if I were willing to use Python 3, I think this is a non-starter.

I'm not aware of any other options available in the standard library, without writing my own implementation.

Upvotes: 7

Views: 5513

Answers (1)

Michal Charemza

Reputation: 27062

You seem to have ruled it out, but I suspect ThreadPoolExecutor (https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) or ProcessPoolExecutor (https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) would work, if you are able to move to Python 3 or to add a dependency for Python 2.

If the extra work for each file doesn't have to be triggered until that file has been processed, you can have a single coordinating thread trigger all the tasks. No worker thread ever waits on another task's future, so deadlock is prevented, as in the example below.

from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(max_workers=3)

def find_work_inputs(dummy_file):
    print("{}: Finding work...".format(dummy_file))
    time.sleep(1)
    work = list(range(dummy_file))  # list() so the print below shows the values on Python 3 too
    print("{}: Work is {}".format(dummy_file, work))
    return work

def do_work(dummy_file, work_input):
    print("{}: {}".format(dummy_file, work_input))
    print("{}: Doing work {}...".format(dummy_file, work_input))
    time.sleep(1)
    return work_input * work_input

dummy_files = [1,2,3,4,5]

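# Submit every first-level task before waiting on any, so they run concurrently.
find_futures = [(dummy_file, pool.submit(find_work_inputs, dummy_file)) for dummy_file in dummy_files]

# Only this coordinating thread blocks on results; worker threads never wait on futures.
futures = []
for dummy_file, work_inputs in find_futures:
    for work_input in work_inputs.result():
        futures.append((dummy_file, work_input, pool.submit(do_work, dummy_file, work_input)))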

for dummy_file, work_input, future in futures:
    print("Result from file:{} input:{} is {}".format(dummy_file, work_input, future.result()))
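
As a rough sketch of the same coordinating-thread idea, the second-level work can be submitted as soon as any first-level task finishes by iterating with concurrent.futures.as_completed. The run_nested helper, the shortened sleeps, and the dictionary keyed by (file, input) are my own illustrative choices, not anything required by the library.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def find_work_inputs(dummy_file):
    time.sleep(0.1)  # stands in for the I/O that discovers the nested work
    return list(range(dummy_file))

def do_work(dummy_file, work_input):
    time.sleep(0.1)  # stands in for the nested task itself
    return work_input * work_input

def run_nested(dummy_files, max_workers=3):
    """Hypothetical helper: returns {(dummy_file, work_input): result}."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # First level: submit every discovery task up front.
        finders = {pool.submit(find_work_inputs, f): f for f in dummy_files}
        workers = {}
        # Only this coordinating thread blocks; worker threads never wait on futures.
        for finder in as_completed(finders):
            dummy_file = finders[finder]
            for work_input in finder.result():
                future = pool.submit(do_work, dummy_file, work_input)
                workers[future] = (dummy_file, work_input)
        # Second level: collect results in whatever order they complete.
        for future in as_completed(workers):
            results[workers[future]] = future.result()
    return results

results = run_nested([1, 2, 3, 4, 5])
```

Because all submission happens from the coordinating thread, a single pool is enough here and max_workers still caps the total number of concurrently running tasks.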

Alternatively, if each first-level thread needs to trigger the extra work itself, that work may need to go in a separate pool to prevent deadlock (depending on when result() is called on each future), as below.

from concurrent.futures import ThreadPoolExecutor
import time

find_work_pool = ThreadPoolExecutor(max_workers=3)
do_work_pool = ThreadPoolExecutor(max_workers=3)

def find_work_inputs(dummy_file):
    print("{}: Finding work...".format(dummy_file))
    time.sleep(1)
    work = list(range(dummy_file))  # list() so the print below shows the values on Python 3 too
    print("{}: Work is {}".format(dummy_file, work))

    futures = []
    for work_input in work:
        futures.append((dummy_file, work_input, do_work_pool.submit(do_work, dummy_file, work_input)))
    return futures

def do_work(dummy_file, work_input):
    print("{}: {}".format(dummy_file, work_input))
    print("{}: Doing work {}...".format(dummy_file, work_input))
    time.sleep(1)
    return work_input * work_input

dummy_files = [1,2,3,4,5]

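# Submit every first-level task before waiting on any, so they run concurrently.
find_futures = [find_work_pool.submit(find_work_inputs, dummy_file) for dummy_file in dummy_files]

futures = []
for find_future in find_futures:
    futures.extend(find_future.result())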

for dummy_file, work_input, future in futures:
    print("Result from file:{} input:{} is {}".format(dummy_file, work_input, future.result()))
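
To illustrate why the second pool matters when a first-level task blocks on its own nested futures, here is a minimal sketch. It assumes a pool with a single worker, and it uses a timeout on result() purely so that the demonstration terminates; without the timeout, the shared-pool case would wait forever. The names blocking_task and run are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def do_work(work_input):
    return work_input * work_input

def blocking_task(inner_pool, work_input, timeout):
    # A first-level task that submits nested work and then waits for it.
    future = inner_pool.submit(do_work, work_input)
    try:
        return future.result(timeout=timeout)
    except TimeoutError:
        # The nested task never started: the only worker is busy running us.
        return "timed out"

def run(shared, timeout=0.5):
    outer_pool = ThreadPoolExecutor(max_workers=1)
    # Either reuse the same pool for nested work, or give it a pool of its own.
    inner_pool = outer_pool if shared else ThreadPoolExecutor(max_workers=1)
    try:
        return outer_pool.submit(blocking_task, inner_pool, 3, timeout).result()
    finally:
        outer_pool.shutdown()
        if not shared:
            inner_pool.shutdown()

shared_result = run(shared=True)     # nested work is starved by its own parent
separate_result = run(shared=False)  # nested work runs in the second pool
```

With a shared pool the nested task sits in the queue behind the task that is waiting for it, which is the situation the two-pool version avoids.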

Upvotes: 2
