如何在 Python 中迭代 AsyncIterator 流并设置超时而不取消流?

问题描述 投票:0回答:1

我正在处理一个

AsyncIterator[str]
的对象。 它从网络获取消息,并将它们作为字符串生成。 我想为此流创建一个包装器来缓冲这些消息,并定期生成它们。

我的代码如下所示:

async def buffer_stream(stream: AsyncIterator[str], buffer_time: Optional[float]) -> AsyncIterator[str]:
    """
    Buffer messages from the stream, and yields them at regular intervals.
    """
    last_sent_at = time.perf_counter()
    buffer = ''

    stop = False
    while not stop:
        time_to_send = False

        timeout = (
            max(buffer_time - (time.perf_counter() - last_sent_at), 0)
            if buffer_time else None
        )
        try:
            buffer += await asyncio.wait_for(
                stream.__anext__(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            time_to_send = True
        except StopAsyncIteration:
            time_to_send = True
            stop = True
        else:
            if time.perf_counter() - last_sent_at >= buffer_time:
                time_to_send = True

        if not buffer_time or time_to_send:
            if buffer:
                yield buffer
                buffer = ''
            last_sent_at = time.perf_counter()

据我所知,逻辑是有道理的,但是一旦遇到第一个超时,它就会中断流,并在流完成之前提前退出。

我想这可能是因为

asyncio.wait_for
特别说:

发生超时时,它会取消任务并引发 TimeoutError。为了避免任务取消,请将其扭曲为

shield()

我尝试用盾牌包裹它:

buffer += await asyncio.wait_for(
    shield(stream.__anext__()),
    timeout=timeout
)

此错误出于不同的原因:

RuntimeError: anext(): asynchronous generator is already running
。据我了解,这意味着当它尝试获取下一个时,它仍在获取上一个
anext()
,这会导致错误。

有正确的方法吗?

演示:https://www.sololearn.com/en/compiler-playground/cBCVnVAD4H7g

python asynchronous python-asyncio
1个回答
0
投票

您可以将

stream.__anext__()
的结果转换为任务(或者更一般地说,未来)并等待它,直到它超时或产生结果:

async def buffer_stream(stream: AsyncIterator[str], buffer_time: Optional[float]) -> AsyncIterator[str]:
    last_sent_at = time.perf_counter()
    buffer = ''

    stop = False
    await_next = None
    while not stop:
        time_to_send = False

        timeout = (
            max(buffer_time - (time.perf_counter() - last_sent_at), 0)
            if buffer_time else None
        )
        if await_next is None:
            await_next = asyncio.ensure_future(stream.__anext__())
        try:
            buffer += await asyncio.wait_for(
                asyncio.shield(await_next),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            time_to_send = True
        except StopAsyncIteration:
            time_to_send = True
            stop = True
        else:
            await_next = None
            if time.perf_counter() - last_sent_at >= buffer_time:
                time_to_send = True

        if not buffer_time or time_to_send:
            if buffer:
                yield buffer
                buffer = ''
            last_sent_at = time.perf_counter()
© www.soinside.com 2019 - 2024. All rights reserved.