Azure 服务总线:Amqp 空闲超时条件 = amqp:link:detach-forced

问题描述 投票:0回答:3

我得到的错误:

2019-12-09 06:39:33.189 ERROR 107132 --- [http-nio-8082-exec-5] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.jms.IllegalStateException: The MessageProducer was closed due to an unrecoverable error.; nested exception is javax.jms.IllegalStateException: The MessageProducer was closed due to an unrecoverable error.] with root cause

javax.jms.JMSException: Idle link tracker, link qpid-jms:sender:ID:7300953e-f587-4ae3-b9fe-85b84e032554:1:101:1:order-update has been idle for 1800000ms TrackingId:801ab247-3f36-4470-8665-08846eb1c181_G24, SystemTracker:client-link34404815, Timestamp:2019-12-06T21:04:35 [condition = amqp:link:detach-forced]
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:164)
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:117)
    at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.processRemoteClose(AmqpAbstractResource.java:262)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:906)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1800(AmqpProvider.java:102)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:792)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

依赖关系

compile group: 'com.microsoft.azure', name: 'azure-servicebus-spring-boot-starter', version: '0.2.0'
compile group: 'javax.jms', name: 'javax.jms-api', version: '2.0.1'
compile group: 'org.apache.qpid', name: 'qpid-jms-client', version: '0.28.0'
compile group: 'org.apache.camel', name: 'camel-jms', version: '2.24.1'
compile group: 'org.springframework.integration', name: 'spring-integration-jms', version: '5.0.4.RELEASE'

jmsConnectionFactory配置:

<bean id="jmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
        <bean class="org.apache.qpid.jms.JmsConnectionFactory">
            <constructor-arg value="${azure.jms.url}" />
            <property name="username" value="${azure.jms.username}" />
            <property name="password" value="${azure.jms.password}" />
            <property name="clientID" value="AltaPay" />
            <property name="receiveLocalOnly" value="true" />
            <property name="localMessageExpiry" value="true" />
            <property name="populateJMSXUserID" value="true" />
        </bean>
    </property>
    <property name="exceptionListener">
        <bean class="com.lauraashley.microservice.altapay.callback.exception.CustomJMSExceptionListener" />
    </property>
    <property name="sessionCacheSize" value="10" />
    <property name="cacheConsumers" value="false" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="cacheLevelName" value="CACHE_NONE" />
</bean>

自定义JMSExceptionListener

public class CustomJMSExceptionListener implements ExceptionListener {
    
    private static final Logger logger = getLogger(CustomJMSExceptionListener.class);

    @Override
    public void onException(JMSException exception) {
        // TODO Auto-generated method stub
        logger.error("--------------- Catched exception with CustomJMSExceptionListener ---------------");
        logger.error("Error code:"+exception.getErrorCode());
        logger.error("Msg:"+exception.getMessage());
        exception.printStackTrace();
        logger.error("---------------------------------------------------------------------------------");
    }
}

我如何重现它

首先:CustomJMSExceptionListener没有使用,是不是没配置好?

该应用程序是OCC(Oracle云商务)平台上的一个电子商务应用程序,使用java spring-boot服务进行支付集成和流程。

当订单超过空闲时间,然后与 Azure 服务总线的连接失败,为了重新连接,我必须重新启动 java 应用程序,这是一个相当大的问题,因为无法处理更多订单。我读到 CachingConnectionFactory 有 reconnectOnException ,默认情况下为 true。

我真的不明白为什么会发生这种情况以及修复它的解决方案是什么。

jms amqp spring-jms azure-servicebus-queues qpid
3个回答
4
投票

异常表示 Azure 已关闭生产者,因为它闲置太久,这意味着它没有在超时内发送消息(一些文档此处)。在使用 CachingConnectionFactory 时,您可以通过将 cache Producer 选项 配置为 false 来解决此问题,以便按需创建生产者,但我对此并不完全确定,因为我没有任何方法来测试它。

这不是 Qpid JMS 客户端级别的错误,而是 Azure 的行为,在我认为链接上十分钟没有活动后,它将强制关闭链接。在非基于 Spring 的应用程序中,您必须通过在发送时捕获 JMSException 来解决此问题,或者尝试创建新的生产者并再次发送,或者拆除整个连接并重新开始。您的反应在某种程度上取决于您是否预知您正在使用 Azure 并知道这种情况可能会发生。


2
投票

如上所述,这是 Azure 服务总线的预期行为,此问题在 azure-spring-boot 中开放。截至目前,解决方法是将

CachingConnectionFactory.cacheProducers
值设置为
False
,这样将为每个会话创建新的 Producer。

CachingConnectionFactory connectionFactory = (CachingConnectionFactory) jmsTemplate.getConnectionFactory();

connectionFactory.setCacheProducers(false);

另一种可能的方法,

@Bean
    public ConnectionFactory jmsConnectionFactory(AzureServiceBusJMSProperties busJMSProperties){
        final String connectionString = busJMSProperties.getConnectionString();
        final String clientId = busJMSProperties.getTopicClientId();
        final int idleTimeout = busJMSProperties.getIdleTimeout();

        final ServiceBusKey serviceBusKey = ConnectionStringResolver.getServiceBusKey(connectionString);

        final String remoteUri = String.format("amqps://%s?amqp.idleTimeout=%d&amqp.traceFrames=true",
                serviceBusKey.getHost(), idleTimeout);

        final JmsConnectionFactory jmsConnectionFactory =
                new JmsConnectionFactory(
                        serviceBusKey.getSharedAccessKeyName(),
                        serviceBusKey.getSharedAccessKey(),
                        remoteUri
                );
        jmsConnectionFactory.setClientID(clientId);

        CachingConnectionFactory cachingConnectionFactory =
                new CachingConnectionFactory(jmsConnectionFactory);
                // set cache producers to FALSE here
        cachingConnectionFactory.setCacheProducers(false);

        return cachingConnectionFactory;
    }

0
投票

这个问题现在应该解决。我们在 ServiceBus 服务端进行了更改,不再对通过 azure-servicebus-jms 库(这是 spring-cloud-azure-starter-servicebus-jms 使用的依赖库)的 JMS 客户强制执行空闲链接过期,版本 4.x)。

这仅适用于高级消息传递命名空间并使用 azure-servicebus-jms 库或通过 spring-cloud-azure-starter-servicebus-jms(目前版本 4.x)。在任何给定时间点,命名空间可以拥有的活动链接数量仍然存在活动配额强制执行。

真正的长期修复可能需要涉及 Qpid JMS 库的修复,其中客户端上的生产者对象在接收到链接关闭时不会立即被视为关闭。相反,当使用 jms Producer 对象时,qpid jms 客户端库必须检查底层链接是否处于关闭状态,如果是,则为同一生产者再次重新创建底层 amqp 链接。

© www.soinside.com 2019 - 2024. All rights reserved.