如何使用测微计指标监控 Kafka 中的事件消费者延迟?

问题描述 投票:0回答:1

我想为 Kafka 中消费者(事件侦听器)的当前滞后发布一个指标“量表”。我想监控消息的消费速度是否很快,或者是否太慢。这些指标最终会与CloudWatch连接。我正在使用 Springboot、Micrometer 库、CloudWatch。

在事件处理程序或侦听器(消费者)中,我执行了以下操作 -

@KafkaListener(topics = "your-topic",filter ="my-filter")
public void listenerExample(CloudEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topicName,
        Consumer<?, ?> consumer) {
    String currentLag = consumer.metrics().values().stream().filter(m -> "records-lag-     max".equals(m.metricName().name()))
            .map(Metric::metricValue).map(Object::toString).distinct()
            .collect(Collectors.joining(";", "[Kafka current consumer lag]", " records"));
    LOGGER.info(lag);

//The currentLag is a String here. We need to convert it to an Integer but the String is pretty big due to //the prefix and the suffix. Do we need to substring the number from the string?    
    
    meterRegistry.gauge("consumer.lag",Integer.parseInt(currentLag));
}

借助 CloudWatchConfig()、CloudWatchMeterRegistry() 完成与 CloudWatch 的连接,并使用此注册表进一步发布指标。


MeterRegistry registry = new SimpleMeterRegistry();
      registry = new CloudWatchMeterRegistry(new CloudWatchConfig() {
        private final Map<String, String> configuration =
          Map.of("cloudwatch.namespace", "Name/" + namespace, "cloudwatch.step",            Duration.ofMinutes(5).toString());


但是,我在 CloudWatch 中没有看到任何指标“consumer.lag”。我做得正确吗?我还是不知道应该如何实施

meterRegistry.gauge("consumer.lag",Integer.parseInt(currentLag));

java spring-boot metrics micrometer
1个回答
0
投票

请查看

KafkaMetrics
,它应该为您提供此信息(以及更多信息)。

另外,请小心自己这样做:

  1. Micrometer
    Gauge
    默认只维护对正在观察的对象的弱引用,以免阻止对象的垃圾收集。当GC发生并且对象被收集时,它将不再可供Gauge使用,您需要维护强引用,请参阅文档:Gauges
  2. consumer.metrics()
    在Kafka中非常棘手。 Kafka 客户端有时会删除对所有这些指标的引用,并在 Micrometer 或您尝试使用它们时重新创建它们。如果您查看
    KafkaMetrics
    ,它会尝试通过重新注册仪表来解决这个问题(破解这个问题)。

Kafka 客户端问题的真正解决方案是 Kafka 使用 Micrometer 来检测自身,如果您认为您也可以从中受益,请对此问题发表评论,以便 Kafka 开发人员可以添加此/接受 PR:https ://issues.apache.org/jira/browse/KAFKA-15191

© www.soinside.com 2019 - 2024. All rights reserved.