查找Spring kafka批处理时间

问题描述 投票:0回答:2

我有一个具有以下配置的 kafka 消费者

enable.auto.commit : true
max.poll.records: 50
auto.commit.interval.ms : 10000

消费者方法如下,

@KafkaListener(topics = ("my-topic"),groupId = ("groupId"),
 containerFactory = "listenerContainerFactory",concurrency = 2 ,autoStartup = "true")
    public void myListener(@Payload String message) {
        System.out.println("Received Message : " + message);
        // do some heavy processing
    }

我知道Spring kafka一次会获取50条消息(max.poll.records)并使用两个线程来处理消息(并发= 2)。

现在我需要找出完成这批工作需要多长时间。这意味着 20 条消息有两个线程。

我可以把记录器放在哪里来获取这些信息?

java spring spring-boot apache-kafka spring-kafka
2个回答
0
投票

它将获取最多 50 个(并不总是 50 个)。

您可以使用 Micrometer Metrics 来衡量每条消息的侦听器性能。

从版本 2.3 开始,如果在类路径上检测到 Micrometer,并且应用程序上下文中存在单个 MeterRegistry,则侦听器容器将自动为侦听器创建和更新 Micrometer Timer。

我们目前没有测量处理整个批次的时间。


0
投票

您可以将侦听器更改为

batch = "true"
并观察度量计时器“spring.kafka.listener”

您可以保持非批量方法并观察指标“kafka_consumer_time_ Between_poll_avg”。轮询之间的时间很好地近似了侦听器的工作时间。 不幸的是,当分区上没有事件时,“kafka_consumer_time_ Between_poll_avg”趋向于 5 秒 - 这是“idleBetweenPolls”的默认值。

© www.soinside.com 2019 - 2024. All rights reserved.