Spring Kafka Retry会在不同分区中选择消息吗?

问题描述 投票:0回答:1

我想知道Spring Kafka如何在分配给实例的多个分区的情况下处理重试。 Spring Kafka是否根据重试策略和退避策略继续重试相同的消息,或者是否重试,并且在重试之间,它是否从其他分区发送消息?

是行为:

A)重试消息 - >重试消息 - >重试消息

B)重试消息 - >其他消息 - >重试消息 - >重试消息

我看过其他stackoverflow问题似乎证实给定一个分区Spring Kafka不会移动到另一个偏移量,但是如果有多个分区分配给该实例,则没有关于行为的信息。我已经实现了一个具有重试模板和有状态重试的工厂。

@Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        ListenerExceptions listenerExceptions = new ListenerExceptions();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(KafkaProperties.CONCURRENCY);
        factory.getContainerProperties().setPollTimeout(KafkaProperties.POLL_TIMEOUT_VLAUE);
        factory.setRetryTemplate(retryTemplate());
        factory.setErrorHandler(new SeekToCurrentErrorHandler());
        factory.setStatefulRetry(true);
        factory.setRecoveryCallback((RetryContext context) -> listenerExceptions.recover(context));
        return factory;
    }
spring-kafka spring-retry
1个回答
1
投票

来自上述工厂的重试配置被委托给RetryingMessageListenerAdapter,其逻辑如下:

public void onMessage(final ConsumerRecord<K, V> record, final Acknowledgment acknowledgment,
        final Consumer<?, ?> consumer) {
    RetryState retryState = null;
    if (this.stateful) {
        retryState = new DefaultRetryState(record.topic() + "-" + record.partition() + "-" + record.offset());
    }
    getRetryTemplate().execute(context -> {
                context.setAttribute(CONTEXT_RECORD, record);
                switch (RetryingMessageListenerAdapter.this.delegateType) {
                    case ACKNOWLEDGING_CONSUMER_AWARE:
                        context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
                        context.setAttribute(CONTEXT_CONSUMER, consumer);
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment, consumer);
                        break;
                    case ACKNOWLEDGING:
                        context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
                        break;
                    case CONSUMER_AWARE:
                        context.setAttribute(CONTEXT_CONSUMER, consumer);
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record, consumer);
                        break;
                    case SIMPLE:
                        RetryingMessageListenerAdapter.this.delegate.onMessage(record);
                }
                return null;
            },
            getRecoveryCallback(), retryState);
}

所以,我们会根据消息重试。根据Apache Kafka的建议,我们在一个线程中处理一个分区,因此在重试耗尽或调用成功之前,不会处理该分区中的每个下一个记录。

根据您的多个分区条件和factory.setConcurrency(KafkaProperties.CONCURRENCY);配置,可能是在不同的线程中处理不同的分区。因此,可能是同时重试来自不同分区的不同记录的情况。仅仅因为重试与线程和调用堆栈相关联。

© www.soinside.com 2019 - 2024. All rights reserved.