只要剩下未完成的取消屏蔽任务但不再存在,如何运行asyncio循环?

问题描述 投票:1回答:2

我正在尝试向现有的asyncio循环中添加一些代码,以在Ctrl-C上提供干净的关机功能。下面是它正在做的事情的抽象。

import asyncio, signal

async def task1():
    print("Starting simulated task1")
    await asyncio.sleep(5)
    print("Finished simulated task1")

async def task2():
    print("Starting simulated task2")
    await asyncio.sleep(5)
    print("Finished simulated task2")

async def tasks():
    await task1()
    await task2()

async def task_loop():
    try:
        while True:
            await asyncio.shield(tasks())
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop")
        raise

async def aiomain():
    loop = asyncio.get_running_loop()
    task = asyncio.Task(task_loop())
    loop.add_signal_handler(signal.SIGINT, task.cancel)
    await task

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        pass

#def main():
#    try:
#        loop = asyncio.get_event_loop()
#        loop.create_task(aiomain())
#        loop.run_forever()
#    except asyncio.CancelledError:
#        pass

if __name__ == '__main__':
    main()

在此示例中,假设task1task2的序列一旦开始就需要完成,否则某些工件将处于不一致的状态。 (因此,asyncio.shield包装器会调用tasks。)

使用上述代码,如果我在脚本启动后立即中断脚本,并且只打印了Starting simulated task1,则循环停止,并且task2从未启动。如果我尝试切换到已注释掉的main版本,则即使循环已正确取消并且至少在几分钟内没有任何进一步反应,也永远不会退出。它确实有一点进步,因为它至少完成了task1task2的任何进行中的序列。

通过集思广益,一些可能的解决方案,尽管我仍然感觉到我必须缺少一些简单的东西:

  • asyncio.shield周围创建包装器,该包装器增加由asyncio.Condition对象同步的变量,运行被屏蔽的函数,然后递减该变量。然后,在aiomain处理程序中的CancelledError中,等待变量达到零,然后引发异常。 (在一个实现中,我可能会使用__aexit__将所有这些部分组合成一个类,并在CancelledError逻辑上实现等待零。)
  • 完全跳过使用asyncio的取消机制,而是使用asyncio.Event或类似名称来允许中断点或可中断的睡眠。虽然这似乎更具侵入性,但要求我指定哪些点被认为是可中断的,而不是声明需要屏蔽哪些序列以免被抵消。
python python-asyncio
2个回答
1
投票

这是一个很好的问题。我在制定答案时学到了一些东西,所以希望您仍在监视此线程。

首先要研究的是shield()方法如何工作?关于这一点,文档至少可以说令人困惑。在阅读test_tasks.py中的标准库测试代码之前,我无法弄清楚。这是我的理解:

考虑此代码片段:

async def coro_a():
    await asyncio.sheild(task_b())
    ...
task_a = asyncio.create_task(coro_a())
task_a.cancel()

当执行task_a.cancel()语句时,task_a确实被取消。等待语句将引发CancelledError 立即,而无需等待task_b完成。但是task_b继续运行。外部任务(a)停止,但内部任务(b)没有。

这里是您程序的修改版,对此进行了说明。主要更改是在CancelledError异常处理程序中插入等待,以使程序保持活动状态几秒钟。我在Windows上运行,这就是为什么我也稍微更改了信号处理程序的原因,但这只是一个小问题。我还在打印语句中添加了时间戳。

import asyncio
import signal
import time

