我正在做一个小项目,检查RabbitMQ的Publisher Confirms与Spring Cloud Stream 3.0.1的工作情况。
我有一个特殊的案例,当一条消息被发送到RabbitMQ的交换器时,但该交换器无法路由这条消息。我以为Publisher会将此消息作为错误处理,但令我惊讶的是,此消息被作为ACK处理。
这是Publisher的代码。
@Slf4j
@Timed
@Component
@RequiredArgsConstructor
public class TestPublisher {
private final MessagingChannels messagingChannels;
public boolean send(Message<Event<TestEvent>> message) {
log.info("Message for Testing Publisher confirms sent: " + message);
return messagingChannels.walletTest().send(message);
}
@ServiceActivator(inputChannel = TEST_ACK)
public void acks(Message<?> ack) {
log.info("Message ACK received for Test: " + ack);
}
@ServiceActivator(inputChannel = TEST_ERROR)
public void errors(Message<?> error) {
log.info("Message error for Test received: " + error);
}
}
我在org.springframework.integration.handler.LoggingHandler的日志中看到这条消息。
org.springframework.integration.amqp.support.ReturnedAmqpMessageException, failedMessage=GenericMessage [payload=byte[1009], headers={id=c9bc65bc-9256-1ce2-324f-167cc526a819, timestamp=1591962425292}] [amqpMessage=(Body:'{"payload":{"id":"f696668d-140e-4ba7-96c7-ea6decdfcccd","version":20000,"currency":"USD"},"eventName":"TEST_UPDATED",......contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, .....]
正如你所看到的,它被报告为一个失败的消息, replyCode=312, replyText=NO_ROUTE。
然而,它是作为ACK而不是NACK接收的。我查了一下文档,我想这是启用Publisher Confirms后的预期行为,但我想确认一下。如果是这样的话,有没有什么方法可以在ACK的上下文中检测到这个消息不可能被绕过?因为在这个上下文中,我没有responseCode和responseText,我只有Publisher发送的正确消息。
Publisher确认是正确激活了接下来的属性。
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
ACK通道和错误通过errorChannelEnabled和confirmAckChannel属性正确配置。
工作原理是这样的,在返回的消息之后发送的是ack,而不是nack。
请看 文件.
对于不可路由的消息,一旦交易所验证消息不会路由到任何队列(返回一个空的队列列表),经纪人就会发出确认。如果消息也被发布为强制性的,basic.return会在basic.ack之前发送给客户端。
你无法判断是在ack的上下文中返回的。
如果你直接使用Spring AMQP而不是通过Spring Cloud Stream,你可以,因为ACK回调得到的是 CorrelationData
它将与返回的信息相联系。