当自动提交设置为 false 时,Kafka 监听器开始消费消息时如何重新传递消息

问题描述 投票:0回答:1

假设我启动消费者时将自动提交设置为 false,并且消费者开始监听消息。

  1. 我的监听器处理了 100 条轮询消息中的 50 条消息,然后只有 50 条已提交,其余 51 到 100 条未提交。
  2. 我开始处理来自 101 的下一组消息并提交到 200。
  3. 一段时间后,我的消费者会收到从 51 到 100 的消息,因为我没有提交它们吗?

我的 Kafka Consumer 将如何处理此用例?

我尝试过以下方法。批量轮询消息并根据我的处理结果提交这批记录。

@KafkaListener(id="${listenerID}",topics = "${consumer.topic}", containerFactory = "listenerContainerFactory",autoStartup ="${isListenerEnabled}")
public void messageListener(List<ConsumerRecord<String, String>> list,Consumer<String, String> consumer) {
 try {
        Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap=processMessage(list, acknowledgment,consumer); //Here I've logic which validates batch of records and gives result for all the processed records. 
        consumer.commitSync(offsetAndMetadataMap);
    }
    catch (Exception e){
               }
}



public ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory()
 {
 ConcurrentKafkaListenerContainerFactory<String, String> factory = new  ConcurrentKafkaListenerContainerFactory<>();

      factory.setConsumerFactory(consumerFactory());
      factory.setBatchListener(true); 
      factory.setAutoStartup(false); 
      factory.setConcurrency(consumerProperties.getConsumerThreads()); 
      factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000,2)));       
     
    if(!consumerProperties.isAutoCommit()) { 
      factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
      }
 return factory; 
}


public ConsumerFactory<String, String> consumerFactory() { 
Map<String, Object> properties = new HashMap<>();
 try {
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootStarpServer);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,autoCommit);
      
        return new DefaultKafkaConsumerFactory<>(properties);
    }
java spring-boot apache-kafka spring-kafka kafka-consumer-api
1个回答
0
投票

不,你的消费者不会收到从偏移量 51 到 100 的消息,因为之前你已经提交了高于 100 或 51 的偏移量 200。 当您向 kafka 提交一些偏移量时,Kafa 会将此偏移量存储在特定的消费者组元数据中。将来,如果某个消费者连接到该消费者组,那么 kafka 将开始从之前存储的偏移量发送消息。

© www.soinside.com 2019 - 2024. All rights reserved.