如何指定使用Spring Cloud Stream向RabbitMQ发送消息的超时时间?

问题描述 投票:0回答:1

我们在发送消息的过程中遇到了网络问题,这导致所有线程处于“阻塞”状态。我们正在使用org.springframework.cloud:spring-cloud-stream:2.0.1.RELEASEorg.springframework:spring-messaging:5.0.8.RELEASE将消息发送到RabbitMQ代理。绑定界面:

interface MessagingSource {
    @Output("bindingTargetName")
    fun messageChannelOutput(): MessageChannel
}

用法:

 val isSent = messageSource.messageChannelOutput().send(message)

MessageChannel#send(Message,long)方法也以毫秒为单位作为第二个参数,但是在org.springframework.integration.channel.AbstractSubscribableChannel#doSend方法中它进一步被忽略:

@Override
protected boolean doSend(Message<?> message, long timeout) { // timeout is ignored in this method
    try {
        return getRequiredDispatcher().dispatch(message);
    }
    catch (MessageDispatchingException e) {
        String description = e.getMessage() + " for channel '" + this.getFullChannelName() + "'.";
        throw new MessageDeliveryException(message, description, e);
    }
}

您能解释为什么忽略超时参数以及如何配置它以避免长时间阻塞状态吗?

谢谢!

java rabbitmq spring-integration amqp spring-cloud-stream
1个回答
0
投票

通道sendTimeout仅在通道本身可能会阻塞的情况下适用,例如一个QueueChannel,其队列当前已满;呼叫者将一直阻塞,直到队列中的空间可用或发生超时为止。

在这种情况下,该块位于通道的下游,因此sendTimeout是无关紧要的(无论如何,它是一个DirectChannel,无论如何都无法阻止,已订阅的处理程序将直接在调用线程上被调用)。

您看到的实际阻塞很可能出现在rabbitmq客户端的socket.write()中,该客户端没有超时且不可中断;调用线程无法“暂停”写入操作。

我知道的唯一可能的解决方法是通过在连接工厂上调用resetConnection()来强制关闭Rabbit连接。

© www.soinside.com 2019 - 2024. All rights reserved.