Spring kafka消费者滞后指标始终为0

问题描述 投票:0回答:1

我们正在使用

micrometer
为 Spring Boot 应用程序构建一个监控系统。 收集的指标通过
micrometer-registry-elastic
发布到 Elasticsearch 实例。 一切都很好,除了:

  • kafka_consumer_fetch_manager_records_lag
    始终为 0,即使我确定消费者组存在滞后
  • kafka_consumer_fetch_manager_records_lag_avg
    始终为 0,即使我确定消费者组存在滞后
  • kafka_consumer_fetch_manager_records_lag_max
    仅在第一次暴露测量时具有不同于 0 的值。

所有其他指标(例如

kafka_consumer_fetch_manager_records_lead
)均已正确设置。

涉及版本:

  • spring-boot:2.5.4
  • micrometer:1.7.3
  • micrometer-registry-elastic:1.7.3
  • spring-kafka:2.7.6
  • kafka-clients:2.7.1
  • Kafka broker: 2.7.0

我调试了整个设置,没有错误的证据,

MicrometerConsumerListener
已正确创建,甚至
KafkaClientMetrics
和所有
Sensor
实例。我不知道问题是什么,我们没有任何特定的定制,也没有错误日志消息。 对于上述指标,似乎没有值不同于 0 的样本,但我很确定代理上存在延迟,因为我直接在代理上通过命令行工具验证了这一点。

有什么想法吗? 非常感谢

spring-boot apache-kafka spring-kafka spring-micrometer
1个回答
0
投票

我的服务也遇到同样的问题。有人调试成功了吗?

spring-boot 3.1.5
spring-kafka 3.0.12
kafka-clients 3.5.1
micrometer 1.11.5

我的弹簧配置如下:

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: my.own.KafkaProtobufSerializer
      acks: all
      retries: 2147483647
      properties:
        max:
          block.ms: 9223372036854775807
          in.flight.requests.per.connection: 1
    consumer:
      group-id: service-name
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: my.own.ProtoDeserializer
      enable-auto-commit: false
      properties:
        auto.offset.reset: earliest
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    security:
      protocol: SASL_SSL
    properties:
      sasl:
        mechanism: AWS_MSK_IAM
        jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required;
        client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
    listener:
      ack-mode: manual_immediate

容器工厂的构造如下:

    @Bean
    fun kafkaListenerContainerFactory(
        consumerFactory: ConsumerFactory<String, Message>,
        deadLetterErrorHandler: CommonErrorHandler,
        ignoreForeignDeadLetterMessagesFilterStrategy: RecordFilterStrategy<String, Message>
    ): ConcurrentKafkaListenerContainerFactory<String, Message> =
        ConcurrentKafkaListenerContainerFactory<String, Message>().apply {
            this.consumerFactory = consumerFactory
            this.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE            this.containerProperties.setAuthExceptionRetryInterval(config.authRetryInterval)
         this.setRecordFilterStrategy(ignoreForeignDeadLetterMessagesFilterStrategy)
            this.setCommonErrorHandler(deadLetterErrorHandler)
            this.setAckDiscarded(true)
        }

该服务在普罗米修斯端点上公开此指标:

# HELP kafka_consumer_fetch_manager_records_lag The latest lag of the partition
# TYPE kafka_consumer_fetch_manager_records_lag gauge
kafka_consumer_fetch_manager_records_lag{application="my-service",client_id="consumer-my-service-2",kafka_version="3.5.1",partition="3",spring_id="kafkaConsumerFactory.consumer-my-service-2",topic="the-topic",} 0.0
© www.soinside.com 2019 - 2024. All rights reserved.