Kafka 消息轮询增加尝试

问题描述 投票:0回答:2

KAFKA Consumer当前设置:

 max_poll_records: 2000
 max_poll_interval_ms: 40000
 fetch_min_bytes: 104857600
 fetch_max_wait_ms: 10000
 fetch_max_bytes: 104857600
 request_timeout_ms: 300000
 max_partition_fetch_bytes: 104857600
 heartbeat_interval_ms: 60000
 session_timeout_ms: 180000

KAFKA Broker 的当前设置:

fetch.max.bytes: 104857600

通过这些设置,我可以一次轮询 2000 条消息,大小约为 84MB(一次轮询的大小)。 但我收到的第二条消息比以前少,而该消费者存在足够数量的滞后。

我只推送到一个分区,以便轮询也从一个分区进行。

如代码截图所示,轮询的消息具有以下模式: 第一次投票:2000 条消息 第二次民意调查:484 条消息 第三次民意调查:2000 条消息 第四次民意调查:484 条消息 第 5 次投票:2000 条消息 第 6 次民意调查:484 条消息 第 7 次民意调查:548 条消息 等等

谁能帮我解决这个问题,为什么会发生这种情况?

我尝试在谷歌上搜索解决方案,但没有找到太多

apache-kafka scaling
2个回答
1
投票

Kafka 消费者维护一个内部队列,一方面从代理获取数据,另一方面通过民意调查耗尽数据。在轮询 2000 条记录后,它几乎变成空的,Kafka 消费者开始获取丢失的记录。由于您将

fetch_min_bytes
设置为 100Mb:

  1. 需要实际的时间来完成获取请求。可能比处理轮询消息所需的时间还要多。
  2. 消费者可以不调用 fetch,因为内部队列中仍然有一些记录可用,并且如此高的最小大小的新 fetch 将导致队列溢出。因此,在从代理获取消息之前,需要进行另一次轮询来耗尽内部队列中的剩余记录。

因此,请尝试大幅(例如十倍)减少

fetch_min_bytes
或直接删除该属性,因为它的默认值为 1。


0
投票

受 max.partition.fetch.bytes 限制,

Kafka 文档

最大分区.获取字节数

此属性控制服务器将返回每个分区的最大字节数。默认值为 1 MB,这意味着当 KafkaConsumer.poll() 返回 ConsumerRecords 时,记录对象将最多使用分配给消费者的每个分区 max.partition.fetch.bytes 。因此,如果一个主题有 20 个分区,并且您有 5 个消费者,则每个消费者将需要有 4 MB 的内存可用于 ConsumerRecords。在实践中,您将需要分配更多内存,因为如果组中的其他使用者发生故障,每个使用者将需要处理更多分区。最大限度。 partition.fetch.bytes 必须大于代理将接受的最大消息(由代理配置中的 max.message.size 属性确定),否则代理可能有消费者无法使用的消息,在这种情况下消费者会试图阅读它们。设置 max.partition.fetch.bytes 时的另一个重要考虑因素是消费者处理数据所花费的时间。您还记得,消费者必须足够频繁地调用 poll() 以避免会话超时和随后的重新平衡。如果单个 poll() 返回的数据量非常大,消费者可能需要更长的时间来处理,这意味着它将无法及时到达轮询循环的下一次迭代以避免会话超时。如果发生这种情况,两个选项要么降低最大值。 partition.fetch.bytes 或增加会话超时。

© www.soinside.com 2019 - 2024. All rights reserved.