示例(问题如下):
import asyncio
import typing as t
from aiokafka import AIOKafkaConsumer
class KafkaSimpleClient:
...
async def receive(self, topic: str) -> ???:
bootstrap_servers = ','.join(
'{}:{}'.format(host, port)
for host, port in self._bootstrap_servers
)
consumer = AIOKafkaConsumer(
loop=asyncio.get_event_loop(),
bootstrap_servers=bootstrap_servers,
metadata_max_age_ms=5000,
)
consumer.subscribe(pattern=topic)
await consumer.start()
return consumer
现在,我正在努力解决
receive
的返回类型(它返回可以用 async for x in y
迭代的东西。它是什么?它是一个可等待的迭代器吗?它是一个可等待的迭代器吗?也许完全是其他东西?
??? = t.Awaitable[t.Iterator]
??? = t.Iterator[t.Awaitable]
??? = (Something else)
从 Python 3.9 开始,
typing.AsyncIterable
和 typing.AsyncIterator
已弃用其抽象基类 (ABC) 中 collections.abc
对应项的别名。 ABC 还可以用可迭代项的类型作为下标,这样可以在迭代器迭代时推断出这些项的类型。
来自 文档 的
typing.AsyncIterable
:
现在支持collections.abc.AsyncIterable
。[]
因此,使用新的 ABC 类型和下标,
receive
的返回类型变为:
import collections.abc as abc
from aiokafka.structs import ConsumerRecord
async def receive(self, topic: str) -> abc.AsyncIterable[ConsumerRecord]:
...
# or
async def receive(self, topic: str) -> abc.AsyncIterator[ConsumerRecord]:
...
(其中
ConsumerRecord
是由 AIOKafkaConsumer
迭代器提供的项目类型,如文档所述。)
但是,AIOKafkaConsumer
的相同
文档告诉我们:
消费者使用后
无法调用AIOKafkaConsumer.stop()
将使后台任务继续运行。
因此,在这个具体示例中,
receive
的返回类型可能不应该抽象具体类型,因为使用者必须意识到返回值是必须清理的资源,而不仅仅是任何迭代器。