How to restart a SimpleMessageListenerContainer


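I am trying to connect to RabbitMQ with Spring Boot. The connection should always be restarted or retried. I have problems reconnecting after a fatal exception has occurred. The application should never give up on the connection; it should retry indefinitely until it gets one.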

    @Bean
    public IntegrationFlow flow() {
        final SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(getConnectionFactory());
        listenerContainer.setQueues(getQueue());
        listenerContainer.setDeclarationRetries(Integer.MAX_VALUE);
        final AmqpInboundChannelAdapterSpec adapter = (AmqpInboundChannelAdapterSpec) Amqp.inboundAdapter(listenerContainer);
        return IntegrationFlows
                .from(adapter)
                .filter(filter)
                .transform(transformer)
                .handle(processor)
                .get();
    }

I may get a fatal exception, which I catch in an ApplicationListener<ListenerContainerConsumerFailedEvent>. The exception is not always the same; last time it was a 'PossibleAuthenticationFailureException'.

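If the exception is fatal, I stop the container and then start it again. I realize this may be the wrong place to do it, because it looks as if the container is stopped after the event listener has been called.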

@Override
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
    if (event.isFatal()) {
        // The event source is the container whose consumer failed.
        SimpleMessageListenerContainer simpleMessageListenerContainer =
                (SimpleMessageListenerContainer) event.getSource();
        simpleMessageListenerContainer.stop();
        simpleMessageListenerContainer.start();
        LOG.info("started container");
    }
}

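This produces the following output, and there is no connection to RabbitMQ. (Nothing is logged after these lines; the application simply does nothing and stays disconnected.)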

[rContainer#0-34] startConnectionOnFatalConnectionListener : started container

[rContainer#0-34] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer

Versions:

  • RabbitMQ 3.6.8, Erlang 19.2
  • Spring Integration (spring-integration-amqp): 4.3.9

I was able to capture more log entries. The following lines are logged before the ListenerContainerConsumerFailedEvent listener is called:

07:05.843 ERROR o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0)
07:05.843 ERROR o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0)
07:15.087 ERROR o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal exception on startup

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:476) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1280) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
Caused by: org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.PossibleAuthenticationFailureException: Possibly caused by authentication failure
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:309) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:547) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:90) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:140) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:76) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:472) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    ... 2 common frames omitted
Caused by: com.rabbitmq.client.PossibleAuthenticationFailureException: Possibly caused by authentication failure
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:342) ~[amqp-client-3.6.3.jar!/:na]
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:813) ~[amqp-client-3.6.3.jar!/:na]
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:767) ~[amqp-client-3.6.3.jar!/:na]
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:887) ~[amqp-client-3.6.3.jar!/:na]
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:300) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    ... 7 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-3.6.3.jar!/:na]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:37) ~[amqp-client-3.6.3.jar!/:na]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:367) ~[amqp-client-3.6.3.jar!/:na]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:234) ~[amqp-client-3.6.3.jar!/:na]
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:212) ~[amqp-client-3.6.3.jar!/:na]
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:327) ~[amqp-client-3.6.3.jar!/:na]
    ... 11 common frames omitted
Caused by: java.io.EOFException: null
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.8.0_92]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.6.3.jar!/:na]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.6.3.jar!/:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:542) ~[amqp-client-3.6.3.jar!/:na]
    ... 1 common frames omitted

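Workaround: implement a health check. The health check verifies that a connection to RabbitMQ exists. If the health check fails, the application is restarted. (It is implemented as a cron job that calls a bash script; the bash script stops and starts the container.)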
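The health-check workaround could be sketched roughly as follows in plain Java. This is only an illustration of the idea, not the code used in the question: the class name `ConnectionHealthCheck`, the `probe` and `onUnhealthy` callbacks, and the `failureThreshold` parameter are all hypothetical. With a threshold of 1 it behaves as described above (one failed check triggers a restart); a higher threshold is an assumption added to tolerate short outages.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical health check; none of these names come from Spring or RabbitMQ. */
class ConnectionHealthCheck {
    private final BooleanSupplier probe;   // assumed to return true when the broker is reachable
    private final Runnable onUnhealthy;    // assumed to trigger the restart (e.g. run the script)
    private final int failureThreshold;    // consecutive failures tolerated before restarting
    private int consecutiveFailures = 0;

    ConnectionHealthCheck(BooleanSupplier probe, Runnable onUnhealthy, int failureThreshold) {
        this.probe = probe;
        this.onUnhealthy = onUnhealthy;
        this.failureThreshold = failureThreshold;
    }

    /** Runs one check; meant to be called periodically. Returns whether the probe succeeded. */
    synchronized boolean check() {
        boolean healthy;
        try {
            healthy = probe.getAsBoolean();
        } catch (RuntimeException e) {
            healthy = false; // a probe that throws counts as a failed check
        }
        if (healthy) {
            consecutiveFailures = 0;
            return true;
        }
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            consecutiveFailures = 0; // start counting again after the restart
            onUnhealthy.run();
        }
        return false;
    }
}
```

In the setup described above, the cron job plays the role of the periodic caller and the bash script plays the role of `onUnhealthy`.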

java spring-boot rabbitmq spring-integration spring-rabbit
2 Answers

0 votes

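You could try using DefaultMessageListenerContainer (note that this class belongs to Spring JMS, not Spring AMQP). According to the documentation, it is

[...] fully self-recovering in case of the broker being temporarily unavailable, and allows for stops/restarts as well as runtime changes to its configuration.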
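To make the idea of self-recovery concrete, here is a minimal sketch of a reconnect loop with capped exponential backoff in plain Java. It is not how any Spring container is implemented; `ReconnectLoop`, the `connect` callback, and the injected `sleeper` are hypothetical names chosen for this illustration, and the doubling schedule is an assumption.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

/** Hypothetical retry helper: keeps calling connect until it succeeds. */
class ReconnectLoop {
    private final long initialDelayMillis;
    private final long maxDelayMillis;
    private final LongConsumer sleeper; // injected so the wait can be replaced or recorded

    ReconnectLoop(long initialDelayMillis, long maxDelayMillis, LongConsumer sleeper) {
        this.initialDelayMillis = initialDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
        this.sleeper = sleeper;
    }

    /** Retries without an attempt limit; returns the number of attempts that were needed. */
    int connectWithBackoff(BooleanSupplier connect) {
        long delay = initialDelayMillis;
        int attempts = 0;
        while (true) {
            attempts++;
            boolean connected;
            try {
                connected = connect.getAsBoolean();
            } catch (RuntimeException e) {
                connected = false; // every failure is handled the same way: wait and retry
            }
            if (connected) {
                return attempts;
            }
            sleeper.accept(delay);
            delay = Math.min(delay * 2, maxDelayMillis); // double the wait, up to the cap
        }
    }
}
```

A real `sleeper` would typically wrap `Thread.sleep` and restore the interrupt flag if interrupted; a production loop would also need a way to stop when the application shuts down.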


0 votes

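There are two things to consider here. The first is that the event listener runs late. You may want to wrap the calling code in an error handler instead. The second is that you are getting different errors and want to catch all of them and react in the same way, so you need a generic error handler: one that catches everything and handles it identically. What you have now is a specific handler. Here is an example of a generic error handler.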

try
{
     // statements that may cause an exception
}
catch (Exception e)
{
     // error handling code
}
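
One way to address the timing problem is to hand the restart to a different thread, so that the thread delivering the failure event can finish before stop() and start() are called. The following plain-Java sketch shows only that hand-off. `Restartable` is a hypothetical stand-in for the listener container and `DeferredRestarter` is an invented helper; neither is a Spring type, and whether this resolves the behaviour in the question is an assumption that would need to be tested.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a listener container; not a Spring type. */
interface Restartable {
    void stop();
    void start();
}

/** Hands stop/start to an executor instead of running them on the caller's thread. */
class DeferredRestarter {
    private final Executor executor;
    private final AtomicBoolean restartPending = new AtomicBoolean(false);

    DeferredRestarter(Executor executor) {
        this.executor = executor;
    }

    /** Returns true if a restart was scheduled, false if one is already pending. */
    boolean restartLater(Restartable container) {
        if (!restartPending.compareAndSet(false, true)) {
            return false; // coalesce repeated failure events into one restart
        }
        executor.execute(() -> {
            try {
                container.stop();
                container.start();
            } catch (RuntimeException e) {
                // Generic handler: every failure is treated the same way.
                System.err.println("Restart failed: " + e);
            } finally {
                restartPending.set(false);
            }
        });
        return true;
    }
}
```

An event listener would then call restartLater with a small adapter around the container and return immediately; a single-threaded executor such as the one from Executors.newSingleThreadExecutor() would be a natural choice for the executor argument.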

Good luck, I hope this helps.
