我的用例是使用kafka Consumer-api,以便我们可以从kafka主题中最后成功处理的数据的偏移量中手动读取,然后手动向Kafka确认成功处理的数据。 (这是为了减少数据丢失)。但是,在我当前的实现中,即使我注释掉'ack.acknowledge()',该程序也会向前移动并从下一个偏移量读取。我是Kafka的新手,并且通过以下方式实现了我的消费者(我们正在使用Spring Boot)
问题是:即使我注释掉ack.acknowledge(),偏移量仍在更新,并且使用者正在从下一个偏移量读取数据,这是意外的(到目前为止,据我所知)]
Consumer Config [请注意,我将ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG设置为false并设置了factory.getContainerProperties()。setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)]:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Autowired
private AdapterStreamProperties appProperties;
@Value("${spring.kafka.streams.properties.receive.buffer.bytes}")
private String receiveBufferBytes;
@Bean
public ConsumerFactory<PreferredMediaMsgKey, SendEmailCmd> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appProperties.getApplicationId());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appProperties.getBootstrapServers());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"adapter");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.PreferredMediaMsgKeyDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.SendEmailCmdDeserializer");
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes);
sslConfigs(props);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
private void sslConfigs(Map<String, Object> props) {
if (appProperties.isSslEnabled()) {
// configure the following three settings for SSL Authentication
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, appProperties.getSslKeyLocation());
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, appProperties.getSslKeyPswd());
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, appProperties.getSslKeyPswd());
// configure the following three settings for SSL Encryption
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, appProperties.getSslTrustLocation());
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, appProperties.getSslTrustPswd());
}
}
}
然后我的消费者正在以这种方式使用[即使我注释掉'ack.acknowledge()',下一次它仍将从下一个偏移量读取:
@KafkaListener(topics = Constants.INPUT_TOPIC, groupId = "adapter")
public void listen(ConsumerRecord<PreferredMediaMsgKey, SendEmailCmd> record,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
@Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) {
System.out.println("----------------------------------------------------------------------------");
System.out.println("Reading from Offset: " + offset + ", and Partition: " + partition);
System.out.println("Record for this pertition: Key : "+ record.key() + ", Value : " + record.value());
System.out.println("----------------------------------------------------------------------------");
NotificationProcessedFinal result = processor.processEmail(record.key(),record.value());
if( StringUtils.isNotEmpty(result.getErrorCmd().getErrorMsg())) {
kafkaErrorProducerTemplate.send(adapterMsgProperties.getErrorTopic(), record.key(), result.getErrorCmd());
}
else {
kafkaOutputProducerTemplate.send(adapterMsgProperties.getOutputTopic(), record.key(), result.getNotifyEmailCmd());
}
ack.acknowledge();
}
我的gradle.build中的kafka api版本:
//Kafka Dependencie
implementation 'org.apache.kafka:kafka-streams:2.0.1'
implementation 'org.apache.kafka:kafka-clients:2.0.1'
任何见识都会有所帮助。
提前感谢
ack.acknowledge()