我在
Kafka consumer
路由中有一个 Camel
,它消耗如下消息:
from("kafka:MykafkaTopic?brokers=localhost:9092&autoCommitEnable=false&allowManualCommit=true&maxPollRecords=1")
.pausable(new KafkaConsumerListener(), o -> canContinue())
.routeId("consumer")
.log(" on the topic ${headers[kafka.TOPIC]}")
.log(" on the partition ${headers[kafka.PARTITION]}")
.log(" with the offset ${headers[kafka.OFFSET]}")
// .to("direct:controlRoute")
.to("direct:IncomingKafka")
.end();
此方法必须处理暂停流程:
public boolean canContinue() {
if(count < 4) {
java.util.logging.Logger.getGlobal().info("count in canContinue::"+count);
count++;
return true;
}
if(count > 10) {
java.util.logging.Logger.getGlobal().info("count in canContinue::"+count);
return true;
} else {
count++;
java.util.logging.Logger.getGlobal().info("count in canContinue::"+count);
return false;
}
}
根据代码,当计数为
greater than 4
和 less than 10
时,消息消费预计会停止。然而,Kafka consumer
继续消费消息并丢弃它们,表明在 4 到 10 之间有一个暂停。我通过观察偏移量不断变化而得出这个结论,无论消费者的状态如何。
我期望消费者在暂停时停止消费消息,并从暂停的地方恢复消费。
可暂停 EIP 旨在当“路由中出现”异常导致交换无法处理时暂停消费者。更具体地说,pausable
EIP 调用的检查应用于测试阻止处理交换的瞬态条件。因此,似乎缺少的是在
direct:IncomingKafka
路由中某处抛出的异常,导致交换无法处理。基于此,我实际上
调整了文档,使得可暂停EIP的目的更加明确。