Kafka Consumer 第一次 poll(0) 没有返回数据

问题描述 投票:0回答:1

我正在使用 confluence-kafka-client。我有一个生产者生产一个主题,其中一个分区和一个组 ID 内有一个消费者。首先,我为该主题创建一个生产者(使用默认配置)(如果该主题不存在,我会使用该名称创建一个生产者)

self.producer = confluent_kafka.Producer({"bootstrap.servers": bootstrap_servers})

然后,我创建一个消费者并将其订阅到该主题(使用默认配置,

auto.offset.reset="latest"

self.consumer = confluent_kafka.Consumer(
            {"bootstrap.servers": self.bootstrap_servers,
                "group.id": self.group_id},
            logger=logger,
        )
self.consumer.subscribe(self.topic_names, on_assign=print_assignment)
self.consumer.poll(0) # first call

我意识到

self.consumer.poll(0)
没有将该消费者注册到该主题,因为还没有关于该主题的数据。之后,制作人制作一张唱片。然后,我打电话
consumer.poll(0) # second call
期待获取数据。但是,它会返回
None
。事实上,数据产生后,
poll(0)
的调用就注册了消费者。第三次打电话就可以获取数据了。 如果某个主题还没有数据,如何将消费者注册到该主题?

参考:Kafka Consumer.poll 没有返回任何记录

python apache-kafka kafka-consumer-api confluent-kafka-python
1个回答
0
投票

我不熟悉PythonKafkaApi,所以我会回答“在java中它是这样工作的”。

  1. 我会解释你的代码
  2. 我会解释为什么它不起作用
  3. 我会解释我认为你应该修改的内容

第一

你创建了一个消费者 您订阅了一个主题 您尝试从订阅的主题获取消息

第二

你的消费者创作看起来不错 订阅看起来也不错 这里的轮询不是无限循环并等待 0ms,所以,恕我直言,它没有足够的时间来完成“分区分配”,消费者将从集群知识中消失

第三

poll(...)
应该处于无限循环中,您可能希望将
0
增加到
1000

为什么是无限循环:因为如果没有,在轮询之后,消费者停止与集群通信,那么集群会认为你的消费者死亡(经过一小段时间,在消费者属性中定义)

为什么

1000
:因为如果你想阅读一些消息,你想要等待超过
0ms
(这个值是任意的,你可以更高或更低,但不能太低)

请注意,“消费者第一次尝试使用某些消息”时,可能需要 3 秒到 10 秒(取决于您的集群大小、内存、CPU 等)

如果您保留

auto.offset.reset="latest"
,则必须在之前启动您的消费者。

如果某个主题还没有数据,如何将消费者注册到该主题?

即使没有数据,您也可以将消费者注册到主题,但它必须“保持活动状态”(即:无限循环)。

© www.soinside.com 2019 - 2024. All rights reserved.