SpringBoot KafkaConsumer从一开始就没有消费

问题描述 投票:0回答:1

我有一个 spring boot kafka 消费者,它为特定消费者组 ID 配置了

auto.offset.reset=earliest
。我希望每当应用程序重新启动时消费者就从头开始。我不认为这种情况会发生。当我更改偏移量时,它从头开始消耗,当我重新启动应用程序时,它又从最新开始消耗。为什么 spring 会覆盖我的
auto.offset.reset=earliest
设置?

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBrokers());
    config.put(ConsumerConfig.GROUP_ID_CONFIG, configuration.getKafkaInputTopic() + "_id");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put("ssl.truststore.location", configuration.getKafkaSslTrustStore());
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put("security.protocol", "SASL_SSL");
    config.put("sasl.mechanism", "SCRAM-SHA-512");
@Service
@Getter
@Slf4j
public class KafkaConsumerService {

  private final Map<String, Set<String>> fleetVinsMap = new HashMap<>();

  @KafkaListener(topics = "${app.conf.source.kafka.topic}",
      containerFactory = "heartbeatKafkaListenerContainerFactory")
  public void consume(String message) {
  ...
  }

上面是我的consumerFactory和消费者服务。我还需要设置什么吗?

java spring-boot spring-kafka
1个回答
0
投票

正如 @Šimon 所介绍的,ConsumerGroup 中消费者的偏移量由 Kafka Broker 管理,因此能够适应应用程序刷新。这通常是消费消息的首选方式,因为我们不想两次读取消息。

如果您想无论如何使用相同的消息(例如在开发环境中),请启动一个新的消费者组并设置属性:

config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

这将关闭消费者的自动提交策略,并要求您使用 commitAsync() 或 commitSync() 手动提交消息偏移量。如果未进行提交,则 Kafka Broker 中不会存储任何偏移量。

© www.soinside.com 2019 - 2024. All rights reserved.