Kafka Consumer 陷入再平衡状态

问题描述 投票:0回答:1

我们有一个 Kafka 消费者,它突然(没有任何活动)进入重新平衡状态并陷入困境。这导致k8 pod的CPU爆了,GC时间也接近70-80%。该节点没有从该状态恢复。删除所有主题后,大约 4-5 小时后恢复。

卡夫卡版本 - 2.1.1 主题数量 - 520(每个主题 10 个分区) 消费者群体 - 1 分区分配策略-sticky

附上当时的一些信息日志。

2021-10-12 10:41:21 
2021-10-12 05:11:21.160   INFO 6 CID:  UID:  RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-10, groupId=staging_notification_consumer] Member consumer-staging_notification_consumer-10-8a7eb399-c4e0-4443-b330-bfac42ca89ae sending LeaveGroup request to coordinator kafka5:9092 (id: 2147483642 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.


2021-10-12 05:10:27.340   INFO 6 CID:4967  UID:  RID:17c72e2d517-4d5e --- [ntainer#9-0-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-9, groupId=staging_notification_consumer] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group


2021-10-12 10:40:27 
2021-10-12 05:10:27.340   INFO 6 CID:4967  UID:  RID:17c72e2d517-4d5e --- [ntainer#9-0-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-9, groupId=staging_notification_consumer] Lost previously assigned partitions staging_notification_topic_app_low_mobi_3119-4, staging_notification_topic_app_low_mobi_2023-4, staging_notification_topic_app_low_mobi_5170-9, staging_notification_topic_app_low_mobi_3540-9, staging_notification_topic_app_low_mobi_5722-9,


2021-10-12 10:40:37 
2021-10-12 05:10:37.247   INFO 6 CID:  UID:  RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing
2021-10-12 10:40:37 
2021-10-12 05:10:37.183   INFO 6 CID:  UID:  RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-10, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing


2021-10-12 10:40:57 
2021-10-12 05:10:57.435   INFO 6 CID:  UID:  RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-10, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing
2021-10-12 10:40:57 
2021-10-12 05:10:57.435   INFO 6 CID:  UID:  RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing



2021-10-12 10:41:56 
2021-10-12 05:11:56.922   INFO 6 CID:4967  UID:  RID:17c72e2579c-22e6 --- [ntainer#9-3-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Failing OffsetCommit request since the consumer is not part of an active group
2021-10-12 10:41:56 
2021-10-12 05:11:56.923  ERROR 6 CID:4967  UID:  RID:17c72e2579c-22e6 --- [ntainer#9-3-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
2021-10-12 10:41:56 
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124)
2021-10-12 10:41:56 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
2021-10-12 10:41:56 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2021-10-12 10:41:56 
    at java.lang.Thread.run(Thread.java:748)
2021-10-12 10:41:56 
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2021-10-12 10:41:56 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134)
2021-10-12 10:41:56 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999)
2021-10-12 10:41:56 
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2396)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2391)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2377)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2191)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1149)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
2021-10-12 10:41:56 
    ... 3 common frames omitted


2021-10-12 10:41:58 
2021-10-12 05:11:58.442   INFO 6 CID:4967  UID:  RID:17c72e2579c-22e6 --- [ntainer#9-3-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group


021-10-12 10:58:36  
2021-10-12 05:28:36.039   INFO 6 CID:4412  UID:  RID:17c72ebadab-64d7 --- [ntainer#3-1-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-17, groupId=staging_notification_consumer] Join group failed with org.apache.kafka.common.errors.RebalanceInProgressException: The group is rebalancing, so a rejoin is needed.
2021-10-12 10:58:36 
2021-10-12 05:28:36.044   INFO 6 CID:4412  UID:  RID:17c72ebadab-64d7 --- [ntainer#3-1-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-17, groupId=staging_notification_consumer] (Re-)joining group

021-10-12 10:58:36  
2021-10-12 05:28:36.039   INFO 6 CID:4412  UID:  RID:17c72ebadab-64d7 --- [ntainer#3-1-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-17, groupId=staging_notification_consumer] Join group failed with org.apache.kafka.common.errors.RebalanceInProgressException: The group is rebalancing, so a rejoin is needed.
2021-10-12 10:58:36 
2021-10-12 05:28:36.044   INFO 6 CID:4412  UID:  RID:17c72ebadab-64d7 --- [ntainer#3-1-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-17, groupId=staging_notification_consumer] (Re-)joining group




2021-10-12 11:05:36 
2021-10-12 05:35:36.955   INFO 6 CID:3435  UID:  RID:17c72f29913-886c --- [ntainer#7-3-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-5, groupId=staging_notification_consumer] Join group failed with org.apache.kafka.common.errors.DisconnectException

当我删除主题后,这个问题就解决了。

当我增加并发侦听器工厂中的线程数量时,观察到类似的行为

,消费者无法提供类似的不断重新平衡的日志

spring-boot apache-kafka spring-kafka
1个回答
0
投票
表示后续调用 poll() 之间的时间比配置的

max.poll.interval.ms 长(默认五分钟)。 发生这种情况时,消费者客户端会主动向协调者发起LeaveGroup

请求,以

触发重新平衡 您可以通过增加 max.poll.interval.ms 或使用 max.poll.records 减少 poll() 中返回的批次的最大大小来解决此问题。

当然更好的办法是检查程序处理速度慢的原因并进行优化。

© www.soinside.com 2019 - 2024. All rights reserved.