我们试图在指定的窗口时间从 Kafka 读取数据(因此我们有 Kafka 消费者),这意味着避免在其他时间读取数据。但是,我们不确定在时间段到期后如何关闭消费者。我想知道是否有任何例子说明如何做到这一点?非常感谢您帮助我们。
您可以禁用
autoStartup
,然后使用KafkaListenerEndpointRegistry
containers
和start
方法手动启动kafkastop
@KafkaListener 生命周期管理
public class KafkaConsumer {
@Autowired
private KafkaListenerEndpointRegistry registry;
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Schedule(cron = "")
public void scheduledMethod() {
registry.start();
registry.stop()
}
但是在上面的方法中,不能保证所有来自kafka的消息都会在那个时间范围内被消费(这取决于负载和处理速度)
我有相同的用例,但我编写了调度程序,指定了一个批次的最大轮询记录,并保留了一个计数器,如果计数器与最大轮询记录匹配,那么我认为该批次的处理已完成,因为它已经处理了记录它是在一次投票中得到的。
然后我取消订阅该主题并关闭消费者。下次调度程序运行时,它将再次处理指定限制的最大轮询记录。
fixedDelayString 实现调度程序在指定时间限制后启动的目的,一旦前一个完成。
@EnableScheduling
public class MessageScheduler{
@Scheduled(initialDelayString = "${fixedInitialDelay.in.milliseconds}", fixedDelayString = "${fixedDelay.in.milliseconds}")
public void run(){
/*write your kafka consumer here with manual commit*/
/*once your batch is finished processing unsubcribe and close the consumer*/
kafkaConsumer.unsubscribe();
kafkaConsumer.close();
}
}
问题
Kafka spring boot listener read messages in time interval
被标记为与此重复,但我认为它有不同的反应。
为了更改默认(0 毫秒)间隔字符串轮询消息,您可以在创建 containerFactory bean 时设置新的 idleBetweenPolls
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaEventListenerObjectContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory = ConcurrentKafkaListenerContainerFactory()
factory.containerProperties.setIdleBetweenPolls(500L); //interval in milisecons
...
return factory;
}