为什么会出现这种僵局?
#!/usr/bin/env python3
import asyncio
from typing import Callable
async def _read_stream(stream: asyncio.StreamReader, callback: Callable[[bytes], None]):
while True:
line = await stream.readline()
if len(line) == 0:
break
callback(line)
async def _stream_subprocess(command: "list[str]"):
proc = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
# Default buffer size, and hence line length limit is a measly 64kB.
# Hopefully we won't have 1MB lines.
limit=1024 * 1024,
)
output: "list[bytes]" = []
assert proc.stdout is not None
assert proc.stderr is not None
await asyncio.gather(
_read_stream(
proc.stdout,
lambda line: output.append(line),
),
_read_stream(
proc.stderr,
lambda line: output.append(line),
),
)
await proc.wait()
async def run(command: "list[str]"):
print("Running")
result = await _stream_subprocess(command)
print("Throwing")
raise RuntimeError("failed")
if result.exit_code != 0:
CalledProcessError(result.exit_code, command, b"".join(result.output))
return result
async def run_with_parallelism_limit(command: "list[str]", limit: asyncio.Semaphore):
async with limit:
print("RUN")
await run(command)
print("OK")
async def run_commands(commands: "list[list[str]]", num_jobs: int):
sem = asyncio.Semaphore(num_jobs)
tasks = [run_with_parallelism_limit(command, sem) for command in commands]
await asyncio.gather(*tasks)
async def main():
commands = [["ls"], ["ls"]]
await run_commands(commands, 1)
if __name__ == "__main__":
asyncio.run(main())
输出:
RUN
Running
Throwing
RUN
Running
有 this Question 可能相关,但很难说,因为它没有解决并且代码不同。
将
return_exceptions=True
置于 asyncio.gather()
:
...
async def run_commands(commands: "list[list[str]]", num_jobs: int):
sem = asyncio.Semaphore(num_jobs)
tasks = [run_with_parallelism_limit(command, sem) for command in commands]
await asyncio.gather(*tasks, return_exceptions=True) # <-- here!
...
那么结果就是:
RUN
Running
Throwing
RUN
Running
Throwing