我目前正在使用 Kafka,更具体地说,能够使用日志压缩功能,如下所述:https://kafka.apache.org/documentation/#compaction.
我正在使用 kafka-clients java 库:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.1</version>
</dependency>
然后,我使用这个属性在压缩模式下创建一个主题:
"cleanup.policy"="compact"
"delete.retention.ms"="100"
"segment.ms"="100"
"min.cleanable.dirty.ratio"="0.01"
出于测试目的,代理将属性 log.retention.check.interval.ms 设置为 1 秒,以便每秒压缩数据。
然后,我按照特定顺序生成 3 批不同的消息:
#1
"k1":"V1"
"k3":"V1"
"k2":"V1"
"k1":"V2"
#2
"k2":"V2"
"k1":"V3"
"k1":"V4"
#3
"k4":"V1"
我在每批消息之间等待至少 3 秒(等待日志清理器)。当我使用控制台消费者时,我得到了这个结果:
"k3":"V1"
"k2":"V2"
"k1":"V4"
"k4":"V1"
这正是我所期望的。但是,当我使用 Java 库中的
KafkaConsumer
时,我得到了这个结果:
"k3":"V1"
"k2":"V2"
"k1":"V4"
k4 不见了。我的理解是 k4 被推入了一个叫做头部的特定部分,只有尾部被压缩了。 link 中很好地解释了 Kafka 使用他的尾巴和头部的方式。在我的例子中,消费者似乎从未完全阅读过 kafka 主题:它排除了仍在“脏”部分(头部)中的任何数据。所以我的问题很简单:为什么
KafkaConsumer
排除 head 段中的数据?我是否缺少消费者属性中的特定配置?
Java 片段以防万一:
@SneakyThrows
public static void main(String[] args) {
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties());) {
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : poll) {
log.info("Key : {} with value : {}", consumerRecord.key(), consumerRecord.value());
}
}
}
private static Properties getConsumerProperties() {
Properties result = new Properties();
result.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
result.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
result.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
result.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
result.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
result.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return result;
}