如何用 Python 编写一系列 Promise?

问题描述 投票:0回答:4

是否可以仅使用Python 3.6.1标准库编写一系列promise(或任务)?

例如,JavaScript 中的序列 Promise 写为:

// Delay, in milliseconds, applied by the timer-based steps below.
const SLEEP_INTERVAL_IN_MILLISECONDS = 200;

/**
 * Resolve with `number + 1` once SLEEP_INTERVAL_IN_MILLISECONDS has elapsed.
 *
 * @param {number} number - Value handed over by the previous step.
 * @returns {Promise<number>} Promise fulfilled with the incremented value.
 */
const alpha = function alpha (number) {
    return new Promise((resolve) => {
        const incrementLater = () => resolve(number + 1);

        setTimeout(incrementLater, SLEEP_INTERVAL_IN_MILLISECONDS);
    });
};

/**
 * Resolve, after SLEEP_INTERVAL_IN_MILLISECONDS, with `number` plus a random
 * integer computed as Math.ceil(1000 * Math.random()).
 *
 * @param {number} number - Value handed over by the previous step.
 * @returns {Promise<number>} Promise fulfilled with the randomized sum.
 */
const bravo = function bravo (number) {
    return new Promise((resolve) => {
        // The random offset is drawn when the timer fires, not when bravo is called.
        const addRandomLater = () => resolve(Math.ceil(1000*Math.random()) + number);

        setTimeout(addRandomLater, SLEEP_INTERVAL_IN_MILLISECONDS);
    });
};

/**
 * Settle immediately: reject with `number` when it is even, resolve with it
 * otherwise.
 *
 * @param {number} number - Value handed over by the previous step.
 * @returns {Promise<number>} Promise settled according to the parity of `number`.
 */
const charlie = function charlie (number) {
    return new Promise((resolve, reject) => {
        if (number%2 == 0) {
            reject(number);
        } else {
            resolve(number);
        }
    });
};

/**
 * Feed 42 through alpha, bravo and charlie in sequence and log the outcome.
 *
 * @returns {Promise<void>} Promise that settles once the outcome was logged.
 */
function run() {
    // Each step receives the value the previous step resolved with.
    const pipeline = Promise.resolve(42)
        .then(alpha)
        .then(bravo)
        .then(charlie);

    const reported = pipeline.then((number) => {
        console.log('success: ' + number)
    });

    // A rejection from any earlier step ends up here.
    return reported.catch((error) => {
        console.log('error: ' + error);
    });
}

run();

每个函数还返回一个带有异步处理结果的Promise,该结果将由紧随其后的promise解决/拒绝。

我知道有 promises-2.01b 和 asyncio 3.4.3 之类的库,但我想要的是只依赖 Python 标准库的解决方案。因此,如果必须导入非标准库,我更愿意改用 RxPython。

javascript python python-3.x promise python-asyncio
4个回答
35
投票

这是一个使用 asyncio 和 async/await 语法的类似程序:

import asyncio
import random

async def alpha(x):
    """Pause for 0.2 seconds, then return ``x + 1``."""
    delay = 0.2
    await asyncio.sleep(delay)
    incremented = x + 1
    return incremented

async def bravo(x):
    """Pause for 0.2 seconds, then return ``x`` plus a random integer in [0, 1000]."""
    delay = 0.2
    await asyncio.sleep(delay)
    # randint is inclusive on both ends.
    offset = random.randint(0, 1000)
    return offset + x

async def charlie(x):
    """Return ``x`` when it is even; otherwise raise ``ValueError(x, 'is odd')``."""
    if x % 2 != 0:
        raise ValueError(x, 'is odd')
    return x

