Carbon

Reputation: 3943

How to functionally compose futures?

I have a threading object (a threading.Event) that I can't send to a ProcessPoolExecutor worker, but I would still like to return a single future. If I already have a future, is there a way to apply a function to its completed value, e.g. Future a -> (a -> b) -> Future b?

import concurrent.futures
import threading

def three(x):
    return 2+x


if __name__ == '__main__':
    trackedItem = (3, threading.Event())
    pool = concurrent.futures.ProcessPoolExecutor(3)
    poolJob = (pool.submit(three, trackedItem[0]), trackedItem[1])  # (Future(int), Event)
    # *** something magic goes here ***
    # Trying to transform it into Future(int,Event)

Upvotes: 2

Views: 691

Answers (3)

NamshubWriter

Reputation: 24286

The other solutions don't support cancellation. I suggest:

from concurrent.futures import Future
from typing import Callable, TypeVar

I = TypeVar("I")
O = TypeVar("O")


class _ComposingFuture(Future):

    def __init__(self, delegate: Future[I], xform: Callable[[I], O]):
        self._delegate = delegate
        self._xform = xform
        super().__init__()
        delegate.add_done_callback(self._callback)

    def cancel(self) -> bool:
        if not self._delegate.cancel():
            return False
        super().cancel()
        return True

    def _callback(self, delegate: Future) -> None:
        if delegate.cancelled():
            super().cancel()
        else:
            try:
                self.set_result(self._xform(delegate.result()))
            except Exception as e:
                self.set_exception(e)


def compose(future: Future[I], xform: Callable[[I], O]) -> Future[O]:
    return _ComposingFuture(future, xform)
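
A possible usage sketch for compose, showing that cancelling the composed future also cancels the delegate. It uses a condensed, unannotated copy of the class above so that the block runs on its own. The single-worker ThreadPoolExecutor, the blocker event, and the demo_cancellation and demo_result helpers are assumptions made for illustration, not part of the answer itself.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class _ComposingFuture(Future):
    """Condensed copy of the class above, repeated so this sketch is runnable."""

    def __init__(self, delegate, xform):
        self._delegate = delegate
        self._xform = xform
        super().__init__()
        delegate.add_done_callback(self._callback)

    def cancel(self):
        if not self._delegate.cancel():
            return False
        super().cancel()
        return True

    def _callback(self, delegate):
        if delegate.cancelled():
            super().cancel()
        else:
            try:
                self.set_result(self._xform(delegate.result()))
            except Exception as e:
                self.set_exception(e)


def compose(future, xform):
    return _ComposingFuture(future, xform)


def demo_cancellation():
    blocker = threading.Event()
    with ThreadPoolExecutor(max_workers=1) as pool:
        pool.submit(blocker.wait)          # occupies the only worker
        pending = pool.submit(pow, 2, 10)  # queued behind it, so still cancellable
        composed = compose(pending, lambda n: n + 1)
        cancelled = composed.cancel()      # cancels the delegate first, then itself
        blocker.set()                      # release the worker so the pool can shut down
    return cancelled, pending.cancelled(), composed.cancelled()


def demo_result():
    with ThreadPoolExecutor(max_workers=1) as pool:
        return compose(pool.submit(pow, 2, 10), lambda n: n + 1).result(timeout=5)
```

If the delegate is already running or finished, its cancel() returns False, and so does the composed future's cancel().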

Upvotes: 0

cpchung

Reputation: 844

@kaya3 provided a great answer, but I ran into a problem when adding exception handling that closes the pool. The cpchung_example function below shows another way to compose futures functionally. Exception handling still needs to be added; I don't have a good solution for that yet.

For comparison, I put them all into one file:



from concurrent.futures import Future, ProcessPoolExecutor, ThreadPoolExecutor


def map_future(future_a, func):
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            x = f.result()
            y = func(x)
            future_b.set_result(y)
        except Exception as e:
            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b


def func_a(x):
    return 2 + x


def func_b(x):
    return 3 * x


def func_c(x):
    raise NameError('Hi There')
    return 4 * x


def kaya3_example():
    future_a = pool.submit(func_a, 3)

    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        x = f.result()
        y = func_b(x)
        future_b.set_result(y)

    future_a.add_done_callback(callback)

    print(future_b.result())  # 15, because func_b in this file multiplies by 3


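def exception_handling():
    try:
        future_a = pool.submit(func_a, 3)
        future_b = map_future(future_a, func_b)
        future_c = map_future(future_b, func_c)

        print(future_c.result())
    except Exception as e:
        # Report the failure instead of silently discarding it.
        print(f"pipeline failed: {e!r}")
    finally:
        # Close the pool exactly once, whether or not an error occurred.
        pool.shutdown()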


def f(x, y):
    return x * y


def cpchung_example():
    with ThreadPoolExecutor(max_workers=1) as executor:
        a = executor.submit(f, 2, 3)
        b = executor.submit(f, 4, 5)
        c = executor.submit(f, a.result(), b.result())

        print(c.result())


if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)

    kaya3_example()

    cpchung_example()

    # exception_handling()  # not working, still wip
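
One way the unfinished exception_handling function could be completed is to let a with block close the pool and to catch the error at the point where the final future is read. This is only a sketch, not a confirmed fix for the problem described above. It assumes a ThreadPoolExecutor so that it runs anywhere without pickling concerns (the with statement closes a ProcessPoolExecutor in the same way), and run_pipeline is a hypothetical name.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def map_future(future_a, func):
    # Same helper as in the file above, repeated so this block runs on its own.
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            future_b.set_result(func(f.result()))
        except Exception as e:
            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b


def func_a(x):
    return 2 + x


def func_b(x):
    return 3 * x


def func_c(x):
    raise NameError('Hi There')


def run_pipeline():
    # Leaving the with block shuts the pool down on both the success and the error path.
    with ThreadPoolExecutor(max_workers=3) as pool:
        future_a = pool.submit(func_a, 3)
        future_b = map_future(future_a, func_b)
        future_c = map_future(future_b, func_c)
        try:
            return future_c.result(timeout=5)
        except NameError as e:
            # The error raised by func_c travels through the mapped futures.
            return f"failed: {e}"
```

Because each mapped future forwards the exception of the previous one, a single try around the last result() call is enough for the whole chain.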

Upvotes: 0

kaya3

Reputation: 51063

Here's a way which uses simpler setup code, without threading.Event, as that doesn't seem necessary to solve the problem. Basically, you can create future_b as a new Future() yourself, and use the add_done_callback method on future_a to set the result of future_b. Here, func_a is the computation that produces the result of future_a, and func_b is the computation that produces the result of future_b from the result of future_a.

from concurrent.futures import ProcessPoolExecutor, Future

def func_a(x):
    return 2 + x

def func_b(x):
    return 10 * x

if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)
    future_a = pool.submit(func_a, 3)

    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        x = f.result()
        y = func_b(x)
        future_b.set_result(y)

    future_a.add_done_callback(callback)

    print(future_b.result()) # 50

If you want a helper function to do this, you can write one: map_future takes a future and a mapping function, and returns the new mapped future as required. This version handles an exception in case f.result() or func_b throws one:

def map_future(future_a, func):
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            x = f.result()
            y = func(x)
            future_b.set_result(y)
        except Exception as e:
            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b
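
Applied to the question's setup, map_future can pair the worker's result with the threading.Event, which never has to leave the parent process. This is a sketch under assumptions: it uses a ThreadPoolExecutor so that it runs anywhere without pickling concerns (submitting a module-level function to a ProcessPoolExecutor follows the same pattern), and the name track is hypothetical.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


def map_future(future_a, func):
    # Same helper as above, repeated so this block runs on its own.
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            future_b.set_result(func(f.result()))
        except Exception as e:
            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b


def three(x):
    return 2 + x


def track(value, event):
    """Hypothetical helper: turn (Future(int), Event) into a Future of (int, Event)."""
    with ThreadPoolExecutor(max_workers=3) as pool:
        future = pool.submit(three, value)
        # Done-callbacks run in the process that added them (this also holds for
        # ProcessPoolExecutor), so the Event itself is never pickled.
        paired = map_future(future, lambda result: (result, event))
        return paired.result(timeout=5)
```

Calling track(3, threading.Event()) yields a tuple whose first element is the computed value and whose second element is the very same Event object that was passed in.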

Caveats: this goes against the advice in the documentation for the Future class, which says:

Future instances are created by Executor.submit() and should not be created directly except for testing.

Also, an Exception subclass that escapes the callback is "logged and ignored" according to the docs, and for anything raised that isn't a subclass of Exception the docs describe the behavior as undefined. Either way, future_b may never complete. I've chosen to only catch Exception in this code for simplicity, but you might prefer a bare except with sys.exc_info() (or except BaseException) to catch every possible thing that could be raised.
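
A sketch of the broader variant mentioned above, catching BaseException rather than Exception so that anything raised by f.result() or func is forwarded to the mapped future. The names map_future_all, Abort, raise_abort and demo are hypothetical. Whether forwarding things like KeyboardInterrupt is desirable depends on the application.

```python
from concurrent.futures import Future


def map_future_all(future_a, func):
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            future_b.set_result(func(f.result()))
        except BaseException as e:  # also covers errors that are not Exception subclasses
            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b


class Abort(BaseException):
    """Hypothetical error that does not derive from Exception."""


def raise_abort(x):
    raise Abort(x)


def demo():
    done = Future()
    done.set_result(1)  # already finished, so the callback runs immediately
    mapped = map_future_all(done, raise_abort)
    return mapped.exception(timeout=5)  # returns the stored error without raising it
```

With the Exception-only version, the Abort raised here would escape the callback and the mapped future would stay unresolved.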

Upvotes: 1
