我正在使用
org.apache.kafka.streams.KafkaStreams
,例如我的拓扑如下所示:
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic1")
.mapValues((readOnlyKey, value) -> value.toUpperCase())
.to("output-topic1");
builder.stream("input-topic2")
.mapValues((readOnlyKey, value) -> value.toUpperCase())
.to("output-topic2");
默认情况下每两分钟 KafkaStream 记录一次:
Processed 14 total records, ran 0 punctuators, and committed 11 total tasks since the last update"}
如果每个输入主题中有任何传入消息,我希望有更好的概述。我希望看到每个流中的更多指标,而不是处理的记录总数。是否可以为流指定一个名称,然后分别提取
builder.stream("input-topic1")
和 builder.stream("input-topic2")
的偏移量?或者有可能知道每个流在某种时间范围内处理了多少记录。
.peek
并在流中使用某种静态变量,但我认为这种方法非常糟糕。如果每个输入主题中有任何传入消息,可以更好地概览
这只是消费者滞后指标,可以监控,是的
刚刚询问的相关帖子 - 使用 kafka jmx 导出器公开 kafka 主题 LAG
也许我可以使用 .peek 并在流中添加某种静态变量
您可以使用 Micrometer 公制注册表。这根本不是一个坏习惯