async def run():
    """Chain alpha, bravo and charlie starting from 42 and print the outcome.

    Prints ``success: <number>`` when the chain completes, or
    ``error: <first ValueError argument>`` when a step raises ``ValueError``.
    """
    try:
        # Each step consumes the value returned by the previous one.
        after_alpha = await alpha(42)
        after_bravo = await bravo(after_alpha)
        number = await charlie(after_bravo)
    except ValueError as exc:
        print('error:', exc.args[0])
        return
    print('success:', number)

if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no loop is running and raises RuntimeError on recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(run())
    finally:
        # Release the loop's resources even if run() raises.
        loop.close()

编辑:如果您对响应式流(reactive streams)感兴趣,可以考虑使用 aiostream。

这是一个简单的例子:

import asyncio
from aiostream import stream, pipe

async def main():
    """Build an aiostream pipeline that computes 11² + 13² and print the result."""
    # This stream computes 11² + 13² in about 1.5 seconds.
    counter = stream.count(interval=0.1)                # Count from zero every 0.1 s
    window = counter | pipe.skip(10) | pipe.take(5)     # Skip 10 numbers, take the next 5
    odd_numbers = window | pipe.filter(lambda x: x % 2)  # Keep odd numbers
    squares = odd_numbers | pipe.map(lambda x: x ** 2)   # Square the results
    xs = squares | pipe.accumulate()                    # Add the numbers together
    print('11² + 13² = ', await xs)

if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no loop is running and raises RuntimeError on recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        # Release the loop's resources even if main() raises.
        loop.close()

文档中有更多示例。

免责声明:我是项目维护者。


9
投票

你很幸运,Python 3.4 及更高版本的标准库已包含 asyncio,不过你要找的功能(Future)在 Python 3.5 及更高版本中才可用。

引自您自己给出的 asyncio 链接:“此版本仅与 Python 3.3 相关,其 stdlib 中不包含 asyncio。”

示例:

import asyncio


async def some_coroutine():
    """Sleep for one second, then return the string ``'done'``."""
    pause_seconds = 1
    await asyncio.sleep(pause_seconds)
    outcome = 'done'
    return outcome


def process_result(future):
    """Print the result of ``future`` prefixed with ``Task returned:``.

    ``future.result()`` is called directly, so whatever it raises propagates.
    """
    outcome = future.result()
    print('Task returned:', outcome)


# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()
task = loop.create_task(some_coroutine())
# process_result is called with the finished task once it completes.
task.add_done_callback(process_result)
try:
    # run_until_complete() needs the awaitable to wait for; calling it with
    # no argument raises TypeError.
    loop.run_until_complete(task)
finally:
    loop.close()

0
投票

你可以创建自己的 Promise 类。我不是 Python 开发人员,但我尝试实现了一个类似 JavaScript Promise 的东西。

class Promise:
    """Minimal synchronous imitation of a JavaScript promise.

    The executor passed to the constructor runs immediately and reports its
    outcome through the ``resolve`` / ``reject`` callables it receives.
    ``then`` and ``catch`` invoke their callbacks synchronously and return the
    same instance so that calls can be chained.

    Attributes:
        resolved: Current fulfilment value (``''`` until one is set).
        rejected: Current rejection reason (``''`` until one is set).
    """

    def __init__(self, callback):
        """Run ``callback(self.resolve, self.reject)`` right away."""
        self.resolved = ''
        self.rejected = ''
        # Rejection is tracked separately from the stored reason so that
        # falsy reasons (0, '', None, ...) still count as a rejection.
        self._is_rejected = False
        callback(self.resolve, self.reject)

    def resolve(self, value):
        """Store ``value`` as the fulfilment value."""
        self.resolved = value

    def reject(self, value):
        """Store ``value`` as the rejection reason and mark the promise rejected."""
        self.rejected = value
        self._is_rejected = True

    def then(self, callback):
        """Replace the value with ``callback(value)`` unless the promise is rejected.

        Returns:
            This same instance, to allow chaining.
        """
        if not self._is_rejected:
            self.resolved = callback(self.resolved)
        return self

    def catch(self, callback):
        """Pass the rejection reason to ``callback`` when the promise is rejected.

        The callback's return value replaces the stored reason. A falsy
        return value (such as ``None``) marks the rejection as handled, so
        later ``then`` callbacks run again; a truthy one keeps the promise
        rejected.

        Returns:
            This same instance, to allow chaining.
        """
        if self._is_rejected:
            self.rejected = callback(self.rejected)
            self._is_rejected = bool(self.rejected)
        return self


