我有一个 spring boot kafka 消费者,它为特定消费者组 ID 配置了
auto.offset.reset=earliest
。我希望每当应用程序重新启动时消费者就从头开始。我不认为这种情况会发生。当我更改偏移量时,它从头开始消耗,当我重新启动应用程序时,它又从最新开始消耗。为什么 spring 会覆盖我的 auto.offset.reset=earliest
设置?
@Bean
public ConsumerFactory<String, String> consumerFactory() {
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBrokers());
config.put(ConsumerConfig.GROUP_ID_CONFIG, configuration.getKafkaInputTopic() + "_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put("ssl.truststore.location", configuration.getKafkaSslTrustStore());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put("security.protocol", "SASL_SSL");
config.put("sasl.mechanism", "SCRAM-SHA-512");
@Service
@Getter
@Slf4j
public class KafkaConsumerService {
private final Map<String, Set<String>> fleetVinsMap = new HashMap<>();
@KafkaListener(topics = "${app.conf.source.kafka.topic}",
containerFactory = "heartbeatKafkaListenerContainerFactory")
public void consume(String message) {
...
}
上面是我的consumerFactory和消费者服务。我还需要设置什么吗?
正如 @Šimon 所介绍的,ConsumerGroup 中消费者的偏移量由 Kafka Broker 管理,因此能够适应应用程序刷新。这通常是消费消息的首选方式,因为我们不想两次读取消息。
如果您想无论如何使用相同的消息(例如在开发环境中),请启动一个新的消费者组并设置属性:
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
这将关闭消费者的自动提交策略,并要求您使用 commitAsync() 或 commitSync() 手动提交消息偏移量。如果未进行提交,则 Kafka Broker 中不会存储任何偏移量。