我使用Spring Kafka。如果处理过程中发生错误,我想停止接收消息,然后在一段时间后恢复接收。 我看到 spring kafka 有一个
CommonContainerStoppingErrorHandler
处理程序。
但据我了解,它只是停止阅读。是否可以使用标准方式开始回读?
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
factory.setConcurrency(2);
factory.setCommonErrorHandler(new CommonContainerStoppingErrorHandler());
factory.setConsumerFactory(consumerFactory());
return factory;
}
我现在已经编写了一个处理程序,但对我来说这似乎不是一个好的解决方案。
@Bean
CommonErrorHandler eh() {
return new DefaultErrorHandler((rec, ex) -> {
System.out.println("Close");
}, new FixedBackOff(1000L, 1)) {
@Override
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,Consumer<?, ?> consumer, MessageListenerContainer container) {
if (container!=null && container.isRunning()){
container.stop();
taskExecuror.schedule(()->container.start(), delay);
}
}
};
}
还有一个问题,据我了解,当指定并发2时,会在主容器内部创建子容器,这些子容器对消费者负责。父容器被传递给方法
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,Consumer<?, ?> consumer, MessageListenerContainer container)
,并为此容器调用 stop
停止接收来自所有分区的消息。如果当前正在其中一个容器中处理一条消息,会发生什么情况,stop
方法会等待处理结束吗?处理后偏移量会保存吗?
在第一个场景中,如果您想在一段时间后继续处理,为什么不能使用具有正常重试功能的错误处理程序?您可以尝试使用指数退避或类似的方法重试。据我所知,这达到了相同的结果。如果您有严格的用例,需要在延迟一段时间后继续容器,那么以编程方式启动容器(如上所示)是一种有效的策略。
对于第二个问题,停止容器时,会等待所有获取的记录处理完毕才停止容器。如果您在容器上启用
stopImmediate
属性,则容器会在处理当前记录后停止。