def myPromise(resolve, reject):
    """Executor that resolves immediately with a list of four names."""
    names = ['Ana', 'Bia', 'Carlos', 'Daniel']
    resolve(names)


def firstResolve(value):
    """Return the element at index 0 of ``value``."""
    head = value[0]
    return head


def secondResolve(value):
    """Print ``value`` to standard output; returns ``None``."""
    print(value)
    return None


def firstReject(value):
    """Print ``value`` prefixed with ``error:``; returns ``None``."""
    prefix = 'error:'
    print(prefix, value)


# Build the promise, then chain: take the first name, print it, and report
# any rejection.
p = Promise(myPromise)
(
    p.then(firstResolve)
    .then(secondResolve)
    .catch(firstReject)
)

Promise.all 示例

class Promise:
    """Minimal synchronous imitation of a JavaScript promise, with ``all``.

    The executor passed to the constructor (if callable) runs immediately and
    reports its outcome through the ``resolve`` / ``reject`` callables it
    receives. ``then`` and ``catch`` invoke their callbacks synchronously and
    return the same instance so that calls can be chained.

    Attributes:
        resolved: Current fulfilment value (``''`` until one is set).
        rejected: Current rejection reason (``''`` until one is set).
    """

    def __init__(self, callback):
        """Run ``callback(self.resolve, self.reject)`` if ``callback`` is callable.

        A non-callable argument (for example ``[]``) creates an unsettled
        instance, which is how the ``all`` aggregator is obtained.
        """
        self.resolved = ''
        self.rejected = ''
        # Rejection is tracked separately from the stored reason so that
        # falsy reasons (0, '', None, ...) still count as a rejection.
        self._is_rejected = False
        if callable(callback):
            callback(self.resolve, self.reject)

    def resolve(self, value):
        """Store ``value`` as the fulfilment value."""
        self.resolved = value

    def reject(self, value):
        """Store ``value`` as the rejection reason and mark the promise rejected."""
        self.rejected = value
        self._is_rejected = True

    def then(self, callback):
        """Replace the value with ``callback(value)`` unless the promise is rejected.

        Returns:
            This same instance, to allow chaining.
        """
        if not self._is_rejected:
            self.resolved = callback(self.resolved)
        return self

    def catch(self, callback):
        """Pass the rejection reason to ``callback`` when the promise is rejected.

        The callback's return value replaces the stored reason. A falsy
        return value (such as ``None``) marks the rejection as handled, so
        later ``then`` callbacks run again; a truthy one keeps the promise
        rejected.

        Returns:
            This same instance, to allow chaining.
        """
        if self._is_rejected:
            self.rejected = callback(self.rejected)
            self._is_rejected = bool(self.rejected)
        return self

    def all(self, promises):
        """Run every executor in ``promises`` in order and collect the outcomes.

        Each element of ``promises`` is an executor called as
        ``executor(resolve, reject)``. Values are concatenated with ``+=``, so
        executors are expected to resolve or reject with lists. Processing
        stops at the first rejection.

        Returns:
            This same instance, with ``resolved`` set to the collected values
            and ``rejected`` set to the collected reasons.
        """
        resolved_values = []
        rejected_values = []
        any_rejected = False
        for executor in promises:
            # Start every executor from a clean slate; otherwise an executor
            # that does not settle would re-add the previous one's outcome.
            self.resolved = ''
            self.rejected = ''
            self._is_rejected = False
            executor(self.resolve, self.reject)
            if self.resolved:
                resolved_values += self.resolved
            if self._is_rejected:
                rejected_values += self.rejected
                any_rejected = True
                break
        self.resolved = resolved_values
        self.rejected = rejected_values
        self._is_rejected = any_rejected
        return self


