RabbitMQ 消费者从未附加到队列,响应到达并且队列从未被删除

问题描述 投票:0回答:0

我的 rpc 请求场景中有一个奇怪的案例。由于 autodelete 的属性设置为 true,我发现了它。 这个属性表示当最后一个消费者取消订阅时,已经有至少一个消费者的队列被删除。

现在,我注意到有时客户端使用 remoteCall 方法(在代码下方)向服务器发送请求,服务器给出响应,将其发送到客户端在 remoteCall 中声明的正确回复队列(自动删除为真) ,但是这个队列上的消费者已经关闭,而且这个队列永远不会被 rabbitMQ 删除。

我不明白如果在 remoteCall 中我执行 basicPublish 并且在我开始等待消费者的响应之后,这怎么可能。 这意味着客户端应该立即附加到回复队列,但显然有时情况并非如此,否则队列将被删除。

那么,我怎么可能使用下面的 remoteCall 方法得到从未有过消费者的回复队列?

我希望有人能帮助我。谢谢大家

public Message remoteCall(Message requestMessage) {
        
        try {
            MessagePayload messagePayload = requestMessage.getPayload();
            String uid = UUID.randomUUID().toString();
            long timeMillis = System.currentTimeMillis();
            String customReplyName = outputQueueName + "_" + messagePayload.getOperation() + "_" + uid + "_" + timeMillis;

            String replyQueueName = this.channel.queueDeclare(
                            customReplyName,
                            false,
                            true,
                            true,
                            null)
                    .getQueue();


            BasicProperties remoteCallProperties = this.getRemoteCallProperties(requestMessage.getProperties(), replyQueueName);

            this.channel.basicPublish(
                    "",
                    this.outputQueueName,
                    remoteCallProperties,
                    MessageUtils.payloadToByteArray(requestMessage.getPayload())
            );

            // Wait until response
            final BlockingQueue<MessagePayload> blockingQueue = new ArrayBlockingQueue<>(1);

            String tag = this.channel.basicConsume(
                    replyQueueName,
                    true,
                    (consumerTag, delivery) -> {
                        if (delivery.getProperties().getCorrelationId().equals(remoteCallProperties.getCorrelationId())) {
                            blockingQueue.offer(Objects.requireNonNull(MessageUtils.byteArrayToPayload(delivery.getBody())));
                        }
                    },
                    consumerTag -> {
                    });

            MessagePayload responsePayload = blockingQueue.take();

            this.channel.basicCancel(tag);

            MessageProperties responseProperties = new MessagePropertiesBuilder()
                    .correlationId(remoteCallProperties.getCorrelationId())
                    .build();

            return new MessageBuilder()
                    .properties(responseProperties)
                    .payload(responsePayload)
                    .build();

        } catch (IOException | InterruptedException e) {
            Logging.getLogger().error("MessageBrokerError: ");
            Logging.getLogger().error(e.getMessage());
            return null;
        }
    }

我希望在客户端的基本发布之后 remoteCall 方法到达 basicConsume 行时,消费者总是链接到回复队列。

rabbitmq rpc producer-consumer
© www.soinside.com 2019 - 2024. All rights reserved.