当消息不能被RabbitMQ路由时,Publisher确认会给出ACK。

问题描述 投票:0回答:1

我正在做一个小项目,检查RabbitMQ的Publisher Confirms与Spring Cloud Stream 3.0.1的工作情况。

我有一个特殊的案例,当一条消息被发送到RabbitMQ的交换器时,但该交换器无法路由这条消息。我以为Publisher会将此消息作为错误处理,但令我惊讶的是,此消息被作为ACK处理。

这是Publisher的代码。

@Slf4j
@Timed
@Component
@RequiredArgsConstructor
public class TestPublisher {

    private final MessagingChannels messagingChannels;

    public boolean send(Message<Event<TestEvent>> message) {

        log.info("Message for Testing Publisher confirms sent: " + message);
        return messagingChannels.walletTest().send(message);
    }

    @ServiceActivator(inputChannel = TEST_ACK)
    public void acks(Message<?> ack) {
        log.info("Message ACK received for Test: " + ack);
    }

    @ServiceActivator(inputChannel = TEST_ERROR)
    public void errors(Message<?> error) {
        log.info("Message error for Test received: " + error);
    }
}

我在org.springframework.integration.handler.LoggingHandler的日志中看到这条消息。

org.springframework.integration.amqp.support.ReturnedAmqpMessageException, failedMessage=GenericMessage [payload=byte[1009], headers={id=c9bc65bc-9256-1ce2-324f-167cc526a819, timestamp=1591962425292}] [amqpMessage=(Body:'{"payload":{"id":"f696668d-140e-4ba7-96c7-ea6decdfcccd","version":20000,"currency":"USD"},"eventName":"TEST_UPDATED",......contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, .....]

正如你所看到的,它被报告为一个失败的消息, replyCode=312, replyText=NO_ROUTE。

然而,它是作为ACK而不是NACK接收的。我查了一下文档,我想这是启用Publisher Confirms后的预期行为,但我想确认一下。如果是这样的话,有没有什么方法可以在ACK的上下文中检测到这个消息不可能被绕过?因为在这个上下文中,我没有responseCode和responseText,我只有Publisher发送的正确消息。

Publisher确认是正确激活了接下来的属性。

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true

ACK通道和错误通过errorChannelEnabled和confirmAckChannel属性正确配置。

rabbitmq spring-integration spring-cloud-stream spring-rabbitmq
1个回答
0
投票

工作原理是这样的,在返回的消息之后发送的是ack,而不是nack。

请看 文件.

对于不可路由的消息,一旦交易所验证消息不会路由到任何队列(返回一个空的队列列表),经纪人就会发出确认。如果消息也被发布为强制性的,basic.return会在basic.ack之前发送给客户端。

你无法判断是在ack的上下文中返回的。

如果你直接使用Spring AMQP而不是通过Spring Cloud Stream,你可以,因为ACK回调得到的是 CorrelationData 它将与返回的信息相联系。

© www.soinside.com 2019 - 2024. All rights reserved.