def myPromise1(resolve, reject):
    """Executor that resolves immediately with ``['Ana']``."""
    names = ['Ana']
    resolve(names)


def myPromise2(resolve, reject):
    """Executor that resolves immediately with ``['Bia']``."""
    names = ['Bia']
    resolve(names)


def myPromise3(resolve, reject):
    """Executor that resolves immediately with ``['Carlos']``."""
    names = ['Carlos']
    resolve(names)


def myPromise4(resolve, reject):
    """Executor that resolves immediately with ``['Daniel']``."""
    names = ['Daniel']
    resolve(names)


def allResolve(values):
    """Print ``values`` prefixed with ``without error: ``; returns ``None``."""
    prefix = 'without error: '
    print(prefix, values)


def allReject(values):
    """Print ``values`` prefixed with ``with error: ``; returns ``None``."""
    prefix = 'with error: '
    print(prefix, values)


# Promise([]) runs no executor; the instance only serves as the aggregator
# on which all() is called.
p = Promise([])
(
    p.all([myPromise1, myPromise2])
    .then(allResolve)
    .catch(allReject)
)

0
投票

对于某些应用程序,我需要将 Promises 与 Eventlet 一起使用。这就是我实现的(完整代码可在我的 eventlet-promise github 存储库中找到):

class Promise(Thenable):
    """
    A Promise represents the eventual result of an asynchronous operation.
    The primary way of interacting with a promise is through its then method,
    which registers callbacks to receive either a promise's eventual value or
    the reason why the promise cannot be fulfilled.

    A promise has a state, which is one of:
    - pending: initial state, neither fulfilled nor rejected.
    - fulfilled: meaning that the operation completed successfully.
    - rejected: meaning that the operation failed.
    A promise may additionally be attached, meaning that it has been attached
    to another promise.

    A promise has three internal properties:
    - _fate is either 'resolved' (attached, fulfilled or rejected) or 'unresolved'.
    - _value is the result of the operation. Initially undefined.
    - _callbacks is a list of functions to call when the promise is resolved or rejected.

    A pending promise can either be fulfilled with a value, or rejected with a
    reason (error). When either of these options happens, the associated
    handlers queued up by a promise's then method are called.

    The promise is said to be settled if it is either fulfilled or rejected,
    but not pending. Once settled, a promise can not be resettled.

    Arguments:
    - executor is a function with the signature executor(resolveFunc, rejectFunc).
        - resolveFunc is a function with the signature resolveFunc(result).
        - rejectFunc is a function with the signature rejectFunc(reason).
        An `executor` call is expected to do one of the following:
        - Call resolveFunc(result) as a side effect if it successfully completes.
        - Call rejectFunc(reason) as a side effect if it fails to complete.
        - Register callbacks to be called when the promise is resolved or rejected.
      A falsy executor (e.g. None) is replaced by one that does nothing.
    """
    def __init__(self, executor : Callable[[Callable[[Any], None], Callable[[Any], None]], None]):
        super().__init__()
        executor = executor or (lambda _, __: None)
        self.execute(executor)

    def __del__(self):
        # _threads is presumably created by Thenable.__init__ (not shown here);
        # tolerate its absence so a partially constructed instance does not
        # raise AttributeError during finalization.
        for thread in getattr(self, '_threads', ()):
            thread.kill()

    @staticmethod
    def resolve(value : Any) -> 'Promise':
        """Return `value` itself if it is a Promise, else a promise resolved with it."""
        if isinstance(value, Promise):
            return value
        return Promise(lambda resolveFunc, _: resolveFunc(value))

    @staticmethod
    def reject(reason : Any):
        """Return a promise whose executor immediately calls rejectFunc(reason)."""
        return Promise(lambda _, rejectFunc: rejectFunc(reason))

    @staticmethod
    def all(promises : List['Promise']):
        """
        Return a promise for the list of values of `promises`, in order.

        The promises are chained one after another: each fulfilled value is
        appended to the results and the chain moves on to the next promise;
        `rejectFunc` is registered as the rejection handler of every promise.
        An empty `promises` yields a promise resolved with [].
        """
        # Mirror allSettled(): with nothing to wait for, resolve immediately
        # instead of spawning a chain that would trip the assertion below.
        if not promises:
            return Promise.resolve([])
        def executor(resolveFunc, rejectFunc):
            def chainExecute(promises : List['Promise'], results, resolveFunc, rejectFunc):
                assert promises, 'No promises to chain'
                # Work on a copy so the caller's list is not consumed.
                promises = list(promises)
                promise_ : Promise = promises.pop(0)
                nextPromise : Promise = promises[0] if promises else None
                # promise_/nextPromise are bound as lambda defaults to capture
                # the values of this call.
                promise_.then(lambda x, promise_=promise_, nextPromise=nextPromise:
                    nextPromise.waitExecute(chainExecute,
                            promises, results + [x],
                            resolveFunc, rejectFunc
                    ) if nextPromise
                    else resolveFunc(results + [x])
                , rejectFunc)
            return hub.spawn(chainExecute, promises, [], resolveFunc, rejectFunc)
        return Promise(executor)

    @staticmethod
    def allSettled(promises : List['Promise']) -> 'Promise':
        """
        Return a promise for a list describing the outcome of every promise.

        Each entry is {'status': ..., 'value': ...} for a fulfilled promise or
        {'status': ..., 'reason': ...} for a rejected one, where status comes
        from the promise's getState(). An empty `promises` yields a promise
        resolved with [].
        """
        if not promises:
            return Promise.resolve([])
        def executor(resolveFunc, rejectFunc):
            def chainExecute(promises : List['Promise'], results, resolveFunc, rejectFunc):
                assert promises, 'No promises to chain'
                # Work on a copy so the caller's list is not consumed.
                promises = list(promises)
                promise_ : Promise = promises.pop(0)
                nextPromise : Promise = promises[0] if promises else None
                # Both handlers record the outcome and continue with the next
                # promise, so a rejection never aborts the chain.
                promise_.then(lambda x, promise_=promise_, nextPromise=nextPromise:
                    nextPromise.waitExecute(chainExecute,
                            promises, results + [{'status': promise_.getState(), 'value': x}],
                            resolveFunc, rejectFunc
                    ) if nextPromise
                    else resolveFunc(results + [{'status': promise_.getState(), 'value': x}])
                , lambda x, promise_=promise_, nextPromise=nextPromise:
                    nextPromise.waitExecute(chainExecute,
                            promises, results + [{'status': promise_.getState(), 'reason': x}],
                            resolveFunc, rejectFunc
                    ) if nextPromise
                    else resolveFunc(results + [{'status': promise_.getState(), 'reason': x}])
                )
            # resolveFunc is passed in the rejectFunc slot as well; chainExecute
            # only ever settles through resolveFunc here.
            return hub.spawn(chainExecute, promises, [], resolveFunc, resolveFunc)
        return Promise(executor)

    def then(self, onFulfilled : Callable[[Any], Any] = None, onRejected : Callable[[Any], Any] = None):
        """
        Register handlers for this promise and return a new promise.

        Non-callable handlers are replaced by defaults: the fulfilment default
        passes the value through, the rejection default re-raises the reason
        (wrapped in Exception when it is not one). Any exception raised while
        handling is turned into a rejected promise.
        """
        def raise_(reason):
            if isinstance(reason, Exception):
                raise reason
            raise Exception(reason)     # pylint: disable=broad-exception-raised
        onFulfilled = onFulfilled if callable(onFulfilled) else (lambda value: value)
        onRejected = onRejected if callable(onRejected) else raise_
        try:
            if self.isFulfilled():
                value = onFulfilled(self._value)
                promise_ = Promise(lambda resolveFunc, _: self.waitExecute(resolveFunc, value))
                # hub.sleep(0)
                return promise_
            if self.isRejected():
                # NOTE(review): when onRejected returns normally, the new
                # promise is still settled through rejectFunc, unlike
                # JavaScript where a handled rejection yields a fulfilled
                # promise; confirm this is intended.
                value = onRejected(self._value)     # pylint: disable=assignment-from-no-return
                promise_ = Promise(lambda _, rejectFunc: self.waitExecute(rejectFunc, value))
                # hub.sleep(0)
                return promise_
            # Still pending: the new promise is resolved with this promise and
            # linked to it together with the handlers.
            promise_ = Promise(lambda resolveFunc, _: resolveFunc(self))
            promise_.referenceTo(self, onFulfilled, onRejected)
            return promise_
        except Exception as error:          # pylint: disable=broad-except
            return Promise.reject(error)

    def catch(self, onRejected : Callable[[Any], Any] = None):
        """Shorthand for then(None, onRejected)."""
        return self.then(None, onRejected)

    def finally_(self, onFinally : Callable[[Any], Any] = None):
        """Register onFinally as both the fulfilment and the rejection handler."""
        return self.then(onFinally, onFinally)

