当使用ASYNCIO,你是如何让所有正在运行的任务,关停事件循环之前完成

问题描述 投票:40回答:3

我有以下代码:

@asyncio.coroutine
def do_something_periodically():
    while True:
        asyncio.async(my_expensive_operation())
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

我运行这个功能,直到完成。该函数完成任何未决的任务永远不会运行 - 当关机设置出现问题。 (你认为这是一个错误

task: <Task pending coro=<report() running at script.py:33> wait_for=<Future pending cb=[Task._wakeup()]>>

)。如何正确安排关机?

为了让一些背景,我写了一个系统监视器,其从/ proc / STAT读取每5秒,在计算该期间的CPU使用率,然后将结果发送到服务器。我想保持这些调度监视作业,直到我收到SIGTERM,当我停下来调度,等待所有当前作业完成,并退出优雅。

python python-3.4 python-asyncio
3个回答
42
投票

您可以检索未完成的任务,并再次运行循环,直到他们完成,然后关闭循环或退出程序。

pending = asyncio.Task.all_tasks()
loop.run_until_complete(asyncio.gather(*pending))
  • 悬而未决的是尚未完成的任务列表。
  • asyncio.gather()允许同时在多个任务等待。

如果你想确保所有的任务协同程序(也许你有一个“主”协程)内完成,你可以这样来做,比如:

@asyncio.coroutine
def do_something_periodically():
    while True:
        asyncio.async(my_expensive_operation())
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    yield from asyncio.gather(*asyncio.Task.all_tasks())

此外,在这种情况下,所有的任务都在同一个协程创建的,你已经可以访问任务:

@asyncio.coroutine
def do_something_periodically():
    tasks = []
    while True:
        tasks.append(asyncio.async(my_expensive_operation()))
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    yield from asyncio.gather(*tasks)

9
投票

对于Python 3.7以上答案使用多淘汰的API(asyncio.async和Task.all_tasks,@ asyncio.coroutine,从收益率,等等),你倒是应该这样做:

import asyncio


async def my_expensive_operation(expense):
    print(await asyncio.sleep(expense, result="Expensive operation finished."))


async def do_something_periodically(expense, interval):
    while True:
        asyncio.create_task(my_expensive_operation(expense))
        await asyncio.sleep(interval)


loop = asyncio.get_event_loop()
coro = do_something_periodically(1, 1)

try:
    loop.run_until_complete(coro)
except KeyboardInterrupt:
    coro.close()
    tasks = asyncio.all_tasks(loop)
    expensive_tasks = {task for task in tasks if task._coro.__name__ != coro.__name__}
    loop.run_until_complete(asyncio.gather(*expensive_tasks))

1
投票

你也可以考虑使用asyncio.shield,虽然通过这样做,你不会得到完成,但只屏蔽所有正在运行的任务。但它仍然可以在某些情况下非常有用。

除此之外,像Python 3.7的,我们也可以在这里使用的高级API方法asynio.run。由于Python的核心开发者,尤里·Selivanov提示:https://youtu.be/ReXxO_azV-w?t=636 注:asyncio.run功能已经被添加在Python 3.7〜ASYNCIO在临时基础上。

希望帮助!

import asyncio


async def my_expensive_operation(expense):
    print(await asyncio.sleep(expense, result="Expensive operation finished."))


async def do_something_periodically(expense, interval):
    while True:
        asyncio.create_task(my_expensive_operation(expense))
        # using asyncio.shield
        await asyncio.shield(asyncio.sleep(interval))


coro = do_something_periodically(1, 1)

if __name__ == "__main__":
    try:
        # using asyncio.run
        asyncio.run(coro)
    except KeyboardInterrupt:
        print('Cancelled!')

1
投票

使用包装协同程序是等待,直到挂起任务计数返回之前为1。

async def loop_job():
    asyncio.create_task(do_something_periodically())
    while len(asyncio.Task.all_tasks()) > 1:  # Any task besides loop_job() itself?
        await asyncio.sleep(0.2)

asyncio.run(loop_job())

1
投票

我不知道这是否是你问什么,但我也有类似的问题,这里是我想出了终极解决方案。

该代码是蟒蛇3兼容,并且只使用公共的API ASYNCIO(意味着没有哈克_coro并没有弃用API)。

import asyncio

async def fn():
  await asyncio.sleep(1.5)
  print('fn')

async def main():
    print('main start')
    asyncio.create_task(fn()) # run in parallel
    await asyncio.sleep(0.2)
    print('main end')


def async_run_and_await_all_tasks(main):
  def get_pending_tasks():
      tasks = asyncio.Task.all_tasks()
      pending = [task for task in tasks if task != run_main_task and not task.done()]
      return pending

  async def run_main():
      await main()

      while True:
          pending_tasks = get_pending_tasks()
          if len(pending_tasks) == 0: return
          await asyncio.gather(*pending_tasks)

  loop = asyncio.new_event_loop()
  run_main_coro = run_main()
  run_main_task = loop.create_task(run_main_coro)
  loop.run_until_complete(run_main_task)

# asyncio.run(main()) # doesn't print from fn task, because main finishes earlier
async_run_and_await_all_tasks(main)

输出(如预期):

main start
main end
fn

这async_run_and_await_all_tasks功能将蟒蛇的方式的NodeJS行为:当没有未完成的任务出口仅。

© www.soinside.com 2019 - 2024. All rights reserved.