当我遇到异常时,kafka 侦听器不得处理新消息。它应该等到异常情况得到解决(可能需要手动干预),然后开始消费消息。
配置
@Bean("consumerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, ? extends SpecificRecordBase>
kafkaListenerContainerFactory() {
String concurrency = consumer == null ? "1" : consumer.getConcurrency();
ConcurrentKafkaListenerContainerFactory<String, ? extends SpecificRecordBase> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.setConcurrency(Integer.parseInt(concurrency));
factory.setCommonErrorHandler(getDefaultErrorHandler());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
log.info("Kafka Consumer Created Successfully");
return factory;
}
@Bean
public DefaultErrorHandler getDefaultErrorHandler() {
DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler();
defaultErrorHandler.setAckAfterHandle(false);
return defaultErrorHandler;
}
public Map<String, Object> consumerConfigs() {
ConsumerConfiguration config = consumer == null ? new ConsumerConfiguration() : consumer;
Map<String, Object> consumerProperties = new HashMap<>(kafkaCommonConfigs());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
if (StringUtils.isNotBlank(config.getMaxPollInterval())) {
consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
Integer.parseInt(config.getMaxPollInterval()));
}
if (StringUtils.isNotBlank(config.getSessionTimeout())) {
consumerProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
Integer.parseInt(config.getSessionTimeout()));
}
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EARLIEST);
consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_COMMITTED);
consumerProperties.put(SPECIFIC_AVRO_READER_CONFIG, true);
return consumerProperties;
}
听众
@KafkaListener(topics = "#{topicName}",
groupId = "#{consumerGroupId}",
containerFactory = "consumerContainerFactory")
public void listenProgramNumberMessages(ConsumerRecord<String, Event> record,
Acknowledgment acknowledgment) {
try {
var event = record.value();
log.info("Received message: {}", event);
if(isProcessingAllowed(event)){
log.info("Consumed message: {}" , marketChannelCustomerChoiceEvent);
if(event.getCustomerChoice().getAbbreviatedDescription()
.contains("BLENDER")){
throw new RuntimeException();
}
listenerService.doSomething();
acknowledgment.acknowledge();
}
} catch (Exception e) {
log.error("Exception occurred :", e);
acknowledgment.nack(Duration.ofMillis(1000));
}
}
我正在通过显式抛出 RuntimeException 来测试它。因此,如果我更改/发布与 BLENDER 相关的内容,我会在日志中收到异常,并且滞后计数会增加,但是当我更改/发布与 BLENDER 无关的内容时,消息将被消耗。我希望消息被卡住。有办法做到吗? Spring-Kafka版本是2.8.1 Spring boot 2.7
您可以使用
CommonContainerStoppingErrorHandler
(它完全停止容器,可以通过 ListenerContainerEndpointRegistry
bean 重新启动)或使用具有无限重试 (FixedBackOff
) 的 Long.MAX_VALUE
以及 DefaultErrorHandler
中合适的重试间隔
;默认后退是 9 次重试,没有间隔。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#container-stopping-error-handlers
https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh