Apache Camel Kafka 消费者即使在暂停后也会消费消息

问题描述 投票:0回答:1

我在

Kafka consumer
路由中有一个
Camel
,它消耗如下消息:

from("kafka:MykafkaTopic?brokers=localhost:9092&autoCommitEnable=false&allowManualCommit=true&maxPollRecords=1")
        .pausable(new KafkaConsumerListener(), o -> canContinue())
        .routeId("consumer")
            .log("    on the topic ${headers[kafka.TOPIC]}")
            .log("    on the partition ${headers[kafka.PARTITION]}")
            .log("    with the offset ${headers[kafka.OFFSET]}")
           // .to("direct:controlRoute")
        .to("direct:IncomingKafka")
        .end();

此方法必须处理暂停流程:

public  boolean canContinue() {
        if(count < 4) {
            java.util.logging.Logger.getGlobal().info("count in canContinue::"+count);
            count++;
            return true;
        }
        if(count > 10) {
            java.util.logging.Logger.getGlobal().info("count in canContinue::"+count);
            return true;
        } else {
            count++;
            java.util.logging.Logger.getGlobal().info("count in canContinue::"+count);
            return false;
        }
}


根据代码,当计数为

greater than 4
less than 10
时,消息消费预计会停止。然而,
Kafka consumer
继续消费消息并丢弃它们,表明在 4 到 10 之间有一个暂停。我通过观察偏移量不断变化而得出这个结论,无论消费者的状态如何。

我期望消费者在暂停时停止消费消息,并从暂停的地方恢复消费。

spring-boot apache-kafka apache-camel
1个回答
0
投票

可暂停 EIP 旨在当“路由中出现”异常导致交换无法处理时暂停消费者。更具体地说,pausable

 EIP 调用的检查应用于测试阻止处理交换的瞬态条件。

因此,似乎缺少的是在

direct:IncomingKafka

 路由中某处抛出的异常,导致交换无法处理。

基于此,我实际上

调整了文档,使得可暂停EIP的目的更加明确。

© www.soinside.com 2019 - 2024. All rights reserved.