if __name__ == '__main__':
    # Demo: builds several promises, prints them once right away, then keeps
    # printing them in a loop until interrupted with Ctrl+C.
    def executor_(resolveFunc : Callable[[Any], None], rejectFunc : Callable[[Any], None]):      # match, timeout
        # Schedules a resolve after t1 and a reject after t2 through
        # hub.spawn_after (presumably delays in seconds; confirm against hub).
        t1, t2 = 5, 6
        # print(t := 1.5 * random())
        # print() returns None, so `or` always goes on to call the settle function.
        hub.spawn_after(t1, lambda: print('\tResolving') or resolveFunc(t1))
        hub.spawn_after(t2, lambda: print('\tRejecting') or rejectFunc(TimeoutError("Timed out")))

    promise = Promise(executor_)
    # then() without handlers falls back to the default pass-through handlers.
    new_promise = promise.then()
    # attached = Promise(lambda resolveFunc, rejectFunc: None)
    # A promise resolved with another promise, then explicitly linked to it.
    attached = Promise(lambda resolveFunc, rejectFunc: resolveFunc(new_promise))
    attached.referenceTo(new_promise)
    # The integer arguments are not callable, so then() replaces them with
    # its default handlers.
    p1 = Promise.resolve(1).then(2).then()
    p2 = Promise.reject(1).then(2, 2).then().then()
    p3 = p1.then()

    # First snapshot, taken before the green threads spawned above have run.
    print(promise)
    print(new_promise)
    print(attached)
    print(p1)
    print(p2)
    print(p3)

    # p2 starts from a rejected promise, so it is only passed to allSettled.
    p_all = Promise.all([p1, p3, new_promise, promise])
    print('all', p_all)
    p_settled = Promise.allSettled([p1, p2, p3, new_promise, promise])
    print('allSettled', p_settled)

    print('\nFinished\n')
    hub.sleep(1)

    # Keep yielding to the hub and re-printing every promise so that state
    # changes become visible over time.
    while True:
        hub.sleep(0)
        try:
            print()
            print(promise)
            print(new_promise)
            print(attached)
            print(attached.then())
            print(p1)
            print(p2)
            print(p3)
            print('all', p_all)
            print('allSettled', p_settled)
            hub.sleep(3)
        except KeyboardInterrupt:
            # Ctrl+C ends the demo with a zero exit status.
            sys.exit(0)

© www.soinside.com 2019 - 2024. All rights reserved.