异步迭代器是什么类型?

问题描述 投票:0回答:2

示例(问题如下):

import asyncio
import typing as t

from aiokafka import AIOKafkaConsumer


class KafkaSimpleClient:

    ...

    async def receive(self, topic: str) -> ???:
        bootstrap_servers = ','.join(
            '{}:{}'.format(host, port)
            for host, port in self._bootstrap_servers
        )
        consumer = AIOKafkaConsumer(
            loop=asyncio.get_event_loop(),
            bootstrap_servers=bootstrap_servers,
            metadata_max_age_ms=5000,
        )
        consumer.subscribe(pattern=topic)
        await consumer.start()
        return consumer

现在,我正在努力解决

receive
的返回类型(它返回可以用
async for x in y
迭代的东西。它是什么?它是一个可等待的迭代器吗?它是一个可等待的迭代器吗?也许完全是其他东西?

  • ??? = t.Awaitable[t.Iterator]
  • ??? = t.Iterator[t.Awaitable]
  • ??? = (Something else)
python python-asyncio typing
2个回答
2
投票

typing

模块的源代码
毫无疑问。

async def receive(self, topic: str) -> t.AsyncIterable:

async def receive(self, topic: str) -> t.AsyncIterator:

如果你确定它将是严格的迭代器。


0
投票

从 Python 3.9 开始,

typing.AsyncIterable
typing.AsyncIterator
已弃用其抽象基类 (ABC) 中
collections.abc
对应项的别名。 ABC 还可以用可迭代项的类型作为下标,这样可以在迭代器迭代时推断出这些项的类型。

来自 文档

typing.AsyncIterable

自 3.9 版本起已弃用:

collections.abc.AsyncIterable
现在支持
[]

因此,使用新的 ABC 类型和下标,

receive
的返回类型变为:

import collections.abc as abc
from aiokafka.structs import ConsumerRecord

async def receive(self, topic: str) -> abc.AsyncIterable[ConsumerRecord]:
    ...

# or

async def receive(self, topic: str) -> abc.AsyncIterator[ConsumerRecord]:
    ...

(其中

ConsumerRecord
是由
AIOKafkaConsumer
迭代器提供的项目类型,如文档所述。)


但是,AIOKafkaConsumer的相同

文档
告诉我们:

消费者使用后

无法调用
AIOKafkaConsumer.stop()将使后台任务继续运行

因此,在这个具体示例中,

receive
的返回类型可能不应该抽象具体类型,因为使用者必须意识到返回值是必须清理的资源,而不仅仅是任何迭代器。

© www.soinside.com 2019 - 2024. All rights reserved.