Reputation: 5742
Is it possible to write a sequence of promises (or tasks) using only the Python 3.6.1 standard library?
For example, a sequence of promises in JavaScript is written as:
const SLEEP_INTERVAL_IN_MILLISECONDS = 200;

const alpha = function alpha (number) {
    return new Promise(function (resolve, reject) {
        const fulfill = function() {
            return resolve(number + 1);
        };
        return setTimeout(fulfill, SLEEP_INTERVAL_IN_MILLISECONDS);
    });
};

const bravo = function bravo (number) {
    return new Promise(function (resolve, reject) {
        const fulfill = function() {
            return resolve(Math.ceil(1000*Math.random()) + number);
        };
        return setTimeout(fulfill, SLEEP_INTERVAL_IN_MILLISECONDS);
    });
};

const charlie = function charlie (number) {
    return new Promise(function (resolve, reject) {
        return (number%2 == 0) ? reject(number) : resolve(number);
    });
};

function run() {
    return Promise.resolve(42)
        .then(alpha)
        .then(bravo)
        .then(charlie)
        .then((number) => {
            console.log('success: ' + number)
        })
        .catch((error) => {
            console.log('error: ' + error);
        });
}

run();
Each function returns a Promise carrying the result of its asynchronous processing, and that result is resolved or rejected and passed on to the next step in the chain.
I am aware of libraries such as promises-2.01b and asyncio 3.4.3, but I am looking for a solution that uses only the Python standard library. If I do need to import a third-party library, I would prefer RxPython instead.
Upvotes: 38
Views: 62133
Reputation: 13415
Here's a similar program using asyncio and the async/await syntax:
import asyncio
import random

async def alpha(x):
    await asyncio.sleep(0.2)
    return x + 1

async def bravo(x):
    await asyncio.sleep(0.2)
    return random.randint(0, 1000) + x

async def charlie(x):
    if x % 2 == 0:
        return x
    raise ValueError(x, 'is odd')

async def main():
    try:
        number = await charlie(await bravo(await alpha(42)))
    except ValueError as exc:
        print('error:', exc.args[0])
    else:
        print('success:', number)

if __name__ == '__main__':
    asyncio.run(main())
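Note that asyncio.run was added in Python 3.7. On Python 3.6, as in the question, the same kind of coroutine can be driven with an explicit event loop instead. Here is a minimal sketch of that idea; add_one, run_chain and run_sync are hypothetical names used only for illustration:

```python
import asyncio


async def add_one(x):
    # Stand-in for an asynchronous step; the delay is arbitrary.
    await asyncio.sleep(0.01)
    return x + 1


async def run_chain(start):
    # Each await plays the role of one .then() in the JavaScript chain.
    return await add_one(await add_one(start))


def run_sync(coro):
    # Create and close the loop explicitly instead of calling asyncio.run().
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


if __name__ == '__main__':
    print(run_sync(run_chain(42)))  # prints 44
```

The helper creates a fresh loop per call, which keeps the sketch self-contained; a long-running program would more likely create one loop and reuse it.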
EDIT: If you're interested in reactive streams, you might consider using aiostream.
Here's a simple example:
import asyncio
from aiostream import stream, pipe

async def main():
    # This stream computes 11² + 13² in 1.5 seconds
    xs = (
        stream.count(interval=0.1)      # Count from zero every 0.1 s
        | pipe.skip(10)                 # Skip the first 10 numbers
        | pipe.take(5)                  # Take the following 5
        | pipe.filter(lambda x: x % 2)  # Keep odd numbers
        | pipe.map(lambda x: x ** 2)    # Square the results
        | pipe.accumulate()             # Add the numbers together
    )
    print('11² + 13² = ', await xs)

if __name__ == '__main__':
    asyncio.run(main())
More examples in the documentation.
Disclaimer: I am the project maintainer.
Upvotes: 37
Reputation: 336
I needed to use promises with Eventlet in an application. This is what I implemented (the full code is available in my eventlet-promise GitHub repository):
raise Exception('Go to https://github.com/htanwar922/eventlet-promise.git')
class Promise(Thenable):
    """
    A Promise represents the eventual result of an asynchronous operation.
    The primary way of interacting with a promise is through its then method,
    which registers callbacks to receive either a promise's eventual value or
    the reason why the promise cannot be fulfilled.

    A promise has a state, which can be either 'pending', 'fulfilled', or 'rejected'.

    A promise has three internal properties:
    - _fate is either 'resolved' (attached, fulfilled or rejected) or 'unresolved'.
    - _value is the result of the operation. Initially undefined.
    - _callbacks is a list of functions to call when the promise is resolved or rejected.

    A promise is in one of the following states:
    - pending: initial state, neither fulfilled nor rejected.
    - fulfilled: meaning that the operation completed successfully.
    - rejected: meaning that the operation failed.
    - attached: meaning that the promise has been attached to another promise.

    A pending promise can either be fulfilled with a value, or rejected with a
    reason (error). When either of these options happens, the associated
    handlers queued up by a promise's then method are called.

    The promise is said to be settled if it is either fulfilled or rejected,
    but not pending. Once settled, a promise cannot be settled again.

    Arguments:
    - executor is a function with the signature executor(resolve, reject).
        - resolve is a function with the signature resolve(result).
        - reject is a function with the signature reject(reason).

    An `executor` call is expected to do one of the following:
    - Call resolveFunc(result) as a side effect if it successfully completes.
    - Call rejectFunc(reason) as a side effect if it fails to complete.
    - Register callbacks to be called when the promise is resolved or rejected.
    """
    def __init__(self, executor : Callable[[Callable[[Any], None], Callable[[Any], None]], None]):
        super().__init__()
        executor = executor or (lambda _, __: None)
        self.execute(executor)

    def __del__(self):
        for thread in self._threads:
            thread.kill()

    @staticmethod
    def resolve(value : Any) -> 'Promise':
        if isinstance(value, Promise):
            return value
        return Promise(lambda resolveFunc, _: resolveFunc(value))

    @staticmethod
    def reject(reason : Any):
        return Promise(lambda _, rejectFunc: rejectFunc(reason))

    @staticmethod
    def all(promises : List['Promise']):
        def executor(resolveFunc, rejectFunc):
            def chainExecute(promises : List['Promise'], results, resolveFunc, rejectFunc):
                assert promises, 'No promises to chain'
                promises = list(promises)
                promise_ : Promise = promises.pop(0)
                nextPromise : Promise = promises[0] if promises else None
                promise_.then(lambda x, promise_=promise_, nextPromise=nextPromise:
                        nextPromise.waitExecute(chainExecute,
                            promises, results + [x],
                            resolveFunc, rejectFunc
                        ) if nextPromise
                        else resolveFunc(results + [x])
                    , rejectFunc)
            return hub.spawn(chainExecute, promises, [], resolveFunc, rejectFunc)
        return Promise(executor)

    @staticmethod
    def allSettled(promises : List['Promise']) -> 'Promise':
        if not promises:
            return Promise.resolve([])
        def executor(resolveFunc, rejectFunc):
            def chainExecute(promises : List['Promise'], results, resolveFunc, rejectFunc):
                assert promises, 'No promises to chain'
                promises = list(promises)
                promise_ : Promise = promises.pop(0)
                nextPromise : Promise = promises[0] if promises else None
                promise_.then(lambda x, promise_=promise_, nextPromise=nextPromise:
                        nextPromise.waitExecute(chainExecute,
                            promises, results + [{'status': promise_.getState(), 'value': x}],
                            resolveFunc, rejectFunc
                        ) if nextPromise
                        else resolveFunc(results + [{'status': promise_.getState(), 'value': x}])
                    , lambda x, promise_=promise_, nextPromise=nextPromise:
                        nextPromise.waitExecute(chainExecute,
                            promises, results + [{'status': promise_.getState(), 'reason': x}],
                            resolveFunc, rejectFunc
                        ) if nextPromise
                        else resolveFunc(results + [{'status': promise_.getState(), 'reason': x}])
                    )
            return hub.spawn(chainExecute, promises, [], resolveFunc, resolveFunc)
        return Promise(executor)

    def then(self, onFulfilled : Callable[[Any], Any] = None, onRejected : Callable[[Any], Any] = None):
        def raise_(reason):
            if isinstance(reason, Exception):
                raise reason
            raise Exception(reason)     # pylint: disable=broad-exception-raised
        onFulfilled = onFulfilled if callable(onFulfilled) else (lambda value: value)
        onRejected = onRejected if callable(onRejected) else raise_
        try:
            if self.isFulfilled():
                value = onFulfilled(self._value)
                promise_ = Promise(lambda resolveFunc, _: self.waitExecute(resolveFunc, value))
                # hub.sleep(0)
                return promise_
            if self.isRejected():
                value = onRejected(self._value)     # pylint: disable=assignment-from-no-return
                promise_ = Promise(lambda _, rejectFunc: self.waitExecute(rejectFunc, value))
                # hub.sleep(0)
                return promise_
            promise_ = Promise(lambda resolveFunc, _: resolveFunc(self))
            promise_.referenceTo(self, onFulfilled, onRejected)
            return promise_
        except Exception as error:      # pylint: disable=broad-except
            return Promise.reject(error)

    def catch(self, onRejected : Callable[[Any], Any] = None):
        return self.then(None, onRejected)

    def finally_(self, onFinally : Callable[[Any], Any] = None):
        return self.then(onFinally, onFinally)
if __name__ == '__main__':
    def executor_(resolveFunc : Callable[[Any], None], rejectFunc : Callable[[Any], None]):    # match, timeout
        t1, t2 = 5, 6
        # print(t := 1.5 * random())
        hub.spawn_after(t1, lambda: print('\tResolving') or resolveFunc(t1))
        hub.spawn_after(t2, lambda: print('\tRejecting') or rejectFunc(TimeoutError("Timed out")))

    promise = Promise(executor_)
    new_promise = promise.then()
    # attached = Promise(lambda resolveFunc, rejectFunc: None)
    attached = Promise(lambda resolveFunc, rejectFunc: resolveFunc(new_promise))
    attached.referenceTo(new_promise)
    p1 = Promise.resolve(1).then(2).then()
    p2 = Promise.reject(1).then(2, 2).then().then()
    p3 = p1.then()
    print(promise)
    print(new_promise)
    print(attached)
    print(p1)
    print(p2)
    print(p3)
    p_all = Promise.all([p1, p3, new_promise, promise])
    print('all', p_all)
    p_settled = Promise.allSettled([p1, p2, p3, new_promise, promise])
    print('allSettled', p_settled)
    print('\nFinished\n')
    hub.sleep(1)
    while True:
        hub.sleep(0)
        try:
            print()
            print(promise)
            print(new_promise)
            print(attached)
            print(attached.then())
            print(p1)
            print(p2)
            print(p3)
            print('all', p_all)
            print('allSettled', p_settled)
            hub.sleep(3)
        except KeyboardInterrupt:
            sys.exit(0)
Upvotes: 0
Reputation: 437
You can create your own Promise class. I'm not a Python developer, but I tried to create something similar to JavaScript's.
class Promise:
    def __init__(self, callback):
        self.resolved = ''
        self.rejected = ''
        callback(self.resolve, self.reject)

    def resolve(self, value):
        self.resolved = value

    def reject(self, value):
        self.rejected = value

    def then(self, callback):
        if not self.rejected:
            self.resolved = callback(self.resolved)
        return self

    def catch(self, callback):
        if self.rejected:
            self.rejected = callback(self.rejected)
        return self

def myPromise(resolve, reject):
    resolve(['Ana', 'Bia', 'Carlos', 'Daniel'])

def firstResolve(value):
    return value[0]

def secondResolve(value):
    print(value)

def firstReject(value):
    print('error:', value)

p = Promise(myPromise)
p.then(firstResolve).then(secondResolve).catch(firstReject)
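One limitation of a class like this is that an exception raised inside a then callback is not routed to catch, and a falsy rejection value (such as 0) looks like "no rejection". A rough sketch of one way around that, still fully synchronous, is to track an explicit state and return a new object from each step. SyncPromise and fail are hypothetical names for illustration, and the sketch only handles executors that settle immediately:

```python
class SyncPromise:
    """Hypothetical synchronous variant: callbacks run immediately."""

    def __init__(self, executor):
        self.state = 'pending'
        self.value = None
        try:
            executor(self._resolve, self._reject)
        except Exception as exc:
            # An exception raised by the executor rejects the promise.
            self._reject(exc)

    def _settle(self, state, value):
        if self.state == 'pending':  # a promise settles only once
            self.state, self.value = state, value

    def _resolve(self, value):
        self._settle('fulfilled', value)

    def _reject(self, reason):
        self._settle('rejected', reason)

    def then(self, callback):
        if self.state != 'fulfilled':
            return self  # pass a rejection further down the chain
        return SyncPromise(lambda resolve, reject: resolve(callback(self.value)))

    def catch(self, callback):
        if self.state != 'rejected':
            return self  # nothing to handle
        return SyncPromise(lambda resolve, reject: resolve(callback(self.value)))


def fail(value):
    raise ValueError('boom')


if __name__ == '__main__':
    result = (
        SyncPromise(lambda resolve, reject: resolve(1))
        .then(lambda v: v + 1)
        .then(fail)                 # raises, so the chain becomes rejected
        .then(lambda v: v * 100)    # skipped
        .catch(lambda e: 'error: ' + str(e))
    )
    print(result.value)  # prints "error: boom"
```

Because everything runs synchronously, this is closer to a result-or-error chain than to a real promise; for actual asynchronous work the asyncio-based answers are a better fit.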
Promise.all example:
class Promise:
    def __init__(self, callback):
        self.resolved = ''
        self.rejected = ''
        if callable(callback):
            callback(self.resolve, self.reject)

    def resolve(self, value):
        self.resolved = value

    def reject(self, value):
        self.rejected = value

    def then(self, callback):
        if not self.rejected:
            self.resolved = callback(self.resolved)
        return self

    def catch(self, callback):
        if self.rejected:
            self.rejected = callback(self.rejected)
        return self

    def all(self, promises):
        resolvedArray = []
        rejectedArray = []
        for promise in promises:
            promise(self.resolve, self.reject)
            if self.resolved:
                resolvedArray += self.resolved
            if self.rejected:
                rejectedArray += self.rejected
                break
        self.resolved = resolvedArray
        self.rejected = rejectedArray
        return self

def myPromise1(resolve, reject):
    resolve(['Ana'])

def myPromise2(resolve, reject):
    resolve(['Bia'])

def myPromise3(resolve, reject):
    resolve(['Carlos'])

def myPromise4(resolve, reject):
    resolve(['Daniel'])

def allResolve(values):
    print('without error: ', values)

def allReject(values):
    print('with error: ', values)

p = Promise([])
p.all([myPromise1, myPromise2]).then(allResolve).catch(allReject)
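For comparison, the standard library's closest counterpart to Promise.all is asyncio.gather, which waits for several awaitables and, by default, propagates the first exception raised. A minimal sketch, where fetch_name and gather_names are hypothetical names and the delays are arbitrary:

```python
import asyncio


async def fetch_name(name, delay):
    # Stand-in for an asynchronous operation that produces a value.
    await asyncio.sleep(delay)
    return name


async def gather_names():
    # gather() returns results in argument order, not completion order.
    return await asyncio.gather(
        fetch_name('Ana', 0.02),
        fetch_name('Bia', 0.01),
    )


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    try:
        print('without error:', loop.run_until_complete(gather_names()))
    finally:
        loop.close()
```

Passing return_exceptions=True to gather collects exceptions in the result list instead of raising, which is roughly what Promise.allSettled does in JavaScript.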
Upvotes: -1
Reputation: 80081
You're in luck: Python 3.4 and above include asyncio in the standard library, and the feature you are looking for (Future) is part of it, although the async/await syntax used in the example below requires Python 3.5 and up.
From your own link about asyncio: "This version is only relevant for Python 3.3, which does not include asyncio in its stdlib."
Example:
import asyncio

async def some_coroutine():
    await asyncio.sleep(1)
    return 'done'

def process_result(future):
    print('Task returned:', future.result())

loop = asyncio.get_event_loop()
task = loop.create_task(some_coroutine())
task.add_done_callback(process_result)
# run_until_complete() needs the future to wait for
loop.run_until_complete(task)
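Done callbacks can also be chained to get something close to .then(). The sketch below assumes a hypothetical helper named then that is called while the event loop is running (so that asyncio.get_event_loop() returns the running loop), and it does not flatten callbacks that themselves return futures:

```python
import asyncio


def then(future, callback):
    # Hypothetical helper: returns a new future that settles with
    # callback(result) once `future` is done, or with the exception raised.
    # Assumes it is called from inside a running event loop.
    chained = asyncio.get_event_loop().create_future()

    def on_done(done):
        try:
            chained.set_result(callback(done.result()))
        except Exception as exc:  # from the source future or the callback
            chained.set_exception(exc)

    future.add_done_callback(on_done)
    return chained


async def delayed(value):
    # Stand-in for an asynchronous operation; the delay is arbitrary.
    await asyncio.sleep(0.01)
    return value


async def main():
    task = asyncio.ensure_future(delayed(42))
    chained = then(then(task, lambda x: x + 1), lambda x: x * 2)
    return await chained


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    try:
        print(loop.run_until_complete(main()))  # prints 86
    finally:
        loop.close()
```

In most code, awaiting each step in sequence inside a coroutine is simpler than chaining callbacks; the helper is only meant to show how the two styles relate.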
Upvotes: 9