我正在使用 confluence-kafka-client。我有一个生产者生产一个主题,其中一个分区和一个组 ID 内有一个消费者。首先,我为该主题创建一个生产者(使用默认配置)(如果该主题不存在,我会使用该名称创建一个生产者)
self.producer = confluent_kafka.Producer({"bootstrap.servers": bootstrap_servers})
然后,我创建一个消费者并将其订阅到该主题(使用默认配置,
auto.offset.reset="latest"
)
self.consumer = confluent_kafka.Consumer(
{"bootstrap.servers": self.bootstrap_servers,
"group.id": self.group_id},
logger=logger,
)
self.consumer.subscribe(self.topic_names, on_assign=print_assignment)
self.consumer.poll(0) # first call
我意识到
self.consumer.poll(0)
没有将该消费者注册到该主题,因为还没有关于该主题的数据。之后,制作人制作一张唱片。然后,我打电话
consumer.poll(0) # second call
期待获取数据。但是,它会返回 None
。事实上,数据产生后,poll(0)
的调用就注册了消费者。第三次打电话就可以获取数据了。
如果某个主题还没有数据,如何将消费者注册到该主题?
我不熟悉PythonKafkaApi,所以我会回答“在java中它是这样工作的”。
你创建了一个消费者 您订阅了一个主题 您尝试从订阅的主题获取消息
你的消费者创作看起来不错 订阅看起来也不错 这里的轮询不是无限循环并等待 0ms,所以,恕我直言,它没有足够的时间来完成“分区分配”,消费者将从集群知识中消失
poll(...)
应该处于无限循环中,您可能希望将 0
增加到 1000
。
为什么是无限循环:因为如果没有,在轮询之后,消费者停止与集群通信,那么集群会认为你的消费者死亡(经过一小段时间,在消费者属性中定义)
为什么
1000
:因为如果你想阅读一些消息,你想要等待超过0ms
(这个值是任意的,你可以更高或更低,但不能太低)
请注意,“消费者第一次尝试使用某些消息”时,可能需要 3 秒到 10 秒(取决于您的集群大小、内存、CPU 等)
如果您保留
auto.offset.reset="latest"
,则必须在之前启动您的消费者。
如果某个主题还没有数据,如何将消费者注册到该主题?
即使没有数据,您也可以将消费者注册到主题,但它必须“保持活动状态”(即:无限循环)。