Uladzislau

Reputation: 53

Python: parallel execution of yield expressions in generator

I have a generator function that iterates over a large number of parameters and yields the result of another function called with each parameter. The inner function may take quite a long time to execute, so I would like to use multiprocessing to speed things up. It may also be important that I want the ability to stop this generator in the middle of execution. I'm not sure what the right way to implement such logic is. I need something like a queue that lets me add new tasks after old ones have finished and yields results as soon as they are ready. I've looked at multiprocessing.Queue, but at first glance it doesn't seem suitable for my case. Can somebody advise what I should use in this scenario?

Here is approximate code of my task:

def gen(**kwargs):
    for param in get_params():
        yield inner_func(param)

Upvotes: 1

Views: 2200

Answers (1)

Booboo

Reputation: 44013

Use the multiprocessing.pool.Pool class for multiprocessing, since its terminate method cancels both all running tasks and those scheduled to run (an executor shutdown in the concurrent.futures module will not cancel already-running tasks). And as @MisterMiyakgi indicated, it should not be necessary to use a generator. However, you should use the imap_unordered method, which returns an iterable that yields results as they are produced by your inner_function, whereas with map you would not be able to get the first result until all results had been computed.
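To illustrate the streaming behavior, here is a minimal sketch, using a hypothetical slow_square as a stand-in for a long-running inner function:

```python
import time
from multiprocessing import Pool

def slow_square(n):
    """Stand-in for a long-running inner function."""
    time.sleep(0.1)
    return n * n

if __name__ == '__main__':
    with Pool(4) as pool:
        # Each result is printed as soon as its worker finishes;
        # pool.map would instead block until the whole list is complete.
        for result in pool.imap_unordered(slow_square, range(8)):
            print(result)
```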

from multiprocessing import Pool

def get_params():
    """ Generator function. """
    # For example:
    for next_param in range(10):
        yield next_param

def inner_function(param):
    """ Long running function. """
    # For example:
    return param ** 2

def gen():
    pool = Pool()
    # Use imap_unordered if we do not care about the order of results, else imap:
    iterable = pool.imap_unordered(inner_function, get_params())
    # The iterable can be iterated as if it were a generator.
    # Attach a terminate method to the iterable:
    def terminate():
        pool.terminate()  # cancel all running and scheduled tasks
        pool.join()       # wait for the worker processes to exit
    iterable.terminate = terminate
    return iterable


# Usage:
# Required for Windows
if __name__ == '__main__':
    iterable = gen()
    # iterable.terminate() should be called when you are done iterating the iterable,
    # but it can be called at any time to kill all running and scheduled tasks.
    # After calling terminate(), do not iterate the iterable further.
    for result in iterable:
        print(result)
        if result == 36:
            iterable.terminate() # kill all remaining tasks, if any
            break

Prints:

0
1
4
9
16
25
36
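If the order of results matters, the same pattern works with imap, which preserves input order while still yielding results incrementally. A sketch, reusing the inner_function from above:

```python
from multiprocessing import Pool

def inner_function(param):
    """Long running function."""
    return param ** 2

def gen_ordered():
    pool = Pool()
    # imap preserves input order while still yielding results one at a time
    iterable = pool.imap(inner_function, range(10))
    def terminate():
        pool.terminate()
        pool.join()
    iterable.terminate = terminate
    return iterable

if __name__ == '__main__':
    it = gen_ordered()
    results = [next(it) for _ in range(3)]
    it.terminate()  # kill the remaining tasks
    print(results)  # [0, 1, 4]
```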

Upvotes: 2