async def task1():
    print("Starting simulated task1", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task1", time.time())

async def task2():
    print("Starting simulated task2", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task2", time.time())

async def tasks():
    await task1()
    await task2()

async def task_loop():
    try:
        while True:
            await asyncio.shield(tasks())
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop", time.time())
        raise

async def aiomain():
    task = asyncio.create_task(task_loop())
    KillNicely(task)
    try:
        await task
    except asyncio.CancelledError:
        print("Caught CancelledError", time.time())
        await asyncio.sleep(5.0)
        raise

class KillNicely:
    def __init__(self, cancel_me):
        self.cancel_me = cancel_me
        self.old_sigint = signal.signal(signal.SIGINT,
                                        self.trap_control_c)

    def trap_control_c(self, signum, stack):
        if signum != signal.SIGINT:
            self.old_sigint(signum, stack)
        else:
            print("Got Control-C", time.time())
            print(self.cancel_me.cancel())

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        print("Program exit, cancelled", time.time())

# Output when ctrlC is struck during task1
# 
# Starting simulated task1 1590871747.8977509
# Got Control-C 1590871750.8385916
# True
# Shutting down task loop 1590871750.8425908
# Caught CancelledError 1590871750.8435903
# Finished simulated task1 1590871752.908434
# Starting simulated task2 1590871752.908434
# Program exit, cancelled 1590871755.8488846        

if __name__ == '__main__':
    main()

您可以看到您的程序无法正常工作,因为在task1和task2有机会完成之前,task_loop被取消后就退出了。他们一直都在那儿(或者,如果程序继续运行的话,他们会一直在那儿)。

这说明了shield()和cancel()如何相互作用,但实际上并不能解决您提出的问题。为此,我认为,您需要有一个可等待的对象,您可以使用该对象来使程序保持活动状态,直到完成重要任务为止。需要在顶层创建该对象,并将其向下传递到执行重要任务的位置。这是一个与您的程序相似的程序,但是可以按照您想要的方式执行。

我进行了三轮运行:(1)任务1期间的Control-C,(2)任务2期间的Control-C,(3)两个任务都完成后的Control-C。在前两种情况下,程序继续进行,直到task2完成。在第三种情况下,它立即结束。

import asyncio
import signal
import time

async def task1():
    print("Starting simulated task1", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task1", time.time())

async def task2():
    print("Starting simulated task2", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task2", time.time())

async def tasks(kwrap):
    fut = asyncio.get_running_loop().create_future()
    kwrap.awaitable = fut
    await task1()
    await task2()
    fut.set_result(1)

async def task_loop(kwrap):
    try:
        while True:
            await asyncio.shield(tasks(kwrap))
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop", time.time())
        raise

async def aiomain():
    kwrap = KillWrapper()
    task = asyncio.create_task(task_loop(kwrap))
    KillNicely(task)
    try:
        await task
    except asyncio.CancelledError:
        print("Caught CancelledError", time.time())
        await kwrap.awaitable
        raise

class KillNicely:
    def __init__(self, cancel_me):
        self.cancel_me = cancel_me
        self.old_sigint = signal.signal(signal.SIGINT,
                                        self.trap_control_c)

    def trap_control_c(self, signum, stack):
        if signum != signal.SIGINT:
            self.old_sigint(signum, stack)
        else:
            print("Got Control-C", time.time())
            print(self.cancel_me.cancel())

class KillWrapper:
    def __init__(self):
        self.awaitable = asyncio.get_running_loop().create_future()
        self.awaitable.set_result(0)

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        print("Program exit, cancelled", time.time())

# Run 1 Control-C during task1
# Starting simulated task1 1590872408.6737766
# Got Control-C 1590872410.7344952
# True
# Shutting down task loop 1590872410.7354996
# Caught CancelledError 1590872410.7354996
# Finished simulated task1 1590872413.6747622
# Starting simulated task2 1590872413.6747622
# Finished simulated task2 1590872418.6750958
# Program exit, cancelled 1590872418.6750958
#
# Run 1 Control-C during task2
# Starting simulated task1 1590872492.927735
# Finished simulated task1 1590872497.9280624
# Starting simulated task2 1590872497.9280624
# Got Control-C 1590872499.5973852
# True
# Shutting down task loop 1590872499.5983844
# Caught CancelledError 1590872499.5983844
# Finished simulated task2 1590872502.9274273
# Program exit, cancelled 1590872502.9287038
#
# Run 1 Control-C after task2 -> immediate exit
# Starting simulated task1 1590873694.2925708
# Finished simulated task1 1590873699.2928336
# Starting simulated task2 1590873699.2928336
# Finished simulated task2 1590873704.2938952
# Got Control-C 1590873706.0790765
# True
# Shutting down task loop 1590873706.0804725
# Caught CancelledError 1590873706.0804725
# Program exit, cancelled 1590873706.0814824

0
投票

这是我最终使用的内容:

import asyncio, signal

async def _shield_and_wait_body(coro, finish_event):
    try:
        await coro
    finally:
        finish_event.set()

async def shield_and_wait(coro):
    finish_event = asyncio.Event()
    task = asyncio.shield(_shield_and_wait_body(coro, finish_event))
    try:
        await task
    except asyncio.CancelledError:
        await finish_event.wait()
        raise

def shield_and_wait_decorator(coro_fn):
    return lambda *args, **kwargs: shield_and_wait(coro_fn(*args, **kwargs))

async def task1():
    print("Starting simulated task1")
    await asyncio.sleep(5)
    print("Finished simulated task1")

async def task2():
    print("Starting simulated task2")
    await asyncio.sleep(5)
    print("Finished simulated task2")

@shield_and_wait_decorator
async def tasks():
    await task1()
    await task2()

async def task_loop():
    try:
        while True:
            # Alternative to applying @shield_and_wait_decorator to tasks()
            #await shield_and_wait(tasks())
            await tasks()
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop")
        raise

def sigint_handler(task):
    print("Cancelling task loop")
    task.cancel()

async def aiomain():
    loop = asyncio.get_running_loop()
    task = asyncio.Task(task_loop())
    loop.add_signal_handler(signal.SIGINT, sigint_handler, task)
    await task

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        pass

if __name__ == '__main__':
    main()

类似于Paul Cornelius的回答,这在允许CancelledError沿调用链向上传播之前插入了一个等待子任务完成的等待。但是,除了要调用asyncio.shield时,不需要触摸代码。

((在我的实际用例中,我同时运行三个循环,使用asyncio.Lock确保一个任务或一系列任务在另一个任务开始之前完成。我在该锁上也有一个asyncio.Condition与一个协程进行通信当我尝试在aiomainmain中等待所有被屏蔽的任务完成时,我遇到了一个问题,即被取消的父级释放了锁,然后一个被屏蔽的任务试图使用将该锁获取并释放到受屏蔽的任务中也没有任何意义-这将导致任务B仍按顺序运行:受屏蔽的任务A启动,任务B的协程超时了另一方面,通过将等待放置在shield_and_wait调用的位置,可以巧妙地避免过早地释放了锁定。)

一个警告:shield_and_wait_decorator在类方法上似乎无法正常工作。

© www.soinside.com 2019 - 2024. All rights reserved.