KafkaStreams 指标,每个流处理的记录/提取拓扑中每个流的偏移量

问题描述 投票:0回答:1

我正在使用

org.apache.kafka.streams.KafkaStreams
,例如我的拓扑如下所示:

    StreamsBuilder builder = new StreamsBuilder();

    builder.stream("input-topic1")
            .mapValues((readOnlyKey, value) -> value.toUpperCase())
            .to("output-topic1");

    builder.stream("input-topic2")
            .mapValues((readOnlyKey, value) -> value.toUpperCase())
            .to("output-topic2");

默认情况下每两分钟 KafkaStream 记录一次:

Processed 14 total records, ran 0 punctuators, and committed 11 total tasks since the last update"}

如果每个输入主题中有任何传入消息,我希望有更好的概述。我希望看到每个流中的更多指标,而不是处理的记录总数。是否可以为流指定一个名称,然后分别提取

builder.stream("input-topic1")
builder.stream("input-topic2")
的偏移量?或者有可能知道每个流在某种时间范围内处理了多少记录。

  • 也许我可以使用
    .peek
    并在流中使用某种静态变量,但我认为这种方法非常糟糕。
  • 我还研究了 KafkaStreams 指标,但我还没有找到我正在寻找的此类功能。
  • 每次我的流处理一条消息时都记录日志也不是一种选择,因为它会生成太多日志。
apache-kafka apache-kafka-streams
1个回答
0
投票

如果每个输入主题中有任何传入消息,可以更好地概览

这只是消费者滞后指标,可以监控,是的

刚刚询问的相关帖子 - 使用 kafka jmx 导出器公开 kafka 主题 LAG

也许我可以使用 .peek 并在流中添加某种静态变量

您可以使用 Micrometer 公制注册表。这根本不是一个坏习惯

© www.soinside.com 2019 - 2024. All rights reserved.