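I am working with an AsyncIterator[str] object. It fetches messages from the network and yields them as strings.
I want to write a wrapper for this stream that buffers these messages and yields them at regular intervals.
My code looks like this: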
import asyncio
import time
from typing import AsyncIterator, Optional

async def buffer_stream(stream: AsyncIterator[str], buffer_time: Optional[float]) -> AsyncIterator[str]:
    """
    Buffer messages from the stream, and yield them at regular intervals.
    """
    last_sent_at = time.perf_counter()
    buffer = ''
    stop = False
    while not stop:
        time_to_send = False
        # Time left until the next scheduled flush (None means wait indefinitely).
        timeout = (
            max(buffer_time - (time.perf_counter() - last_sent_at), 0)
            if buffer_time else None
        )
        try:
            buffer += await asyncio.wait_for(
                stream.__anext__(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            time_to_send = True
        except StopAsyncIteration:
            time_to_send = True
            stop = True
        else:
            # Guard against buffer_time being None before comparing.
            if buffer_time and time.perf_counter() - last_sent_at >= buffer_time:
                time_to_send = True
        if not buffer_time or time_to_send:
            if buffer:
                yield buffer
                buffer = ''
            last_sent_at = time.perf_counter()
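As far as I can tell the logic makes sense, but as soon as the first timeout occurs, the stream is interrupted and the function exits early, before the stream has finished.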
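A minimal sketch of why this happens, assuming a hypothetical network_stream async generator in place of the real stream: when wait_for() times out, it cancels the pending __anext__() call, the CancelledError is thrown into the generator at its current await, and the generator ends. The next __anext__() then raises StopAsyncIteration, which the loop above treats as the normal end of the stream.

```python
import asyncio
from typing import AsyncIterator, List

async def network_stream() -> AsyncIterator[str]:
    # Hypothetical stand-in for the real stream: 3 messages, 0.1 s apart.
    for i in range(3):
        await asyncio.sleep(0.1)
        yield f"msg{i}"

async def main() -> List[str]:
    stream = network_stream()
    events: List[str] = []
    try:
        # Times out while the generator is suspended inside asyncio.sleep().
        await asyncio.wait_for(stream.__anext__(), timeout=0.01)
    except asyncio.TimeoutError:
        events.append("timeout")
    try:
        # The cancellation was thrown into the generator and ended it,
        # so there is nothing left to fetch.
        events.append(await stream.__anext__())
    except StopAsyncIteration:
        events.append("stream finished early")
    return events

events = asyncio.run(main())
print(events)
```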
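I suspect this is because the documentation for asyncio.wait_for specifically says:
"If a timeout occurs, it cancels the task and raises TimeoutError. To avoid the task cancellation, wrap it in shield()."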
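The difference the documentation describes can be checked directly. In this sketch (slow_read is a hypothetical stand-in for a network read), a plain wait_for() timeout leaves the awaited task cancelled, while a timeout on asyncio.shield(task) cancels only the outer shield future, so the inner task keeps running and can be awaited again.

```python
import asyncio
from typing import Tuple

async def slow_read(value: str) -> str:
    # Hypothetical stand-in for a network read that takes 0.2 s.
    await asyncio.sleep(0.2)
    return value

async def main() -> Tuple[bool, bool, str]:
    # Without shield(): on timeout, wait_for() cancels the task it awaits.
    plain = asyncio.ensure_future(slow_read("plain"))
    try:
        await asyncio.wait_for(plain, timeout=0.02)
    except asyncio.TimeoutError:
        pass
    plain_cancelled = plain.cancelled()

    # With shield(): only the outer shield future is cancelled, so the
    # inner task keeps running and can be awaited again later.
    inner = asyncio.ensure_future(slow_read("shielded"))
    try:
        await asyncio.wait_for(asyncio.shield(inner), timeout=0.02)
    except asyncio.TimeoutError:
        pass
    inner_cancelled = inner.cancelled()
    value = await inner
    return plain_cancelled, inner_cancelled, value

result = asyncio.run(main())
print(result)
```

This is also why shield() on its own does not fix the loop: each iteration still calls stream.__anext__() again, starting a new read while the shielded one is still running.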
I tried wrapping it in shield():
buffer += await asyncio.wait_for(
    asyncio.shield(stream.__anext__()),
    timeout=timeout
)
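This fails for a different reason:
RuntimeError: anext(): asynchronous generator is already running
As I understand it, this means the previous anext() call is still in progress when the loop tries to fetch the next item, which causes the error.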
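The error can be reproduced in isolation. In this minimal sketch (slow_gen is a hypothetical generator), one __anext__() call is left running in the background, and a second call made before it completes is rejected with the same RuntimeError, while the first call still returns its value normally.

```python
import asyncio
from typing import AsyncIterator, Optional, Tuple

async def slow_gen() -> AsyncIterator[str]:
    # Each item takes a while to produce, like a network read.
    await asyncio.sleep(0.05)
    yield "first"

async def main() -> Tuple[Optional[str], str]:
    agen = slow_gen()
    # Start one __anext__() call in the background.
    pending = asyncio.ensure_future(agen.__anext__())
    # Yield to the event loop so the generator starts and suspends in sleep().
    await asyncio.sleep(0)
    error: Optional[str] = None
    try:
        # A second __anext__() while the first is still pending is rejected.
        await agen.__anext__()
    except RuntimeError as exc:
        error = str(exc)
    # The original call is unaffected and still produces its value.
    value = await pending
    await agen.aclose()
    return error, value

error, value = asyncio.run(main())
print(error)
print(value)
```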
What is the right way to do this?
Demo: https://www.sololearn.com/en/compiler-playground/cBCVnVAD4H7g
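You can turn the result of stream.__anext__() into a task (or, more generally, a future) and keep waiting on it until it either times out or produces a result. Because the task is wrapped in asyncio.shield(), a timeout in wait_for() cancels only the shield, not the read itself; and because the same task is reused on the next iteration, __anext__() is never called again while the previous call is still running: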
import asyncio
import time
from typing import AsyncIterator, Optional

async def buffer_stream(stream: AsyncIterator[str], buffer_time: Optional[float]) -> AsyncIterator[str]:
    last_sent_at = time.perf_counter()
    buffer = ''
    stop = False
    # Pending __anext__() call; kept across timeouts so it is never issued twice.
    await_next = None
    while not stop:
        time_to_send = False
        timeout = (
            max(buffer_time - (time.perf_counter() - last_sent_at), 0)
            if buffer_time else None
        )
        if await_next is None:
            # Only start a new read once the previous one has completed.
            await_next = asyncio.ensure_future(stream.__anext__())
        try:
            # shield() stops wait_for() from cancelling the read on timeout.
            buffer += await asyncio.wait_for(
                asyncio.shield(await_next),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            time_to_send = True
        except StopAsyncIteration:
            time_to_send = True
            stop = True
        else:
            await_next = None
            # Guard against buffer_time being None before comparing.
            if buffer_time and time.perf_counter() - last_sent_at >= buffer_time:
                time_to_send = True
        if not buffer_time or time_to_send:
            if buffer:
                yield buffer
                buffer = ''
            last_sent_at = time.perf_counter()
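A possible alternative, sketched as a swapped-in technique rather than the approach above: asyncio.wait() returns its done and pending sets when the timeout expires without cancelling anything, so no shield() is needed. buffer_stream_wait and fake_stream are hypothetical names for illustration. This sketch also cancels the outstanding read if the consumer stops iterating early, which the version above does not do.

```python
import asyncio
import time
from typing import AsyncIterator, List, Optional

async def buffer_stream_wait(
    stream: AsyncIterator[str], buffer_time: Optional[float]
) -> AsyncIterator[str]:
    """Hypothetical variant of buffer_stream built on asyncio.wait()."""
    last_sent_at = time.perf_counter()
    buffer = ''
    pending: Optional[asyncio.Future] = None  # the in-flight __anext__() read
    try:
        while True:
            if pending is None:
                pending = asyncio.ensure_future(stream.__anext__())
            timeout = (
                max(buffer_time - (time.perf_counter() - last_sent_at), 0)
                if buffer_time else None
            )
            # Unlike wait_for(), wait() never cancels `pending` on timeout.
            done, _ = await asyncio.wait({pending}, timeout=timeout)
            stop = False
            if done:
                try:
                    buffer += pending.result()
                except StopAsyncIteration:
                    stop = True
                pending = None
            # Flush when unbuffered, when the wait timed out, when the
            # interval has elapsed anyway, or when the stream has ended.
            if (not buffer_time or not done or stop
                    or time.perf_counter() - last_sent_at >= buffer_time):
                if buffer:
                    yield buffer
                    buffer = ''
                last_sent_at = time.perf_counter()
            if stop:
                return
    finally:
        # If the consumer stops early, don't leave a read running.
        if pending is not None:
            pending.cancel()


async def fake_stream() -> AsyncIterator[str]:
    # Hypothetical source: five messages, 30 ms apart.
    for i in range(5):
        await asyncio.sleep(0.03)
        yield str(i)


async def collect() -> List[str]:
    return [chunk async for chunk in buffer_stream_wait(fake_stream(), 0.07)]


chunks = asyncio.run(collect())
print(chunks)
```

With messages arriving every 30 ms and a 70 ms buffer, the five messages should come out as a few concatenated chunks; the exact split depends on timing.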