We are building a monitoring system for our Spring Boot applications using micrometer.
The collected metrics are published to an Elasticsearch instance via micrometer-registry-elastic.
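Everything works fine, except:
- kafka_consumer_fetch_manager_records_lag is always 0, even though I am sure the consumer group is lagging
- kafka_consumer_fetch_manager_records_lag_avg is always 0, even though I am sure the consumer group is lagging
- kafka_consumer_fetch_manager_records_lag_max has a non-zero value only the first time the measurement is exposed
All other metrics (for example kafka_consumer_fetch_manager_records_lead) are set correctly.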
Versions involved:
spring-boot:2.5.4
micrometer:1.7.3
micrometer-registry-elastic:1.7.3
spring-kafka:2.7.6
kafka-clients:2.7.1
Kafka broker: 2.7.0
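I debugged the whole setup and found no evidence of an error: MicrometerConsumerListener is created correctly, and so are KafkaClientMetrics and all the Sensor instances. I don't know what the problem is; we have no specific customizations, and there are no error messages in the logs.
For the metrics above, there seem to be no samples with a value other than 0, but I am quite sure there is lag on the broker, because I verified it directly on the broker with the command-line tools.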
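For reference, the lag that the consumer-group command-line tool prints per partition is the log end offset minus the group's committed offset. As far as I understand the client (an assumption that has not been confirmed in this thread), the records-lag gauge is instead measured against the consumer's in-memory fetch position, so the two numbers do not have to agree. A minimal sketch of that arithmetic, with made-up offsets and hypothetical names (`PartitionOffsets`, `committedLag`, `positionLag`); it does not talk to a broker:

```kotlin
// Hypothetical holder for one partition's offsets; the numbers used in main() are made up.
data class PartitionOffsets(
    val logEndOffset: Long,     // next offset to be written on the broker
    val committedOffset: Long,  // last offset committed by the consumer group
    val fetchPosition: Long     // next offset the running consumer will fetch
)

// Lag as printed per partition by the consumer-group command-line tool:
// log end offset minus committed offset.
fun committedLag(o: PartitionOffsets): Long =
    maxOf(0L, o.logEndOffset - o.committedOffset)

// Lag measured against the consumer's fetch position
// (assumed here to be what the client-side gauge reflects).
fun positionLag(o: PartitionOffsets): Long =
    maxOf(0L, o.logEndOffset - o.fetchPosition)

fun main() {
    // The consumer has fetched everything but has not yet committed the last 40 records.
    val p = PartitionOffsets(logEndOffset = 1_000L, committedOffset = 960L, fetchPosition = 1_000L)
    println("committed-based lag = ${committedLag(p)}")
    println("position-based lag  = ${positionLag(p)}")
}
```

If that assumption holds, a consumer that keeps up with fetching but commits late would show lag in the command-line tool while the gauge stays at 0.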
Any ideas? Thanks a lot.
I am seeing the same problem in my service. Has anyone managed to debug this?
spring-boot 3.1.5
spring-kafka 3.0.12
kafka-clients 3.5.1
micrometer 1.11.5
My Spring configuration is as follows:
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: my.own.KafkaProtobufSerializer
      acks: all
      retries: 2147483647
      properties:
        max:
          block.ms: 9223372036854775807
          in.flight.requests.per.connection: 1
    consumer:
      group-id: service-name
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: my.own.ProtoDeserializer
      enable-auto-commit: false
      properties:
        auto.offset.reset: earliest
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    security:
      protocol: SASL_SSL
    properties:
      sasl:
        mechanism: AWS_MSK_IAM
        jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required;
        client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
    listener:
      ack-mode: manual_immediate
The container factory is built as follows:
@Bean
fun kafkaListenerContainerFactory(
    consumerFactory: ConsumerFactory<String, Message>,
    deadLetterErrorHandler: CommonErrorHandler,
    ignoreForeignDeadLetterMessagesFilterStrategy: RecordFilterStrategy<String, Message>
): ConcurrentKafkaListenerContainerFactory<String, Message> =
    ConcurrentKafkaListenerContainerFactory<String, Message>().apply {
        this.consumerFactory = consumerFactory
        this.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
        this.containerProperties.setAuthExceptionRetryInterval(config.authRetryInterval)
        this.setRecordFilterStrategy(ignoreForeignDeadLetterMessagesFilterStrategy)
        this.setCommonErrorHandler(deadLetterErrorHandler)
        this.setAckDiscarded(true)
    }
The service exposes this metric on the Prometheus endpoint:
# HELP kafka_consumer_fetch_manager_records_lag The latest lag of the partition
# TYPE kafka_consumer_fetch_manager_records_lag gauge
kafka_consumer_fetch_manager_records_lag{application="my-service",client_id="consumer-my-service-2",kafka_version="3.5.1",partition="3",spring_id="kafkaConsumerFactory.consumer-my-service-2",topic="the-topic",} 0.0
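To check the gauge per partition from a script rather than by eye, a sample line like the one above can be split into name, labels, and value. This is only a sketch: `Sample` and `parseSample` are hypothetical names, and it handles just the simple shape shown here (no escaped quotes or commas inside label values, no trailing timestamp).

```kotlin
// Hypothetical container for one parsed sample line.
data class Sample(val name: String, val labels: Map<String, String>, val value: Double)

// Splits a single Prometheus text-format sample line of the form
//   metric_name{label="value",...} 0.0
// Assumes label values contain no escaped quotes and the line has no timestamp.
fun parseSample(line: String): Sample {
    val name = line.substringBefore('{')
    val labelPart = line.substringAfter('{').substringBeforeLast('}')
    val value = line.substringAfterLast('}').trim().toDouble()
    // Each label looks like key="value"; a trailing comma after the last label is ignored.
    val labels = Regex("(\\w+)=\"([^\"]*)\"")
        .findAll(labelPart)
        .associate { it.groupValues[1] to it.groupValues[2] }
    return Sample(name, labels, value)
}

fun main() {
    val line = "kafka_consumer_fetch_manager_records_lag{" +
        "client_id=\"consumer-my-service-2\",partition=\"3\",topic=\"the-topic\",} 0.0"
    val s = parseSample(line)
    println("${s.labels["topic"]}-${s.labels["partition"]}: lag gauge = ${s.value}")
}
```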