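I'm using Kafka with spring-boot-starter-parent version 2.6.6.
Recently, whenever the Kafka brokers are restarted, my Kafka client turns into a zombie (we have 3 brokers and they are restarted one at a time, as a rolling restart).
The Kafka client has to be restarted before it becomes active again.
The logs are below: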
o.apache.kafka.common.network.Selector : [Consumer clientId=consumer-xxx-registration-group-1, groupId=xxx-registration-group] Failed authentication with broker_server (Authentication failed: Invalid username or password)
org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-xxx-registration-group-1, groupId=xxx-registration-group] Connection to node 2 (broker_server) failed authentication due to: Authentication failed: Invalid username or password
o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-xxx-registration-group-1, groupId=xxx-registration-group] Error sending fetch request (sessionId=1125142585, epoch=INITIAL) to node 2:
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-xxx-registration-group-1, groupId=xxx-registration-group] An authentication error occurred in the heartbeat thread
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password
o.apache.kafka.common.network.Selector : [Consumer clientId=consumer-xxx-registration-group-1, groupId=xxx-registration-group] Failed authentication with broker_server_03 (Authentication failed: Invalid username or password)
org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-xxx-registration-group-1, groupId=xxx-registration-group] Connection to node 2147483644 (broker_server_03) failed authentication due to: Authentication failed: Invalid username or password
o.s.k.l.KafkaMessageListenerContainer : Authentication/Authorization Exception and no authExceptionRetryInterval set
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password
o.s.k.l.KafkaMessageListenerContainer : Fatal consumer exception; stopping container
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-xxx-registration-group-1, groupId=xxx-registration-group] Revoke previously assigned partitions group-0, group-1
o.s.k.l.KafkaMessageListenerContainer : xxx-registration-group: partitions revoked: [group-0, group-1]
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-xxx-registration-group-1, groupId=xxx-registration-group] Group coordinator xxx (id: 2147483644 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-xxx-registration-group-1, groupId=xxx-registration-group] Resetting generation due to: consumer pro-actively leaving the group
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-xxx-registration-group-1, groupId=xxx-registration-group] Request joining group due to: consumer pro-actively leaving the group
org.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
org.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
org.apache.kafka.common.metrics.Metrics : Metrics reporters closed
o.a.kafka.common.utils.AppInfoParser : App info kafka.consumer for consumer-xxx-registration-group-1 unregistered
o.s.k.l.KafkaMessageListenerContainer : xxx-registration-group: Consumer stopped
c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
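The client has no custom configuration; almost everything is left at the defaults.
How can I handle this event so that my consumer keeps processing without being restarted manually?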
Did you have any luck finding a solution to this? I'm having the same problem.
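
One thing that may be worth trying, going only by the log line "Authentication/Authorization Exception and no authExceptionRetryInterval set": the listener container treats the authentication failure as fatal and stops because no retry interval is configured. Below is a minimal sketch that sets one. It assumes the listeners are `@KafkaListener` methods using the default container factory, and that the spring-kafka version managed by this Boot release is 2.8.x, where `ContainerProperties.setAuthExceptionRetryInterval` is available. The class name `KafkaAuthRetryConfig` and the 10 second interval are placeholders I made up, not values from the question.

```java
import java.time.Duration;

import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class KafkaAuthRetryConfig {

    // Replaces the auto-configured "kafkaListenerContainerFactory" bean while
    // still applying the usual spring.kafka.* properties through the configurer.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory);

        // With an interval set, the container should wait and poll again after an
        // authentication/authorization exception instead of stopping.
        // The 10 second value is an arbitrary assumption; tune it for your brokers.
        factory.getContainerProperties()
                .setAuthExceptionRetryInterval(Duration.ofSeconds(10));

        return factory;
    }
}
```

This only changes how the client reacts to the failure. It does not explain why the brokers reject the credentials while they are restarting, so that is probably still worth checking on the broker side.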