KAFKA Consumer当前设置:
max_poll_records: 2000
max_poll_interval_ms: 40000
fetch_min_bytes: 104857600
fetch_max_wait_ms: 10000
fetch_max_bytes: 104857600
request_timeout_ms: 300000
max_partition_fetch_bytes: 104857600
heartbeat_interval_ms: 60000
session_timeout_ms: 180000
KAFKA Broker 的当前设置:
fetch.max.bytes: 104857600
通过这些设置,我可以一次轮询 2000 条消息,大小约为 84MB(一次轮询的大小)。 但我收到的第二条消息比以前少,而该消费者存在足够数量的滞后。
我只推送到一个分区,以便轮询也从一个分区进行。
如代码截图所示,轮询的消息具有以下模式: 第一次投票:2000 条消息 第二次民意调查:484 条消息 第三次民意调查:2000 条消息 第四次民意调查:484 条消息 第 5 次投票:2000 条消息 第 6 次民意调查:484 条消息 第 7 次民意调查:548 条消息 等等
谁能帮我解决这个问题,为什么会发生这种情况?
我尝试在谷歌上搜索解决方案,但没有找到太多
Kafka 消费者维护一个内部队列,一方面从代理获取数据,另一方面通过民意调查耗尽数据。在轮询 2000 条记录后,它几乎变成空的,Kafka 消费者开始获取丢失的记录。由于您将
fetch_min_bytes
设置为 100Mb:
因此,请尝试大幅(例如十倍)减少
fetch_min_bytes
或直接删除该属性,因为它的默认值为 1。
受 max.partition.fetch.bytes 限制,
最大分区.获取字节数
此属性控制服务器将返回每个分区的最大字节数。默认值为 1 MB,这意味着当 KafkaConsumer.poll() 返回 ConsumerRecords 时,记录对象将最多使用分配给消费者的每个分区 max.partition.fetch.bytes 。因此,如果一个主题有 20 个分区,并且您有 5 个消费者,则每个消费者将需要有 4 MB 的内存可用于 ConsumerRecords。在实践中,您将需要分配更多内存,因为如果组中的其他使用者发生故障,每个使用者将需要处理更多分区。最大限度。 partition.fetch.bytes 必须大于代理将接受的最大消息(由代理配置中的 max.message.size 属性确定),否则代理可能有消费者无法使用的消息,在这种情况下消费者会试图阅读它们。设置 max.partition.fetch.bytes 时的另一个重要考虑因素是消费者处理数据所花费的时间。您还记得,消费者必须足够频繁地调用 poll() 以避免会话超时和随后的重新平衡。如果单个 poll() 返回的数据量非常大,消费者可能需要更长的时间来处理,这意味着它将无法及时到达轮询循环的下一次迭代以避免会话超时。如果发生这种情况,两个选项要么降低最大值。 partition.fetch.bytes 或增加会话超时。