JP Ventura

Reputation: 5742

How do I write a sequence of promises in Python?

Is it possible to write a sequence of promises (or tasks) using only the Python 3.6.1 standard library?

For example, a sequence of promises in JavaScript is written as:

const SLEEP_INTERVAL_IN_MILLISECONDS = 200;

const alpha = function alpha (number) {
    return new Promise(function (resolve, reject) {
        const fulfill = function() {
            return resolve(number + 1);
        };

        return setTimeout(fulfill, SLEEP_INTERVAL_IN_MILLISECONDS);
    });
};

const bravo = function bravo (number) {
    return new Promise(function (resolve, reject) {
        const fulfill = function() {
            return resolve(Math.ceil(1000*Math.random()) + number);
        };
        return setTimeout(fulfill, SLEEP_INTERVAL_IN_MILLISECONDS);
    });
};

const charlie = function charlie (number) {
    return new Promise(function (resolve, reject) {
        return (number%2 == 0) ? reject(number) : resolve(number);
    });
};

function run() {
    return Promise.resolve(42)
        .then(alpha)
        .then(bravo)
        .then(charlie)
        .then((number) => {
            console.log('success: ' + number)
        })
        .catch((error) => {
            console.log('error: ' + error);
        });
}

run();

Each function also returns a Promise holding the result of its asynchronous processing, which is resolved or rejected and then handled by the promise that immediately follows.

I am aware of libraries such as promises-2.01b and asyncio 3.4.3, but I am looking for a solution that uses only the Python standard library. If I have to import a third-party library, I would rather use RxPython.

Upvotes: 38

Views: 62133

Answers (4)

Vincent

Reputation: 13415

Here's a similar program using asyncio and the async/await syntax:

import asyncio
import random

async def alpha(x):
    await asyncio.sleep(0.2)
    return x + 1 

async def bravo(x):
    await asyncio.sleep(0.2)
    return random.randint(0, 1000) + x

async def charlie(x):
    if x % 2 == 0:
        return x
    raise ValueError(x, 'is odd')

async def main():
    try:
        number = await charlie(await bravo(await alpha(42)))
    except ValueError as exc:
        print('error:', exc.args[0])
    else:
        print('success:', number)

if __name__ == '__main__':
    asyncio.run(main())
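
When the list of steps is not known in advance, the nested awaits can be replaced by a loop that feeds each result into the next coroutine, which is closer to the `.then().then()` style. The following is only a sketch: `chain`, `run`, `add_one`, `double` and `reject_even` are hypothetical names chosen for illustration, and `run` uses `new_event_loop()` with `run_until_complete()` on the assumption that the code must also work on Python 3.6, where `asyncio.run()` does not exist.

```python
import asyncio


async def chain(value, *steps):
    """Feed value through each coroutine function in order."""
    for step in steps:
        value = await step(value)
    return value


async def add_one(x):
    await asyncio.sleep(0.01)
    return x + 1


async def double(x):
    await asyncio.sleep(0.01)
    return x * 2


async def reject_even(x):
    # Raising plays the role of reject(); returning plays the role of resolve().
    if x % 2 == 0:
        raise ValueError(x)
    return x


def run(start, *steps):
    # Explicit loop management instead of asyncio.run(), for Python 3.6.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(chain(start, *steps))
    finally:
        loop.close()


if __name__ == '__main__':
    try:
        print('success:', run(42, add_one, double, reject_even))
    except ValueError as exc:
        print('error:', exc.args[0])
```

An exception raised by any step skips the remaining steps and surfaces at the caller, much like a rejection skipping ahead to `.catch()`.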

EDIT: If you're interested in reactive streams, you might consider using aiostream.

Here's a simple example:

import asyncio
from aiostream import stream, pipe

async def main():
    # This stream computes 11² + 13² in 1.5 seconds
    xs = (
        stream.count(interval=0.1)      # Count from zero every 0.1 s
        | pipe.skip(10)                 # Skip the first 10 numbers
        | pipe.take(5)                  # Take the following 5
        | pipe.filter(lambda x: x % 2)  # Keep odd numbers
        | pipe.map(lambda x: x ** 2)    # Square the results
        | pipe.accumulate()             # Add the numbers together
    )
    print('11² + 13² = ', await xs)

if __name__ == '__main__':
    asyncio.run(main())

More examples in the documentation.

Disclaimer: I am the project maintainer.

Upvotes: 37

Himanshu Tanwar

Reputation: 336

I needed to use promises with Eventlet in an application. This is what I implemented (the full code is available in my eventlet-promise GitHub repository):

raise Exception('Go to https://github.com/htanwar922/eventlet-promise.git')

class Promise(Thenable):
    """
    A Promise represents the eventual result of an asynchronous operation.
    The primary way of interacting with a promise is through its then method,
    which registers callbacks to receive either a promise's eventual value or
    the reason why the promise cannot be fulfilled.

    A promise has a state, which can be either 'pending', 'fulfilled', or 'rejected'.

    A promise has three internal properties:
    - _fate is either 'resolved' (attached, fulfilled or rejected) or 'unresolved'.
    - _value is the result of the operation. Initially undefined.
    - _callbacks is a list of functions to call when the promise is resolved or rejected.

    A promise is in one of the following states:
    - pending: initial state, neither fulfilled nor rejected.
    - fulfilled: meaning that the operation completed successfully.
    - rejected: meaning that the operation failed.
    - attached: meaning that the promise has been attached to another promise.

    A pending promise can either be fulfilled with a value, or rejected with a
    reason (error). When either of these options happens, the associated
    handlers queued up by a promise's then method are called.

    The promise is said to be settled if it is either fulfilled or rejected,
    but not pending. Once settled, a promise cannot be settled again.
    
    Arguments:
    - executor is a function with the signature executor(resolve, reject).
        - resolve is a function with the signature resolve(result).
        - reject is a function with the signature reject(reason).
        An `executor` call is expected to do one of the following:
        - Call resolveFunc(result) side-effect if it successfully completes.
        - Call rejectFunc(reason) side-effect if it fails to complete.
        - Register callbacks to be called when the promise is resolved or rejected.
    """
    def __init__(self, executor : Callable[[Callable[[Any], None], Callable[[Any], None]], None]):
        super().__init__()
        executor = executor or (lambda _, __: None)
        self.execute(executor)

    def __del__(self):
        for thread in self._threads:
            thread.kill()

    @staticmethod
    def resolve(value : Any) -> 'Promise':
        if isinstance(value, Promise):
            return value
        return Promise(lambda resolveFunc, _: resolveFunc(value))

    @staticmethod
    def reject(reason : Any):
        return Promise(lambda _, rejectFunc: rejectFunc(reason))

    @staticmethod
    def all(promises : List['Promise']):
        def executor(resolveFunc, rejectFunc):
            def chainExecute(promises : List['Promise'], results, resolveFunc, rejectFunc):
                assert promises, 'No promises to chain'
                promises = list(promises)
                promise_ : Promise = promises.pop(0)
                nextPromise : Promise = promises[0] if promises else None
                promise_.then(lambda x, promise_=promise_, nextPromise=nextPromise:
                    nextPromise.waitExecute(chainExecute,
                            promises, results + [x],
                            resolveFunc, rejectFunc
                    ) if nextPromise
                    else resolveFunc(results + [x])
                , rejectFunc)
            return hub.spawn(chainExecute, promises, [], resolveFunc, rejectFunc)
        return Promise(executor)

    @staticmethod
    def allSettled(promises : List['Promise']) -> 'Promise':
        if not promises:
            return Promise.resolve([])
        def executor(resolveFunc, rejectFunc):
            def chainExecute(promises : List['Promise'], results, resolveFunc, rejectFunc):
                assert promises, 'No promises to chain'
                promises = list(promises)
                promise_ : Promise = promises.pop(0)
                nextPromise : Promise = promises[0] if promises else None
                promise_.then(lambda x, promise_=promise_, nextPromise=nextPromise:
                    nextPromise.waitExecute(chainExecute,
                            promises, results + [{'status': promise_.getState(), 'value': x}],
                            resolveFunc, rejectFunc
                    ) if nextPromise
                    else resolveFunc(results + [{'status': promise_.getState(), 'value': x}])
                , lambda x, promise_=promise_, nextPromise=nextPromise:
                    nextPromise.waitExecute(chainExecute,
                            promises, results + [{'status': promise_.getState(), 'reason': x}],
                            resolveFunc, rejectFunc
                    ) if nextPromise
                    else resolveFunc(results + [{'status': promise_.getState(), 'reason': x}])
                )
            return hub.spawn(chainExecute, promises, [], resolveFunc, resolveFunc)
        return Promise(executor)

    def then(self, onFulfilled : Callable[[Any], Any] = None, onRejected : Callable[[Any], Any] = None):
        def raise_(reason):
            if isinstance(reason, Exception):
                raise reason
            raise Exception(reason)     # pylint: disable=broad-exception-raised
        onFulfilled = onFulfilled if callable(onFulfilled) else (lambda value: value)
        onRejected = onRejected if callable(onRejected) else raise_
        try:
            if self.isFulfilled():
                value = onFulfilled(self._value)
                promise_ = Promise(lambda resolveFunc, _: self.waitExecute(resolveFunc, value))
                # hub.sleep(0)
                return promise_
            if self.isRejected():
                value = onRejected(self._value)     # pylint: disable=assignment-from-no-return
                promise_ = Promise(lambda _, rejectFunc: self.waitExecute(rejectFunc, value))
                # hub.sleep(0)
                return promise_
            promise_ = Promise(lambda resolveFunc, _: resolveFunc(self))
            promise_.referenceTo(self, onFulfilled, onRejected)
            return promise_
        except Exception as error:          # pylint: disable=broad-except
            return Promise.reject(error)

    def catch(self, onRejected : Callable[[Any], Any] = None):
        return self.then(None, onRejected)

    def finally_(self, onFinally : Callable[[Any], Any] = None):
        return self.then(onFinally, onFinally)

if __name__ == '__main__':
    def executor_(resolveFunc : Callable[[Any], None], rejectFunc : Callable[[Any], None]):      # match, timeout
        t1, t2 = 5, 6
        # print(t := 1.5 * random())
        hub.spawn_after(t1, lambda: print('\tResolving') or resolveFunc(t1))
        hub.spawn_after(t2, lambda: print('\tRejecting') or rejectFunc(TimeoutError("Timed out")))

    promise = Promise(executor_)
    new_promise = promise.then()
    # attached = Promise(lambda resolveFunc, rejectFunc: None)
    attached = Promise(lambda resolveFunc, rejectFunc: resolveFunc(new_promise))
    attached.referenceTo(new_promise)
    p1 = Promise.resolve(1).then(2).then()
    p2 = Promise.reject(1).then(2, 2).then().then()
    p3 = p1.then()

    print(promise)
    print(new_promise)
    print(attached)
    print(p1)
    print(p2)
    print(p3)

    p_all = Promise.all([p1, p3, new_promise, promise])
    print('all', p_all)
    p_settled = Promise.allSettled([p1, p2, p3, new_promise, promise])
    print('allSettled', p_settled)

    print('\nFinished\n')
    hub.sleep(1)

    while True:
        hub.sleep(0)
        try:
            print()
            print(promise)
            print(new_promise)
            print(attached)
            print(attached.then())
            print(p1)
            print(p2)
            print(p3)
            print('all', p_all)
            print('allSettled', p_settled)
            hub.sleep(3)
        except KeyboardInterrupt:
            sys.exit(0)

Upvotes: 0

Max

Reputation: 437

You can create your own Promise class. I'm not a Python developer, but I tried to create something similar to the JavaScript one.

class Promise:
    def __init__(self, callback):
        self.resolved = ''
        self.rejected = ''
        callback(self.resolve, self.reject)

    def resolve(self, value):
        self.resolved = value

    def reject(self, value):
        self.rejected = value

    def then(self, callback):
        if not self.rejected:
            self.resolved = callback(self.resolved)
        return self

    def catch(self, callback):
        if self.rejected:
            self.rejected = callback(self.rejected)
        return self


def myPromise(resolve, reject):
    resolve(['Ana', 'Bia', 'Carlos', 'Daniel'])


def firstResolve(value):
    return value[0]


def secondResolve(value):
    print(value)


def firstReject(value):
    print('error:', value)


p = Promise(myPromise)
p.then(firstResolve).then(secondResolve).catch(firstReject)

Promise.all example

class Promise:
    def __init__(self, callback):
        self.resolved = ''
        self.rejected = ''
        if callable(callback):
            callback(self.resolve, self.reject)

    def resolve(self, value):
        self.resolved = value

    def reject(self, value):
        self.rejected = value

    def then(self, callback):
        if not self.rejected:
            self.resolved = callback(self.resolved)
        return self

    def catch(self, callback):
        if self.rejected:
            self.rejected = callback(self.rejected)
        return self

    def all(self, promises):
        resolvedArray = []
        rejectedArray = []
        for promise in promises:
            promise(self.resolve, self.reject)
            if self.resolved:
                resolvedArray += self.resolved
            if self.rejected:
                rejectedArray += self.rejected
                break
        self.resolved = resolvedArray
        self.rejected = rejectedArray
        return self


def myPromise1(resolve, reject):
    resolve(['Ana'])


def myPromise2(resolve, reject):
    resolve(['Bia'])


def myPromise3(resolve, reject):
    resolve(['Carlos'])


def myPromise4(resolve, reject):
    resolve(['Daniel'])


def allResolve(values):
    print('without error: ', values)


def allReject(values):
    print('with error: ', values)


p = Promise([])
p.all([myPromise1, myPromise2]).then(allResolve).catch(allReject)
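
For comparison, the standard library's closest counterpart to `Promise.all` is `asyncio.gather`. The following is a minimal sketch, assuming the work is written as coroutines; `fetch_name`, `fail`, `gather_all` and `run` are hypothetical helpers, not part of the class above.

```python
import asyncio


async def fetch_name(name, delay):
    # Stand-in for some asynchronous work that eventually yields a value.
    await asyncio.sleep(delay)
    return name


async def fail(reason):
    raise RuntimeError(reason)


async def gather_all(*awaitables):
    # Results come back in argument order, regardless of completion order.
    # With the default return_exceptions=False, the first exception propagates.
    return await asyncio.gather(*awaitables)


def run(coro):
    # Explicit loop management so the sketch also works on Python 3.6.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


if __name__ == '__main__':
    names = run(gather_all(fetch_name('Ana', 0.02), fetch_name('Bia', 0.01)))
    print('without error:', names)
    try:
        run(gather_all(fetch_name('Carlos', 0), fail('boom')))
    except RuntimeError as exc:
        print('with error:', exc)
```

Unlike the synchronous class above, the awaitables here run concurrently on the event loop.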

Upvotes: -1

Wolph

Reputation: 80081

You're in luck: Python 3.4 and above include asyncio in the standard library. The feature you are looking for is its Future class; the async/await syntax used in the example requires Python 3.5 or later.

From your own link about asyncio: "This version is only relevant for Python 3.3, which does not include asyncio in its stdlib."

Example:

import asyncio


async def some_coroutine():
    await asyncio.sleep(1)
    return 'done'


def process_result(future):
    print('Task returned:', future.result())


loop = asyncio.get_event_loop()
task = loop.create_task(some_coroutine())
task.add_done_callback(process_result)
loop.run_until_complete(task)
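
A Future can also be created and settled by hand, which is the closest match to `new Promise(...)` combined with `setTimeout` in the question. This is a rough sketch rather than a definitive pattern: `delayed_increment`, `reject_even`, `pipeline` and `run_pipeline` are hypothetical names, and `loop.create_future()` assumes Python 3.5.2 or later.

```python
import asyncio


def delayed_increment(loop, number, delay=0.05):
    """Return a Future that is resolved with number + 1 after `delay` seconds."""
    future = loop.create_future()
    # call_later plays the role of setTimeout; set_result plays the role of resolve().
    loop.call_later(delay, future.set_result, number + 1)
    return future


def reject_even(loop, number):
    """Return a Future that is rejected for even input and resolved otherwise."""
    future = loop.create_future()
    if number % 2 == 0:
        future.set_exception(ValueError(number))  # counterpart of reject()
    else:
        future.set_result(number)
    return future


async def pipeline(loop, start):
    # Awaiting a Future returns its result or re-raises its exception.
    value = await delayed_increment(loop, start)
    return await reject_even(loop, value)


def run_pipeline(start):
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(pipeline(loop, start))
    finally:
        loop.close()


if __name__ == '__main__':
    try:
        print('success:', run_pipeline(42))
    except ValueError as exc:
        print('error:', exc.args[0])
```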

Upvotes: 9
