Kafka Consumer 消费消息时间长

问题描述 投票:0回答:1

我正在使用 Spring Kafka 发布消息。当我向 kafka 主题发布 100 条消息时,我可以看到它被 Kafka 消费者接收,在某些情况下至少延迟 1 秒甚至 4 到 5 秒。我正在使用的主题有 6 个分区,我将消费者的并发性保持在 6.

我使用了 Kafka 消费者的默认属性值。知道可能有什么问题或我可以调整哪个属性以将延迟最小化到 1 到 2 毫秒吗?我尝试发布 100、500、1000、5000 条消息,但延迟仍然存在。

消费者属性:

  • 请求超时毫秒 30 秒
  • 心跳间隔ms 3秒
  • 最大轮询间隔毫秒 5 分钟
  • 最大投票记录 200
  • 会话超时毫秒 45 秒

消费类

这是我的消费方法:

@KafkaListener(groupid = AlertsKafkaConfig.GROUP_ID_JSON, topics = TopicNameConstants.Webhook_doc_process_topic_name, containerFacotry = KafkaTopicConstans.WEBHOOK_PROCESS_TOPIC_CONF)
Public void receiveProcessPricessingMessage(@Payload String kafkajsonstring, @Header(KafkaHeaders RECEIVED_TIMESTAMP) String timestamp, @Header(KafkaHeaders.OFFSET) String offset) throws JsonProcessingException {

    try {
        long startime = System.currenttimemillis();
        RequestFactory.appendKafkaId(offset);
        long endtime = System.currenttimemillis();
        Gson gson = new Gson();
        WebhookProcessingCommand eventprocessingcommand = gson.fromJson(kafkajsonstring, WebhookProcessingCommand.class);

        BaseAbstractEventHandler eventprocessinghandler = eventhandlerfactory.getEventhandlerInstance(EventHandler.WEBHOOK_JSON_EVENT_HANDLER.getHandlerName());
        eventprocessinghandler.processEvent(eventprocessingcommand);

    } catch (Exception e) {
        LOGGER.error(ExceptionUtils.fullStackTrace(e));

    }
}
java apache-kafka kafka-consumer-api spring-kafka
1个回答
0
投票

根据您当前的设置,Kafka 消费者同步运行并在单个线程中一条一条地读取消息。如果侦听器方法运行得足够快以准备好及时接收下一条消息,这可能没问题。提供的代码绝对不是这种情况。这是您在日志中看到的差距的主要原因。每条消息的处理都需要很长时间。

主要的sinn是在handler方法里面构造一个新类。它应该只做一次。最好作为整个应用程序的单例。 Spring 可以在这里帮助配置 bean 和依赖注入。

我不知道类 RequesterFactory(RequestFactory?)。也许他们也需要一些时间?以及负载的 JSON 反序列化。在另一个地方,事件处理程序的实例将在每次调用中动态生成。

尝试重构代码并检查结果。如果您需要更快,那么您可以优化消费者的并发性。现在每个分区只有一个。

© www.soinside.com 2019 - 2024. All rights reserved.