任务完成后是否可以立即中断AsyncIO事件循环?

问题描述 投票:0回答:1

看一下下面的简单程序:

import asyncio

def callback(_):
    loop = asyncio.get_running_loop()
    loop.stop()

async def echo(msg):
    print(msg)

async def main():
    loop = asyncio.get_running_loop()
    
    taskA = loop.create_task(echo("TaskA"))
    taskA.add_done_callback(callback)
    
    taskB = loop.create_task(echo("TaskB"))
    await taskB


asyncio.run(main())

目前输出

TaskA
TaskB

对我来说,这意味着调用

loop.stop()
的回调将使用语义上类似于
loop.call_soon(callback(taskA))
的内容安排在事件循环上,而不是在
TaskA
完成后立即运行。

是否有一种不同的机制可以在任务完成后立即中断事件循环,而无需在其他地方显式等待该任务?或者,有没有办法挂钩事件循环并在每次并发运行的任务之间发生变化时运行一些逻辑?

我的目标是创建一个调试机制,该机制将监视任务并在引发未捕获的异常时立即中断事件循环。我希望这种情况立即发生,而不是在我

await tracked_task
时发生,是因为我想保留任务的当前共享状态,以便我可以在调试器中检查它。

python python-asyncio event-loop
1个回答
0
投票

您可以重新定义

asyncio.Task
工厂,并取消所有其他任务并停止事件循环(如果任何任务完成时出现未处理的异常):

import asyncio
from time import monotonic
from typing import Any
import traceback


async def echo(msg: Any, s: Any) -> None:
    """Some random"""
    await asyncio.sleep(s)
    if s in (2, 5):
        raise Exception("Some sudden exception!")
    else:
        print(msg)


async def amain() -> None:
    """Main entrypoint of the app."""
    [asyncio.create_task(echo(t, t)) for t in range(10)]


class MyTask(asyncio.Task):
    """Create our own version of asyncio task."""
    def __init__(self, coro, *, loop, name=None) -> None:
        super().__init__(coro, loop=loop, name=name)
        self.add_done_callback(self.callback)  # add callback to each new task

    @staticmethod
    def callback(fut: "MyTask") -> None:
        """Stops asyncio loop if any task finished with error."""
        if ex := fut.exception():
            try:
                # remove callback from all other tasks
                for task in asyncio.all_tasks():
                    task.remove_done_callback(MyTask.callback)

                MyTask.stop_all()  # gracefully stop all other tasks

                print(f"Exception test: {ex}")
                trace = ''.join(traceback.format_exception(None, ex, ex.__traceback__))
                print(f"Full traceback: {trace}")
                raise ex  # reraise Exception
            finally:
                loop = asyncio.get_event_loop()
                loop.stop()

    @staticmethod
    def stop_all() -> None:
        """Gracefully cancels all tasks except the current one."""
        cur_task = asyncio.current_task()
        for task in asyncio.all_tasks():
            if task is not cur_task:
                try:
                    if not task.done():
                        task.cancel()
                except asyncio.CancelledError:
                    pass


def task_factory(loop, coro) -> MyTask:
    """Change default asyncio.Task factory."""
    return MyTask(loop=loop, coro=coro)


if __name__ == '__main__':
    start = monotonic()
    _loop = asyncio.get_event_loop()
    _loop.set_task_factory(task_factory)
    main_entry_task = _loop.create_task(amain())
    _loop.run_forever()  # loop will be stopped if any exception happened
    print(monotonic() - start)

© www.soinside.com 2019 - 2024. All rights reserved.