Reputation: 3943
I have a threading object (a threading.Event) that I can't distribute across a ProcessPoolExecutor, but I would still like to return a future that carries it. If I already have a future, is there a way to apply a function to its completed value, e.g. Future a -> (a -> b) -> Future b?
import concurrent.futures
import threading

def three(x):
    return 2 + x

if __name__ == '__main__':
    trackedItem = (3, threading.Event())
    pool = concurrent.futures.ProcessPoolExecutor(3)
    poolJob = (pool.submit(three, trackedItem[0]), trackedItem[1])  # (Future(int), Event)
    # *** something magic goes here ***
    # Trying to transform it into Future((int, Event))
Upvotes: 2
Views: 691
Reputation: 24286
The other solutions don't support cancellation. I suggest:
from concurrent.futures import Future
from typing import Callable, TypeVar

I = TypeVar("I")
O = TypeVar("O")

class _ComposingFuture(Future):
    def __init__(self, delegate: Future[I], xform: Callable[[I], O]):
        self._delegate = delegate
        self._xform = xform
        super().__init__()
        delegate.add_done_callback(self._callback)

    def cancel(self) -> bool:
        # Only report success if the underlying future could be cancelled.
        if not self._delegate.cancel():
            return False
        super().cancel()
        return True

    def _callback(self, delegate: Future) -> None:
        if delegate.cancelled():
            super().cancel()
        else:
            try:
                self.set_result(self._xform(delegate.result()))
            except Exception as e:
                self.set_exception(e)

def compose(future: Future[I], xform: Callable[[I], O]) -> Future[O]:
    return _ComposingFuture(future, xform)
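As a rough illustration of the cancellation behaviour, the sketch below repeats the class so that it runs on its own and drives it with bare Future objects standing in for futures returned by an executor. Creating Future directly is only a testing convenience here, and the variable names (pending, mapped, done, doubled) are made up for the example.

```python
from concurrent.futures import Future
from typing import Callable, TypeVar

I = TypeVar("I")
O = TypeVar("O")


class _ComposingFuture(Future):
    def __init__(self, delegate: Future[I], xform: Callable[[I], O]):
        self._delegate = delegate
        self._xform = xform
        super().__init__()
        delegate.add_done_callback(self._callback)

    def cancel(self) -> bool:
        if not self._delegate.cancel():
            return False
        super().cancel()
        return True

    def _callback(self, delegate: Future) -> None:
        if delegate.cancelled():
            super().cancel()
        else:
            try:
                self.set_result(self._xform(delegate.result()))
            except Exception as e:
                self.set_exception(e)


def compose(future: Future[I], xform: Callable[[I], O]) -> Future[O]:
    return _ComposingFuture(future, xform)


# Cancelling the composed future also cancels the (still pending) delegate.
pending = Future()  # stand-in for an executor future; testing only
mapped = compose(pending, lambda x: x + 1)
cancelled_ok = mapped.cancel()
print(cancelled_ok, pending.cancelled(), mapped.cancelled())  # True True True

# The normal path: the transform is applied once the delegate completes.
done = Future()
doubled = compose(done, lambda x: x * 2)
done.set_result(21)
print(doubled.result())  # 42
```

If the delegate is already running or finished, its cancel() returns False and the composed future is left untouched.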
Upvotes: 0
Reputation: 844
@kaya3 provided a great answer, but I ran into a problem when adding exception handling that closes the pool. My cpchung_example below shows how to compose futures functionally. Exception handling still needs to be added to it; I don't have a good solution for that yet.
For comparison, I put them all into one file:
from concurrent.futures import Future, ProcessPoolExecutor, ThreadPoolExecutor

def map_future(future_a, func):
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            x = f.result()
            y = func(x)
            future_b.set_result(y)
        except Exception as e:
            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b

def func_a(x):
    return 2 + x

def func_b(x):
    return 3 * x

def func_c(x):
    raise NameError('Hi There')
    return 4 * x  # unreachable; shows the intended computation

def kaya3_example():
    future_a = pool.submit(func_a, 3)
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        x = f.result()
        y = func_b(x)
        future_b.set_result(y)

    future_a.add_done_callback(callback)
    print(future_b.result())  # 15, since func_b multiplies by 3 here

def exception_handling():
    try:
        future_a = pool.submit(func_a, 3)
        future_b = map_future(future_a, func_b)
        future_c = map_future(future_b, func_c)
        print(future_c.result())
    except Exception as e:
        print(e)  # report the error instead of silently dropping it
    finally:
        pool.shutdown()

def f(x, y):
    return x * y

def cpchung_example():
    with ThreadPoolExecutor(max_workers=1) as executor:
        a = executor.submit(f, 2, 3)
        b = executor.submit(f, 4, 5)
        c = executor.submit(f, a.result(), b.result())
        print(c.result())

if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)
    kaya3_example()
    cpchung_example()
    # exception_handling()  # not working, still wip
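One possible way to close the pool even when a mapped future fails is to let a with block own the executor, so that shutdown happens on exit whether or not result() raises. The sketch below is only an assumption about what would fit here: it uses ThreadPoolExecutor to stay short (ProcessPoolExecutor supports the same context-manager protocol), and add_two, fail and run_chain are hypothetical names.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def map_future(future_a, func):
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            future_b.set_result(func(f.result()))
        except Exception as e:
            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b


def add_two(x):
    return 2 + x


def fail(x):
    raise NameError("Hi There")


def run_chain():
    # Leaving the with block shuts the pool down, even if result() raises.
    with ThreadPoolExecutor(max_workers=1) as pool:
        future_a = pool.submit(add_two, 3)
        future_b = map_future(future_a, fail)
        try:
            return future_b.result(timeout=5)
        except NameError as e:
            # The exception raised inside the mapping function surfaces here.
            return f"failed: {e}"


outcome = run_chain()
print(outcome)  # failed: Hi There
```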
Upvotes: 0
Reputation: 51063
Here's a way which uses simpler setup code, without threading.Event, as that doesn't seem necessary to solve the problem. Basically, you can create future_b as a new Future() yourself, and use the add_done_callback method on future_a to set the result of future_b. Here, func_a is the computation that produces the result of future_a, and func_b is the computation that produces the result of future_b from the result of future_a.
from concurrent.futures import ProcessPoolExecutor, Future

def func_a(x):
    return 2 + x

def func_b(x):
    return 10 * x

if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)
    future_a = pool.submit(func_a, 3)
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        x = f.result()
        y = func_b(x)
        future_b.set_result(y)

    future_a.add_done_callback(callback)
    print(future_b.result())  # 50
If you want a helper function to do this, you can write one: map_future takes a future and a mapping function, and returns the new mapped future as required. This version handles an exception in case f.result() or func throws one:
def map_future(future_a, func):
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            x = f.result()
            y = func(x)
            future_b.set_result(y)
        except Exception as e:
            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b
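Applied to the situation in the question, the mapping function can simply pair the computed value with the Event. The sketch below assumes a ThreadPoolExecutor to keep it short and runnable at module level; with a ProcessPoolExecutor the done-callback still runs in the process that added it, so the Event should not need to cross the process boundary. The names tracked_item, future_int and future_pair are invented for this example.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


def map_future(future_a, func):
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            future_b.set_result(func(f.result()))
        except Exception as e:
            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b


def three(x):
    return 2 + x


tracked_item = (3, threading.Event())
event = tracked_item[1]

with ThreadPoolExecutor(max_workers=1) as pool:
    future_int = pool.submit(three, tracked_item[0])  # Future(int)
    # Future(int) -> Future((int, Event)); the Event is captured by the lambda.
    future_pair = map_future(future_int, lambda value: (value, event))
    value, same_event = future_pair.result(timeout=5)

print(value, same_event is event)  # 5 True
```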
Caveats: this goes against the advice in the documentation for the Future class, which says:
"Future instances are created by Executor.submit() and should not be created directly except for testing."
Also, any error raised in the callback that isn't a subclass of Exception will not be caught by this code. According to the docs, an Exception subclass that escapes a done-callback is "logged and ignored", and the behavior for other BaseException subclasses is undefined. I've chosen to only catch Exception in this code for simplicity, but you might prefer the sys.exc_info()[0] way (a bare except) of catching every possible thing that could be raised.
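A minimal sketch of that broader variant, assuming you really do want everything forwarded to the mapped future, catches BaseException instead. The names map_future_strict, Abort and raise_abort are hypothetical, and a bare Future stands in for an executor future.

```python
from concurrent.futures import Future


def map_future_strict(future_a, func):
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            future_b.set_result(func(f.result()))
        except BaseException as e:
            # Forward anything raised, not just Exception subclasses.
            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b


class Abort(BaseException):
    """Illustrative error that does not derive from Exception."""


def raise_abort(x):
    raise Abort("stop")


source = Future()  # stand-in for an executor future; testing only
mapped = map_future_strict(source, raise_abort)
source.set_result(1)

# exception() returns the stored error instead of re-raising it.
error = mapped.exception(timeout=5)
print(type(error).__name__)  # Abort
```

The trade-off is that this also captures things like KeyboardInterrupt or SystemExit raised inside the callback, which may not be what you want.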
Upvotes: 1