我正在使用 AMQP-CPP 库(不是 amqp C 库)连接到 RabbitMQ。消费队列是有效的,发布也是如此。我不使用交换器或任何东西,只是一个队列上的一个消费者。当尝试取消队列时,在执行 DeferredCancel::onSuccess 回调后我仍然收到消息。另外,回调(std::string Consumer)为空,这应该又是consumerTag吗?
这是我观察到的:
// publish many messages to "queueName"
m_channel->consume("queueName", "hardcodedConsumerTag").onReceived([](AMQP::Message m){std::cout << m.body()<< std::endl;});
m_channel->cancel("hardcodedConsumerTag").onSuccess([](std::string consumer){std::cout << "should have stopped consuming for: " << consumer << std::endl});
输出:
message 1
should have stopped consuming for: (here is just an empty string)
message 2
message 3
... until all messages have been delivered
我希望消息在打印输出“应该停止消耗”后停止。
原来在发送cancel()请求时消费请求还没有被处理。即使没有消费者被取消,RabbitMQ/AMQP-CPP 库也会响应“成功”,因为 RabbitMQ 端尚不存在消费者。然后处理 Consumer(),这就是我看到上述行为的原因。
我通过将所有内容包装在回调中来修复它。我正在维护自己的 DeferredQueue 和 DeferredConsumer 列表,并存储 onSuccess 回调是否已执行(因为 AMQP-CPP 中似乎没有“待处理”等效项)。
如果 onSuccess 回调尚未执行,我会覆盖 onSuccess 回调,如果已经执行,我可以正常取消。
// publish many messages to "queueName"
bool onSuccessExecuted = false;
auto& deferredConsumer = m_channel->consume("queueName", "hardcodedConsumerTag");
deferredConsumer.onReceived([](AMQP::Message m){
std::cout << m.body()<< std::endl;
});
deferredConsumer.onSuccess([&](){
onSuccessExecuted=true;
// do stuff you want to do when starting consuming a queue
});
if (onSuccessExecuted == false){
// this overwrites the previous onSuccess callback
deferredConsumer.onSuccess([this](){
cancel();
// must still be set if we might want to cancel again later
onSuccessExecuted=true;
}
} else {
// if onSuccess has already been executed we just cancel normally,
// as the onSuccess callback won't be executed again
cancel();
}
void cancel() {
m_channel->cancel("hardcodedConsumerTag").onSuccess([](std::string consumer){
std::cout << "should have stopped consuming for: " << consumer << std::endl
});
}