Python 异步进程和信号量的死锁

问题描述 投票:0回答:1

为什么会出现这种僵局?

#!/usr/bin/env python3

import asyncio
from typing import Callable


async def _read_stream(stream: asyncio.StreamReader, callback: Callable[[bytes], None]):
    while True:
        line = await stream.readline()
        if len(line) == 0:
            break
        callback(line)


async def _stream_subprocess(command: "list[str]"):

    proc = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        # Default buffer size, and hence line length limit is a measly 64kB.
        # Hopefully we won't have 1MB lines.
        limit=1024 * 1024,
    )

    output: "list[bytes]" = []

    assert proc.stdout is not None
    assert proc.stderr is not None

    await asyncio.gather(
        _read_stream(
            proc.stdout,
            lambda line: output.append(line),
        ),
        _read_stream(
            proc.stderr,
            lambda line: output.append(line),
        ),
    )

    await proc.wait()


async def run(command: "list[str]"):
    print("Running")
    result = await _stream_subprocess(command)
    print("Throwing")

    raise RuntimeError("failed")

    if result.exit_code != 0:
        CalledProcessError(result.exit_code, command, b"".join(result.output))

    return result


async def run_with_parallelism_limit(command: "list[str]", limit: asyncio.Semaphore):
    async with limit:
        print("RUN")
        await run(command)
        print("OK")


async def run_commands(commands: "list[list[str]]", num_jobs: int):
    sem = asyncio.Semaphore(num_jobs)
    tasks = [run_with_parallelism_limit(command, sem) for command in commands]
    await asyncio.gather(*tasks)


async def main():
    commands = [["ls"], ["ls"]]
    await run_commands(commands, 1)


if __name__ == "__main__":
    asyncio.run(main())

输出:

RUN
Running
Throwing
RUN
Running

this Question 可能相关,但很难说,因为它没有解决并且代码不同。

python python-asyncio deadlock
1个回答
0
投票

return_exceptions=True
置于
asyncio.gather()
:


...

async def run_commands(commands: "list[list[str]]", num_jobs: int):
    sem = asyncio.Semaphore(num_jobs)
    tasks = [run_with_parallelism_limit(command, sem) for command in commands]
    await asyncio.gather(*tasks, return_exceptions=True)  # <-- here!

...

那么结果就是:

RUN
Running
Throwing
RUN
Running
Throwing
© www.soinside.com 2019 - 2024. All rights reserved.