Kafka consumer API: unable to manually read from an offset and acknowledge using kafka-consumer-api

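My use case is to use the Kafka consumer API so that we can manually resume reading from the offset of the last successfully processed record in a Kafka topic, and then manually acknowledge successfully processed records back to Kafka (this is to reduce data loss). However, in my current implementation the program moves forward and reads from the next offset even if I comment out 'ack.acknowledge()'. I am new to Kafka and have implemented my consumer as follows (we are using Spring Boot).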

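The problem: even when I comment out ack.acknowledge(), the offset is still updated and the consumer reads from the next offset, which is unexpected (as far as I understand so far).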

Consumer config (note that I set ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false and call factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)):

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Autowired
    private AdapterStreamProperties appProperties;

    @Value("${spring.kafka.streams.properties.receive.buffer.bytes}")
    private String receiveBufferBytes;

    @Bean
    public ConsumerFactory<PreferredMediaMsgKey, SendEmailCmd> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appProperties.getApplicationId());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appProperties.getBootstrapServers());
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                "adapter");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.PreferredMediaMsgKeyDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.SendEmailCmdDeserializer");
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes);

        sslConfigs(props);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    private void sslConfigs(Map<String, Object> props) {
        if (appProperties.isSslEnabled()) {
            // configure the following three settings for SSL Authentication
            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, appProperties.getSslKeyLocation());
            props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, appProperties.getSslKeyPswd());
            props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, appProperties.getSslKeyPswd());

            // configure the following three settings for SSL Encryption
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
            props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, appProperties.getSslTrustLocation());
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, appProperties.getSslTrustPswd());
        }
    }
}

My consumer then consumes records like this (even if I comment out 'ack.acknowledge()', it still reads from the next offset the next time):

  @KafkaListener(topics = Constants.INPUT_TOPIC, groupId = "adapter")
  public void listen(ConsumerRecord<PreferredMediaMsgKey, SendEmailCmd> record,
                     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                     @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) {

    System.out.println("----------------------------------------------------------------------------");
    System.out.println("Reading from Offset: " +  offset + ", and Partition: " + partition);
    System.out.println("Record for this partition: Key : "+ record.key() + ", Value : " +   record.value());
    System.out.println("----------------------------------------------------------------------------");
    NotificationProcessedFinal result = processor.processEmail(record.key(),record.value());

    if( StringUtils.isNotEmpty(result.getErrorCmd().getErrorMsg())) {
      kafkaErrorProducerTemplate.send(adapterMsgProperties.getErrorTopic(), record.key(), result.getErrorCmd());
    }
    else {
      kafkaOutputProducerTemplate.send(adapterMsgProperties.getOutputTopic(), record.key(), result.getNotifyEmailCmd());
    }
    ack.acknowledge();
  }

The Kafka API versions in my build.gradle:

//Kafka Dependencies
implementation      'org.apache.kafka:kafka-streams:2.0.1'
implementation      'org.apache.kafka:kafka-clients:2.0.1'

Any insight would be helpful.

Thanks in advance.

apache-kafka kafka-consumer-api
1 answer
ack.acknowledge()
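
One likely reading of the behaviour in the question, offered as a sketch rather than a confirmed diagnosis: a Kafka consumer keeps an in-memory fetch position that advances on every poll, while acknowledging only commits an offset for the group. Skipping ack.acknowledge() therefore leaves the committed offset behind but does not rewind the running consumer. The toy model below illustrates that distinction in plain Java. `OffsetModel` and all of its methods are hypothetical names made up for this illustration; it does not use or reproduce the Kafka client or Spring Kafka API.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of one partition's consumer state; NOT the real Kafka client API. */
public class OffsetModel {
    private final List<String> log; // records stored in the partition
    private long position = 0;      // in-memory fetch position of the running consumer
    private long committed = 0;     // offset last committed for the consumer group

    public OffsetModel(List<String> log) {
        this.log = log;
    }

    /** Returns the next record and advances the in-memory position, or null at the end of the log. */
    public String poll() {
        if (position >= log.size()) {
            return null;
        }
        return log.get((int) position++);
    }

    /** Models an acknowledgment: commits the offset of the next record to read. */
    public void acknowledge() {
        committed = position;
    }

    /** Models an explicit seek back to the committed offset (a restart has the same effect). */
    public void seekToCommitted() {
        position = committed;
    }

    public long position() {
        return position;
    }

    public long committed() {
        return committed;
    }

    public static void main(String[] args) {
        OffsetModel consumer = new OffsetModel(Arrays.asList("r0", "r1", "r2"));

        consumer.poll();
        consumer.acknowledge();          // r0 processed and acknowledged

        String second = consumer.poll(); // r1 delivered, acknowledgment skipped
        String third = consumer.poll();  // r2 is still delivered next

        System.out.println(second + " " + third
                + " position=" + consumer.position()
                + " committed=" + consumer.committed());

        consumer.seekToCommitted();      // rewind to the last committed offset
        System.out.println("after seek: " + consumer.poll());
    }
}
```

In this model the unacknowledged record is only seen again after an explicit seek or a restart. In a real application the corresponding step would be something like `KafkaConsumer.seek(...)`, or restarting the consumer so that it resumes from the committed offset. Whether that fits the setup in the question depends on the Spring Kafka version in use, which the question does not state.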