我的 rpc 请求场景中有一个奇怪的案例。由于 autodelete 的属性设置为 true,我发现了它。 这个属性表示当最后一个消费者取消订阅时,已经有至少一个消费者的队列被删除。
现在,我注意到有时客户端使用 remoteCall 方法(在代码下方)向服务器发送请求,服务器给出响应,将其发送到客户端在 remoteCall 中声明的正确回复队列(自动删除为真) ,但是这个队列上的消费者已经关闭,而且这个队列永远不会被 rabbitMQ 删除。
我不明白如果在 remoteCall 中我执行 basicPublish 并且在我开始等待消费者的响应之后,这怎么可能。 这意味着客户端应该立即附加到回复队列,但显然有时情况并非如此,否则队列将被删除。
那么,我怎么可能使用下面的 remoteCall 方法得到从未有过消费者的回复队列?
我希望有人能帮助我。谢谢大家
public Message remoteCall(Message requestMessage) {
try {
MessagePayload messagePayload = requestMessage.getPayload();
String uid = UUID.randomUUID().toString();
long timeMillis = System.currentTimeMillis();
String customReplyName = outputQueueName + "_" + messagePayload.getOperation() + "_" + uid + "_" + timeMillis;
String replyQueueName = this.channel.queueDeclare(
customReplyName,
false,
true,
true,
null)
.getQueue();
BasicProperties remoteCallProperties = this.getRemoteCallProperties(requestMessage.getProperties(), replyQueueName);
this.channel.basicPublish(
"",
this.outputQueueName,
remoteCallProperties,
MessageUtils.payloadToByteArray(requestMessage.getPayload())
);
// Wait until response
final BlockingQueue<MessagePayload> blockingQueue = new ArrayBlockingQueue<>(1);
String tag = this.channel.basicConsume(
replyQueueName,
true,
(consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(remoteCallProperties.getCorrelationId())) {
blockingQueue.offer(Objects.requireNonNull(MessageUtils.byteArrayToPayload(delivery.getBody())));
}
},
consumerTag -> {
});
MessagePayload responsePayload = blockingQueue.take();
this.channel.basicCancel(tag);
MessageProperties responseProperties = new MessagePropertiesBuilder()
.correlationId(remoteCallProperties.getCorrelationId())
.build();
return new MessageBuilder()
.properties(responseProperties)
.payload(responsePayload)
.build();
} catch (IOException | InterruptedException e) {
Logging.getLogger().error("MessageBrokerError: ");
Logging.getLogger().error(e.getMessage());
return null;
}
}
我希望在客户端的基本发布之后 remoteCall 方法到达 basicConsume 行时,消费者总是链接到回复队列。