有读取Kafka主题的Spring Schedule例子吗?

问题描述 投票:0回答:3

我们试图在指定的窗口时间从 Kafka 读取数据(因此我们有 Kafka 消费者),这意味着避免在其他时间读取数据。但是,我们不确定在时间段到期后如何关闭消费者。我想知道是否有任何例子说明如何做到这一点?非常感谢您帮助我们。

apache-kafka kafka-consumer-api spring-kafka spring-scheduled
3个回答
8
投票

您可以禁用

autoStartup
,然后使用
KafkaListenerEndpointRegistry
containers
start
方法手动启动kafka
stop
@KafkaListener 生命周期管理

public class KafkaConsumer {

 @Autowired
 private KafkaListenerEndpointRegistry registry;

  @KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
  public void listen(...) { ... }

  @Schedule(cron = "")
  public void scheduledMethod() {

   registry.start();

   registry.stop()
  }

但是在上面的方法中,不能保证所有来自kafka的消息都会在那个时间范围内被消费(这取决于负载和处理速度)


1
投票

我有相同的用例,但我编写了调度程序,指定了一个批次的最大轮询记录,并保留了一个计数器,如果计数器与最大轮询记录匹配,那么我认为该批次的处理已完成,因为它已经处理了记录它是在一次投票中得到的。

然后我取消订阅该主题并关闭消费者。下次调度程序运行时,它将再次处理指定限制的最大轮询记录。

fixedDelayString 实现调度程序在指定时间限制后启动的目的,一旦前一个完成。

@EnableScheduling
public class MessageScheduler{

        @Scheduled(initialDelayString = "${fixedInitialDelay.in.milliseconds}", fixedDelayString = "${fixedDelay.in.milliseconds}")
        public void run(){

            /*write your kafka consumer here with manual commit*/

            /*once your batch is finished processing unsubcribe and close the consumer*/
                kafkaConsumer.unsubscribe();
                kafkaConsumer.close();
        }

}

0
投票

问题

Kafka spring boot listener read messages in time interval
被标记为与此重复,但我认为它有不同的反应。

为了更改默认(0 毫秒)间隔字符串轮询消息,您可以在创建 containerFactory bean 时设置新的 idleBetweenPolls

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaEventListenerObjectContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String, String> factory = ConcurrentKafkaListenerContainerFactory()
    factory.containerProperties.setIdleBetweenPolls(500L); //interval in milisecons
        ...
    return factory;
} 
© www.soinside.com 2019 - 2024. All rights reserved.