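We have a Kafka consumer that suddenly (with no activity at all) went into a rebalancing state and got stuck there. As a result, the CPU of the Kubernetes pod spiked and GC time rose to nearly 70-80%. The node did not recover from this state on its own. It recovered about 4-5 hours later, after all topics had been deleted.
Kafka version - 2.1.1
Number of topics - 520 (10 partitions per topic)
Consumer groups - 1
Partition assignment strategy - sticky
Some INFO logs from that time are attached below.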
2021-10-12 10:41:21
2021-10-12 05:11:21.160 INFO 6 CID: UID: RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-10, groupId=staging_notification_consumer] Member consumer-staging_notification_consumer-10-8a7eb399-c4e0-4443-b330-bfac42ca89ae sending LeaveGroup request to coordinator kafka5:9092 (id: 2147483642 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-10-12 05:10:27.340 INFO 6 CID:4967 UID: RID:17c72e2d517-4d5e --- [ntainer#9-0-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-9, groupId=staging_notification_consumer] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-10-12 10:40:27
2021-10-12 05:10:27.340 INFO 6 CID:4967 UID: RID:17c72e2d517-4d5e --- [ntainer#9-0-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-9, groupId=staging_notification_consumer] Lost previously assigned partitions staging_notification_topic_app_low_mobi_3119-4, staging_notification_topic_app_low_mobi_2023-4, staging_notification_topic_app_low_mobi_5170-9, staging_notification_topic_app_low_mobi_3540-9, staging_notification_topic_app_low_mobi_5722-9,
2021-10-12 10:40:37
2021-10-12 05:10:37.247 INFO 6 CID: UID: RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing
2021-10-12 10:40:37
2021-10-12 05:10:37.183 INFO 6 CID: UID: RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-10, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing
2021-10-12 10:40:57
2021-10-12 05:10:57.435 INFO 6 CID: UID: RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-10, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing
2021-10-12 10:40:57
2021-10-12 05:10:57.435 INFO 6 CID: UID: RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing
2021-10-12 10:41:56
2021-10-12 05:11:56.922 INFO 6 CID:4967 UID: RID:17c72e2579c-22e6 --- [ntainer#9-3-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Failing OffsetCommit request since the consumer is not part of an active group
2021-10-12 10:41:56
2021-10-12 05:11:56.923 ERROR 6 CID:4967 UID: RID:17c72e2579c-22e6 --- [ntainer#9-3-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
2021-10-12 10:41:56
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
2021-10-12 10:41:56
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151)
2021-10-12 10:41:56
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124)
2021-10-12 10:41:56
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
2021-10-12 10:41:56
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2021-10-12 10:41:56
at java.lang.Thread.run(Thread.java:748)
2021-10-12 10:41:56
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2021-10-12 10:41:56
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134)
2021-10-12 10:41:56
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999)
2021-10-12 10:41:56
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2396)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2391)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2377)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2191)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1149)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
2021-10-12 10:41:56
... 3 common frames omitted
2021-10-12 10:41:58
2021-10-12 05:11:58.442 INFO 6 CID:4967 UID: RID:17c72e2579c-22e6 --- [ntainer#9-3-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-10-12 10:58:36
2021-10-12 05:28:36.039 INFO 6 CID:4412 UID: RID:17c72ebadab-64d7 --- [ntainer#3-1-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-17, groupId=staging_notification_consumer] Join group failed with org.apache.kafka.common.errors.RebalanceInProgressException: The group is rebalancing, so a rejoin is needed.
2021-10-12 10:58:36
2021-10-12 05:28:36.044 INFO 6 CID:4412 UID: RID:17c72ebadab-64d7 --- [ntainer#3-1-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-17, groupId=staging_notification_consumer] (Re-)joining group
2021-10-12 11:05:36
2021-10-12 05:35:36.955 INFO 6 CID:3435 UID: RID:17c72f29913-886c --- [ntainer#7-3-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-5, groupId=staging_notification_consumer] Join group failed with org.apache.kafka.common.errors.DisconnectException
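The problem went away once I deleted the topics.
I observed similar behavior when I increased the number of threads in the concurrent listener container factory: the consumers could not serve messages and produced the same kind of logs showing constant rebalancing.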
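The log shows that the time between consecutive calls to poll() was longer than the configured max.poll.interval.ms (five minutes by default). When this happens, the consumer client proactively sends a LeaveGroup request to the coordinator, which triggers a rebalance. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of the batches returned by poll() with max.poll.records.
Of course, the better approach is to find out why the application processes messages slowly and optimize it.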
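As a rough illustration of the two settings mentioned above, here is a minimal sketch that builds consumer properties with a longer max.poll.interval.ms and a smaller max.poll.records, and checks whether a worst-case batch would still fit inside the poll interval. The class name, the helper methods, and the values 600000 and 100 are assumptions made for this example, not recommendations taken from the question; the broker address and group id are copied from the logs. It uses only java.util.Properties, which is what KafkaConsumer accepts as configuration.

```java
import java.util.Properties;

// Hypothetical helper class; names and values are illustrative assumptions.
class PollTimeoutTuning {

    // Builds consumer properties with a larger poll interval and smaller batches.
    static Properties tunedConsumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka5:9092");            // broker seen in the logs
        props.put("group.id", "staging_notification_consumer");   // group seen in the logs
        props.put("max.poll.interval.ms", "600000");              // assumed value; default is 300000 (5 min)
        props.put("max.poll.records", "100");                     // assumed value; default is 500
        return props;
    }

    // Worst-case time to process one full batch, assuming records are handled one by one.
    static long worstCaseBatchMillis(int maxPollRecords, long perRecordMillis) {
        return (long) maxPollRecords * perRecordMillis;
    }

    // True if a full batch can be processed before max.poll.interval.ms expires.
    static boolean fitsPollInterval(Properties props, long perRecordMillis) {
        int maxRecords = Integer.parseInt(props.getProperty("max.poll.records"));
        long intervalMs = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        return worstCaseBatchMillis(maxRecords, perRecordMillis) < intervalMs;
    }

    public static void main(String[] args) {
        Properties props = tunedConsumerProps();
        // Example: 1 second per record fits, 10 seconds per record does not.
        System.out.println("1s per record fits:  " + fitsPollInterval(props, 1_000L));
        System.out.println("10s per record fits: " + fitsPollInterval(props, 10_000L));
    }
}
```

The check is only a back-of-the-envelope estimate: per-record processing time has to be measured in the real listener, and if it is dominated by a slow downstream call, fixing that call is likely to help more than raising the timeout.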