我正在使用 asyncio 异步启动进程,然后从标准输出读取数据,我的代码如下所示:
process = await asyncio.subprocess.create_subprocess_exe(subprocess_args, stderr=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
然后,我从该进程的标准输出中异步读取,对于读取的每一行,我调用用户提供的回调函数,我的代码如下所示:
async for line in process.stdout:
user_provided_callback(line)
然后等待进程终止:
_, stderr = await process.communicate()
if process.returncode != 0:
raise Exception(stderr.decode())
但是,我注意到 user_provided_callback 引发了一个错误,该进程没有正确处理,所以我看到诸如“资源警告:未关闭的传输”之类的警告,因为我通过 pytest 单元测试运行此代码,警告被视为错误所以这导致了失败。
我认为在失败的情况下我必须手动关闭进程,所以我将代码更改为:
async for line in process.stdout:
try:
user_provided_callback(line)
except Exception as err:
process.terminate()
raise err
这会在回调失败时显式终止进程,但我仍然看到此警告。然后我将上面的代码修改为:
async for line in process.stdout:
try:
user_provided_callback(line)
except Exception as err:
process.terminate()
process._transport.close() ## <- NEWLY ADDED
raise err
显式关闭传输后,现在警告消失了,但是这会访问私有变量,因此它似乎不是正确的修复方法。在这种情况下处理异常的正确方法是什么?我正在寻找跨平台解决方案,因为我想支持 Windows 和 Linux。我也在使用 python 3.11.
process.terminate()
发送进程异步处理的信号;因此,当调用终止时它不会立即关闭,而是稍后关闭。
这意味着在调用
await process.wait()
以允许进行清理后,您仍然需要 await process.communicate()
或 process.terminate()
(这可能会更好,原因我将在下面描述)。
在最简单的情况下,只需添加
await process.wait()
就足够了,但存在潜在的复杂性:
terminate()
调用之后,您也需要继续读取输出,无论是在您自己的代码中还是使用 await process.communicate()
。