Kafka Consumer 不返回 head 段中的记录

问题描述 投票:0回答:0

我目前正在使用 Kafka,更具体地说,能够使用日志压缩功能,如下所述:https://kafka.apache.org/documentation/#compaction.

我正在使用 kafka-clients java 库:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.1</version>
</dependency>

然后,我使用这个属性在压缩模式下创建一个主题:

"cleanup.policy"="compact"
"delete.retention.ms"="100"
"segment.ms"="100"
"min.cleanable.dirty.ratio"="0.01"

出于测试目的,代理将属性 log.retention.check.interval.ms 设置为 1 秒,以便每秒压缩数据。

然后,我按照特定顺序生成 3 批不同的消息:

#1
"k1":"V1"
"k3":"V1"
"k2":"V1"
"k1":"V2"
#2 
"k2":"V2"
"k1":"V3"
"k1":"V4"
#3
"k4":"V1"

我在每批消息之间等待至少 3 秒(等待日志清理器)。当我使用控制台消费者时,我得到了这个结果:

"k3":"V1"
"k2":"V2"
"k1":"V4"
"k4":"V1"

这正是我所期望的。但是,当我使用 Java 库中的

KafkaConsumer
时,我得到了这个结果:

"k3":"V1"
"k2":"V2"
"k1":"V4"

k4 不见了。我的理解是 k4 被推入了一个叫做头部的特定部分,只有尾部被压缩了。 link 中很好地解释了 Kafka 使用他的尾巴和头部的方式。在我的例子中,消费者似乎从未完全阅读过 kafka 主题:它排除了仍在“脏”部分(头部)中的任何数据。所以我的问题很简单:为什么

KafkaConsumer
排除 head 段中的数据?我是否缺少消费者属性中的特定配置?

Java 片段以防万一:

@SneakyThrows
public static void main(String[] args) {
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties());) {
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> consumerRecord : poll) {
            log.info("Key : {} with value : {}", consumerRecord.key(), consumerRecord.value());
        }
    }

}

private static Properties getConsumerProperties() {
    Properties result = new Properties();
    result.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    result.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    result.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    result.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    result.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    result.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return result;
}
java apache-kafka kafka-consumer-api
© www.soinside.com 2019 - 2024. All rights reserved.