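I am using a Spring Kafka consumer that fetches messages from a topic and persists them to a database. If a failure condition is met, for example the database is unavailable, does the Kafka consumer library provide a retry mechanism? If it does, is there a way to keep retrying for 1 minute and only then send the message to the DLT? (I do not want to configure an interval and maxAttempts.)
This is my consumer configuration: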
@Configuration
@RequiredArgsConstructor
public class ConsumerConfiguration {

    private final KafkaTemplate<String, TransactionEvent> kafkaTemplate;

    public Map<String, Object> consumerConfig() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        return consumerConfig;
    }

    @Bean
    public ConsumerFactory<String, TransactionEvent> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig(), new StringDeserializer(),
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(TransactionEvent.class)));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, TransactionEvent>> kafkaListenerContainerFactory(
            ConsumerFactory<String, TransactionEvent> consumerFactory, DefaultErrorHandler errorHandler
    ) {
        ConcurrentKafkaListenerContainerFactory<String, TransactionEvent> listenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        listenerContainerFactory.setConsumerFactory(consumerFactory);
        listenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        listenerContainerFactory.setCommonErrorHandler(errorHandler);
        return listenerContainerFactory;
    }

    @Bean
    public DefaultErrorHandler errorHandler() {
        return new DefaultErrorHandler(recover(), new FixedBackOff(2000, 5));
    }

    @Bean
    public DeadLetterPublishingRecoverer recover() {
        final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
                CUSTOMIZE_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic().concat("DLT"), cr.partition());
        return new DeadLetterPublishingRecoverer(kafkaTemplate, CUSTOMIZE_DESTINATION_RESOLVER);
    }
}
This is my consumer that saves the message:
@Component
@RequiredArgsConstructor
public class SmsConsumer {

    private final TransactionEventRepo transactionEventRepo;

    @KafkaListener(groupId = "group-1", topics = ProducerConfiguration.TOPIC_NAME)
    public void consumeSms(@Payload TransactionEvent transactionEvent) {
        transactionEventRepo.save(transactionEvent);
    }
}
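Everything works fine. As you can see, I used new FixedBackOff(2000,5), but what I want is for Kafka to keep retrying for 1 minute whenever an exception occurs, without specifying an interval and a maximum number of attempts. If the exception still occurs after 1 minute, the message should be sent to the DLT.
Use an ExponentialBackOff with...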
/**
* The maximum elapsed time in milliseconds after which a call to
* {@link BackOffExecution#nextBackOff()} returns {@link BackOffExecution#STOP}.
*/
public void setMaxElapsedTime(long maxElapsedTime) {
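
A minimal sketch of how that setter might be applied to the error handler from the question, intended as a replacement for the existing errorHandler() bean. The class name RetryForOneMinuteConfiguration, the 1 second initial interval, and the multiplier of 1.0 are assumptions chosen for illustration, not values from the question. As far as I understand, the elapsed time is accumulated from the back-off intervals themselves rather than measured as wall-clock time, so the time spent in each failed save() call is not counted against the 60 seconds.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.ExponentialBackOff;

// Hypothetical configuration class; in the question's code this bean would
// replace the errorHandler() method that currently uses FixedBackOff.
@Configuration
public class RetryForOneMinuteConfiguration {

    @Bean
    public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) {
        // Assumed values: wait 1 s before the first retry and keep the
        // interval constant (a multiplier of 1.0 means no growth).
        ExponentialBackOff backOff = new ExponentialBackOff(1_000L, 1.0);

        // Stop retrying once the accumulated back-off time reaches 60 s;
        // the error handler then invokes the recoverer, which publishes
        // the failed record to the dead-letter topic.
        backOff.setMaxElapsedTime(60_000L);

        return new DefaultErrorHandler(recoverer, backOff);
    }
}
```

With these assumed values the handler would retry roughly once per second for about a minute before handing the record to the DeadLetterPublishingRecoverer. A multiplier greater than 1.0 would make the intervals grow, resulting in fewer attempts within the same time budget.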