How do I configure Kafka to stop retrying after 1 minute, without using maxAttempts?


I am using a Spring Kafka consumer that reads messages from a topic and saves them to a database. If a failure condition occurs, for example the database being unavailable, does the Kafka consumer library provide a retry mechanism? If it does, is there a way to configure it to retry for 1 minute before the message is dropped to the DLT (I don't want to use an interval time and maxAttempts)?

Here is my consumer configuration:

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
@RequiredArgsConstructor
public class ConsumerConfiguration {

    private final KafkaTemplate<String, TransactionEvent> kafkaTemplate;

    public Map<String, Object> consumerConfig() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Wrap the JSON deserializer so deserialization failures are handled
        // instead of looping forever on a poison-pill record
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        return consumerConfig;
    }

    @Bean
    public ConsumerFactory<String, TransactionEvent> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig(), new StringDeserializer(),
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(TransactionEvent.class)));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, TransactionEvent>> kafkaListenerContainerFactory(
            ConsumerFactory<String, TransactionEvent> consumerFactory, DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, TransactionEvent> listenerContainerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        listenerContainerFactory.setConsumerFactory(consumerFactory);
        listenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        listenerContainerFactory.setCommonErrorHandler(errorHandler);
        return listenerContainerFactory;
    }

    @Bean
    public DefaultErrorHandler errorHandler() {
        // Retry up to 5 times, 2 s apart, then hand the record to the recoverer
        return new DefaultErrorHandler(recover(), new FixedBackOff(2000, 5));
    }

    @Bean
    public DeadLetterPublishingRecoverer recover() {
        // Publish failed records to <topic>DLT, keeping the original partition
        final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver =
                (cr, e) -> new TopicPartition(cr.topic().concat("DLT"), cr.partition());

        return new DeadLetterPublishingRecoverer(kafkaTemplate, destinationResolver);
    }
}

And here is my consumer that saves the messages:

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class SmsConsumer {

    private final TransactionEventRepo transactionEventRepo;

    @KafkaListener(groupId = "group-1", topics = ProducerConfiguration.TOPIC_NAME)
    public void consumeSms(@Payload TransactionEvent transactionEvent) {
        // Any exception thrown here (e.g. the database being down) is passed
        // to the container's error handler
        transactionEventRepo.save(transactionEvent);
    }
}

Everything works; as you can see I used new FixedBackOff(2000, 5), but instead of an interval and a maximum number of attempts, I want Kafka to keep retrying for 1 minute when an exception occurs, and if the exception still occurs after 1 minute, drop the message to the DLT.

spring-boot apache-kafka kafka-consumer-api spring-kafka
1 Answer

Use ExponentialBackOff with:

/**
 * The maximum elapsed time in milliseconds after which a call to
 * {@link BackOffExecution#nextBackOff()} returns {@link BackOffExecution#STOP}.
 */
public void setMaxElapsedTime(long maxElapsedTime) {
    this.maxElapsedTime = maxElapsedTime;
}
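For example, keeping your existing recover() bean, the errorHandler() bean could look like the sketch below. The 2 s initial interval, 1.5 multiplier, and 10 s max interval are illustrative values for you to tune, not anything mandated by the API; only the 60 s max elapsed time reflects the 1-minute requirement:

import org.springframework.util.backoff.ExponentialBackOff;

@Bean
public DefaultErrorHandler errorHandler() {
    // Delays grow from 2 s by a factor of 1.5, capped at 10 s per attempt
    // (illustrative values); once ~60 s of back-off time has elapsed,
    // nextBackOff() returns STOP and the record goes to the recoverer.
    ExponentialBackOff backOff = new ExponentialBackOff(2000, 1.5);
    backOff.setMaxInterval(10_000);
    backOff.setMaxElapsedTime(60_000);
    return new DefaultErrorHandler(recover(), backOff);
}

Note that there is no maxAttempts here: the number of retries is simply however many delays fit into the 60-second budget before the back-off reports STOP, after which the DeadLetterPublishingRecoverer publishes the message to the DLT.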