amqp-cpp 库:channel->cancel(consumerTag) 似乎没有取消

问题描述 投票:0回答:1

我正在使用 AMQP-CPP 库(不是 amqp C 库)连接到 RabbitMQ。消费队列是有效的,发布也是如此。我不使用交换器或任何东西,只是一个队列上的一个消费者。当尝试取消队列时,在执行 DeferredCancel::onSuccess 回调后我仍然收到消息。另外,回调(std::string Consumer)为空,这应该又是consumerTag吗?

这是我观察到的:

// publish many messages to "queueName"

m_channel->consume("queueName", "hardcodedConsumerTag").onReceived([](AMQP::Message m){std::cout << m.body()<< std::endl;});

m_channel->cancel("hardcodedConsumerTag").onSuccess([](std::string consumer){std::cout << "should have stopped consuming for: " << consumer << std::endl});

输出:

message 1
should have stopped consuming for: (here is just an empty string)
message 2
message 3
... until all messages have been delivered

我希望消息在打印输出“应该停止消耗”后停止。

c++ rabbitmq amqp
1个回答
0
投票

原来在发送cancel()请求时消费请求还没有被处理。即使没有消费者被取消,RabbitMQ/AMQP-CPP 库也会响应“成功”,因为 RabbitMQ 端尚不存在消费者。然后处理 Consumer(),这就是我看到上述行为的原因。

我通过将所有内容包装在回调中来修复它。我正在维护自己的 DeferredQueue 和 DeferredConsumer 列表,并存储 onSuccess 回调是否已执行(因为 AMQP-CPP 中似乎没有“待处理”等效项)。

如果 onSuccess 回调尚未执行,我会覆盖 onSuccess 回调,如果已经执行,我可以正常取消。

// publish many messages to "queueName"

bool onSuccessExecuted = false;

auto& deferredConsumer = m_channel->consume("queueName", "hardcodedConsumerTag");

deferredConsumer.onReceived([](AMQP::Message m){
      std::cout << m.body()<< std::endl;
    });

deferredConsumer.onSuccess([&](){
      onSuccessExecuted=true;
      // do stuff you want to do when starting consuming a queue
    });

if (onSuccessExecuted == false){
  // this overwrites the previous onSuccess callback
  deferredConsumer.onSuccess([this](){
      cancel();
      // must still be set if we might want to cancel again later
      onSuccessExecuted=true;
    }
} else {
  // if onSuccess has already been executed we just cancel normally,
  // as the onSuccess callback won't be executed again
  cancel();
}

void cancel() {
  m_channel->cancel("hardcodedConsumerTag").onSuccess([](std::string consumer){
        std::cout << "should have stopped consuming for: " << consumer << std::endl
      });
}
© www.soinside.com 2019 - 2024. All rights reserved.