如何在失败的情况下终止Python中的异步进程

问题描述 投票:0回答:1

我正在使用 asyncio 异步启动进程,然后从标准输出读取数据,我的代码如下所示:

process = await asyncio.subprocess.create_subprocess_exe(subprocess_args, stderr=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)

然后,我从该进程的标准输出中异步读取,对于读取的每一行,我调用用户提供的回调函数,我的代码如下所示:

async for line in process.stdout:
   user_provided_callback(line)

然后等待进程终止:

_, stderr = await process.communicate()
if process.returncode != 0:
   raise Exception(stderr.decode())

但是,我注意到 user_provided_callback 引发了一个错误,该进程没有正确处理,所以我看到诸如“资源警告:未关闭的传输”之类的警告,因为我通过 pytest 单元测试运行此代码,警告被视为错误所以这导致了失败。

我认为在失败的情况下我必须手动关闭进程,所以我将代码更改为:

    async for line in process.stdout:
       try:
         user_provided_callback(line)
       except Exception as err:
         process.terminate()
         raise err

这会在回调失败时显式终止进程,但我仍然看到此警告。然后我将上面的代码修改为:

async for line in process.stdout:
   try:
     user_provided_callback(line)
   except Exception as err:
     process.terminate()
     process._transport.close() ## <- NEWLY ADDED
     raise err

显式关闭传输后,现在警告消失了,但是这会访问私有变量,因此它似乎不是正确的修复方法。在这种情况下处理异常的正确方法是什么?我正在寻找跨平台解决方案,因为我想支持 Windows 和 Linux。我也在使用 python 3.11.

python python-asyncio
1个回答
0
投票

process.terminate()
发送进程异步处理的信号;因此,当调用终止时它不会立即关闭,而是稍后关闭。

这意味着在调用

await process.wait()
以允许进行清理后,您仍然需要
await process.communicate()
process.terminate()
(这可能会更好,原因我将在下面描述)。


在最简单的情况下,只需添加

await process.wait()
就足够了,但存在潜在的复杂性:

  • 如果进程在收到 SIGTERM 后尝试刷新 stdout 或 stderr,但由于 Python 代码中没有任何内容仍在从管道的另一端读取而无法执行此操作,则可能会导致死锁。为了避免这种情况,即使在
    terminate()
    调用之后,您也需要继续读取输出,无论是在您自己的代码中还是使用
    await process.communicate()
  • 如果进程定义了 SIGTERM 的处理程序,它可以选择忽略或延迟对该信号的处理。 如果您愿意接受一些数据丢失的风险(可能是因为进程修改的文件未完全刷新到磁盘或标准输出未完全写入),您可以等待合理的时间,然后发送如果进程仍未退出,则 SIGKILL;与 SIGTERM 不同,SIGKILL 不能被忽略。
© www.soinside.com 2019 - 2024. All rights reserved.