如何使用高级异步/等待代码桥接基于低级回调的代码?

问题描述 投票:0回答:2

asyncio.Futures文件指出:

Future对象用于将基于低级回调的代码与高级异步/等待代码进行桥接。

有没有一个典型的例子说明如何做到这一点?


为了使示例更具体,假设我们要包装以下函数,这是基于回调的API的典型。要明确:此函数无法修改 - 假装它是一些复杂的第三方库(可能在内部使用我们无法控制的线程),它们需要回调。

import threading
import time

def callback_after_delay(secs, callback, *args):
    """Call callback(*args) after sleeping for secs seconds"""
    def _target():
        time.sleep(secs)
        callback(*args)

    thread = threading.Thread(target=_target)
    thread.start()

我们希望能够使用我们的包装函数,如:

async def main():
    await aio_callback_after_delay(10., print, "Hello, World")
python-3.x python-asyncio
2个回答
1
投票

只需使用ThreadPoolExecutor。除了如何启动线程之外,代码不会改变。如果从聚集调用中删除“return_exceptions”,您将看到打印完整回溯的异常,因此它取决于您所需的内容。

import time,random
from concurrent.futures import ThreadPoolExecutor
import asyncio

def cb():
  print("cb called")

def blocking():
  if random.randint(0,3) == 1:
    raise ValueError("Random Exception!")
  time.sleep(1)
  cb()
  return 5

async def run(loop):
  futs = []
  executor = ThreadPoolExecutor(max_workers=5)
  for x in range(5):
    future = loop.run_in_executor(executor, blocking)
    futs.append( future )

  res = await asyncio.gather( *futs, return_exceptions=True )
  for r in res:
    if isinstance(r, Exception):
      print("Exception:",r)

loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()

产量

cb called
cb called
cb called
Exception: Random Exception!
Exception: Random Exception!

0
投票

下面是一个完整的自包含示例,用于演示一种方法。它负责在asyncio线程上运行回调,并处理从回调引发的异常。

适用于python 3.6.6。我想知道在这里使用asyncio.get_event_loop()。我们需要一个循环,因为loop.create_future()是在asyncio中创建期货的首选方式。但是,在3.7中我们应该更喜欢asyncio.get_running_loop(),如果尚未设置循环,则会引发异常。也许最好的方法是将循环传递给aio_callback_after_delay显式 - 但这与现有的asyncio代码不匹配,这通常使循环成为可选的关键字参数。关于这一点的澄清,或任何其他改进将不胜感激!


import asyncio
import threading
import time


# This is the callback code we are trying to bridge

def callback_after_delay(secs, callback, *args):
    """Call callback(*args) after sleeping for secs seconds"""
    def _target():
        time.sleep(secs)
        callback(*args)

    thread = threading.Thread(target=_target)
    thread.start()


# This is our wrapper

async def aio_callback_after_delay(secs, callback, *args):
    loop = asyncio.get_event_loop()
    f = loop.create_future()

    def _inner():
        try:
            f.set_result(callback(*args))
        except Exception as ex:
            f.set_exception(ex)

    callback_after_delay(secs, loop.call_soon_threadsafe, _inner)
    return await f


#
# Below is test code to demonstrate things work
#

async def test_aio_callback_after_delay():
    print('Before!')
    await aio_callback_after_delay(1., print, "Hello, World!")
    print('After!')



async def test_aio_callback_after_delay_exception():

    def callback():
        raise RuntimeError()

    print('Before!')
    await aio_callback_after_delay(1., callback)
    print('After!')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # Basic test
    print('Basic Test')
    loop.run_until_complete(test_aio_callback_after_delay())

    # Test our implementation is truly async
    print('Truly Async!')
    loop.run_until_complete(
        asyncio.gather(
            *(test_aio_callback_after_delay() for i in range(0,5))
        )
    )

    # Test exception handling
    print('Exception Handling')
    loop.run_until_complete(test_aio_callback_after_delay_exception())

输出类似于:

Basic Test
Before!
Hello, World
After!

Truly Async!
Before!
Before!
Before!
Before!
Before!
Hello, World
Hello, World
Hello, World
Hello, World
Hello, World
After!
After!
After!
After!
After!

Exception Handling
Before!
Traceback (most recent call last):
  File "./scratch.py", line 60, in <module>
    loop.run_until_complete(test_aio_callback_after_delay_exception())
  File "\lib\asyncio\base_events.py", line 468, in run_until_complete
    return future.result()
  File "./scratch.py", line 40, in test_aio_callback_after_delay_exception
    await aio_callback_after_delay(1., callback)
  File "./scratch.py", line 26, in aio_callback_after_delay
    return await f
  File "./scratch.py", line 21, in _inner
    f.set_result(callback(*args))
  File "./scratch.py", line 37, in callback
    raise RuntimeError()
RuntimeError
© www.soinside.com 2019 - 2024. All rights reserved.