ActiveMQ redeliveryPolicy ignored when passed from TomEE to Spring

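I have a simple Spring application deployed in TomEE 1.7.1 with ActiveMQ 5.10. My problem is that the redelivery policy I set seems to be mostly ignored, in particular the redelivery delay.

In my JMS listener I throw an exception immediately to test automatic retries.

My activemq.xml looks like this: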

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://activemq.apache.org/schema/core
    http://activemq.apache.org/schema/core/activemq-core.xsd
">
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
<!-- 
    The <broker> element is used to configure the ActiveMQ broker. 
-->
<amq:broker xmlns="http://activemq.apache.org/schema/core" useShutdownHook="false" enableStatistics="true" brokerName="bima" useJmx="true" tmpDataDirectory="${catalina.base}/activemq-data/localhost/tmp_storage" populateJMSXUserID="true" useAuthenticatedPrincipalForJMSXUserID="true" schedulerSupport="true">

    <amq:connectionFactory id="jmsRedeliverConnectionFactory" brokerURL="vm://localhost">
      <amq:redeliveryPolicy>
        <amq:redeliveryPolicy maximumRedeliveries="5" initialRedeliveryDelay="1000" redeliveryUseExponentialBackOff ="true" backOffMultiplier="5" />
      </amq:redeliveryPolicy>
    </amq:connectionFactory>

    <amq:persistenceAdapter>
        <amq:kahaDB directory="${catalina.base}/activemq-data/kahadb" checkForCorruptJournalFiles="true" checksumJournalFiles="true" journalMaxFileLength="32mb"/>
    </amq:persistenceAdapter>

    <managementContext>
       <managementContext createConnector="false"/>
    </managementContext>
</amq:broker>
</beans>

My tomee.xml looks like this:

<tomee>
    <Resource id="ActiveMQResourceAdapter" type="ActiveMQResourceAdapter">
        BrokerXmlConfig=broker:(vm://localhost)
        <!-- ServerUrl=vm://localhost -->
    </Resource>

  <!-- see http://tomee.apache.org/containers-and-resources.html -->
  <Resource id="resources/jms/ConnectionFactory" type="javax.jms.ConnectionFactory"> 
       ResourceAdapter = ActiveMQResourceAdapter 
       BrokerURL = vm://localhost
       maximumRedeliveries 3
       redeliveryBackOffMultiplier 2
       redeliveryUseExponentialBackOff true
       initialRedeliveryDelay 5000
       <!-- BrokerUrl = tcp://localhost:61616 -->
    </Resource> 

    <Resource id="resources/jms/XAConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
        BrokerURL = vm://localhost

    </Resource>

    <Resource id="resources/jms/PrintQueue" type="javax.jms.Queue"/>


    <Resource id="testxa" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
        Url jdbc:mysql://localhost:3306/test?autoReconnect=true
        User root
    </Resource>

    <Resource id="movieDatabase" type="DataSource">
        XaDataSource testxa
        DataSourceCreator dbcp
        UserName root
    </Resource>

  <!-- activate next line to be able to deploy applications in apps -->
    <Deployments dir="apps" />
</tomee>

The JMS beans in my application context look like this:

<bean id="StartQueue" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
        <ref local="jmsFactory" />
    </property>
    <property name="defaultDestinationName" value="StartQueue" />
    <property name="deliveryPersistent" value="true"/>
    <property name="explicitQosEnabled" value="true"/>
</bean>
<bean id="starter" class="org.superbiz.mdb.Start"/>
<jms:listener-container container-type="default" connection-factory="jmsFactory" cache="none" acknowledge="auto" transaction-manager="transactionManager"  concurrency="1" >
    <jms:listener destination="StartQueue" ref="starter" />
</jms:listener-container>

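Once I added maximumRedeliveries 3 to tomee.xml, redelivery started working, but the redeliveries all happen immediately. So is something ignoring useExponentialBackOff and initialRedeliveryDelay, or have I missed setting them somewhere?

Edit:

I then configured TomEE to use an external ActiveMQ 5.10.0 broker, with this broker configuration: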

    <!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

    <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        <plugins>
                    <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
                        <redeliveryPolicyMap>
                            <redeliveryPolicyMap>
                                <defaultEntry>
                                    <redeliveryPolicy  maximumRedeliveries="5" initialRedeliveryDelay="1000" redeliveryDelay="1000" useExponentialBackOff ="true"  backOffMultiplier="5"/>
                                </defaultEntry>
                            </redeliveryPolicyMap>
                        </redeliveryPolicyMap>
                    </redeliveryPlugin>
                </plugins>
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb" checkForCorruptJournalFiles="true" checksumJournalFiles="true" journalMaxFileLength="32mb"/>
        </persistenceAdapter>

          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>
    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

and this tomee.xml:

    <tomee>

    <Resource id="ActiveMQResourceAdapter" type="ActiveMQResourceAdapter">
        # Do not start the embedded ActiveMQ broker
        BrokerXmlConfig  =
        ServerUrl = tcp://10.81.1.28:61617
    </Resource>

    <Resource id="resources/jms/ConnectionFactory" type="javax.jms.ConnectionFactory"> 
        ResourceAdapter = ActiveMQResourceAdapter 
       maximumRedeliveries 1
    </Resource> 
    <Resource id="testxa" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
        Url jdbc:mysql://localhost:3306/test?autoReconnect=true
        User root
    </Resource>

    <Resource id="movieDatabase" type="DataSource">
        XaDataSource testxa
        DataSourceCreator dbcp
        UserName root
    </Resource>

  <!-- activate next line to be able to deploy applications in apps -->
    <Deployments dir="apps" />
</tomee>

This works. But when I have TomEE start ActiveMQ itself, using the same activemq.xml and the tomee.xml below, messages are redelivered immediately:

    <tomee>
    <Resource id="ActiveMQResourceAdapter" type="ActiveMQResourceAdapter">
        BrokerXmlConfig=broker:(vm://localhost)
        BrokerURL = vm://localhost
    </Resource>

  <Resource id="resources/jms/ConnectionFactory" type="javax.jms.ConnectionFactory">
       ResourceAdapter = ActiveMQResourceAdapter
       maximumRedeliveries 1
    </Resource>


    <Resource id="testxa" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
        Url jdbc:mysql://localhost:3306/test?autoReconnect=true
        User root
    </Resource>

    <Resource id="movieDatabase" type="DataSource">
        XaDataSource testxa
        DataSourceCreator dbcp
        UserName root
    </Resource>

  <!-- activate next line to be able to deploy applications in apps -->
    <Deployments dir="apps" />
</tomee>

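The configuration is ignored and the queue does not redeliver as configured. Is there something else that needs to be done so that TomEE uses the redelivery policy?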

spring spring-mvc jms activemq apache-tomee
5 Answers
0 votes

I think the property is redeliveryUseExponentialBackOff, not useExponentialBackOff: http://activemq.apache.org/maven/5.10.0/apidocs/org/apache/activemq/ra/ActiveMQManagedConnectionFactory.html

Basically, use the names of the setters.


0 votes

By the way, why not set it on the resource adapter?


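0 votes

By default TomEE deactivates the scheduler service, so this may not work with the embedded broker.

Have you tried using an external broker, or adding the jars needed to support xbean XML configuration of the broker? (http://tomee.apache.org/jms-resources-and-mdb-container.html)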
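
As a rough sketch of the second suggestion: the tomee.xml in the question starts the embedded broker with broker:(vm://localhost), which, as far as I can tell, builds the broker from the URI alone and never reads activemq.xml. The linked TomEE page describes pointing BrokerXmlConfig at an xbean file instead. The file path below is an assumption, and this only applies once the extra jars mentioned on that page are in TomEE's lib directory.

```xml
<tomee>
    <Resource id="ActiveMQResourceAdapter" type="ActiveMQResourceAdapter">
        # Assumed location of the broker configuration; adjust to your installation
        BrokerXmlConfig = xbean:file:conf/activemq.xml
        # The name after vm:// should match brokerName in activemq.xml
        ServerUrl = vm://localhost
    </Resource>
</tomee>
```

With the broker loaded from activemq.xml, settings such as schedulerSupport="true" and the redeliveryPlugin from the question's second broker configuration would at least have a chance to take effect.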


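0 votes

Answering this old question in case someone benefits from it (I came across a lot of similar questions). I had a similar problem where the RedeliveryPolicy was not applied correctly. With maximumRedeliveries set to -1, the message was retried indefinitely, but the delays were ignored. It turned out that redelivery did not work correctly because of the default cacheLevel field on Spring's DefaultMessageListenerContainer.

Setting this level to CACHE_CONSUMER solved the problem!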
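
Applied to the listener container from the question, that change might look like the sketch below. It assumes the Spring jms namespace, where cache="consumer" corresponds to CACHE_CONSUMER; everything else is copied from the question unchanged.

```xml
<!-- Same listener container as in the question; only the cache attribute differs (was cache="none") -->
<jms:listener-container container-type="default" connection-factory="jmsFactory"
        cache="consumer" acknowledge="auto" transaction-manager="transactionManager" concurrency="1">
    <jms:listener destination="StartQueue" ref="starter" />
</jms:listener-container>
```

A plausible reason this matters is that the redelivery delay is tracked by the ActiveMQ consumer on the client side, so a container that creates a new consumer for every receive attempt loses that state. Note that Spring defaults to no caching when an external transaction manager is set, so check whether caching the consumer is compatible with your transaction setup.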


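0 votes

Adding to this old question. TomEE performs redelivery on the server side, using the connection factory obtained from the resource adapter. Spring's DefaultMessageListenerContainer, on the other hand, performs redelivery on the client.

After some trial and error, the only way I found to get redelivery working with Spring JMS and TomEE is to use the JCA message endpoint described here: https://docs.spring.io/spring-framework/reference/integration/jms/jca-message-endpoint-manager.html You can look up the JCA connector via JNDI. Then you can define redelivery either on the adapter in TomEE or per destination using the activation spec.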
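
A minimal sketch of that setup, following the bean layout in the linked Spring documentation. The JNDI name is hypothetical, the jee namespace has to be declared in the application context, and the redelivery property names on the activation spec are my assumption based on the ActiveMQ resource adapter's setters, so verify them against your ActiveMQ version.

```xml
<!-- Hypothetical JNDI name; check how your TomEE instance exposes the resource adapter -->
<jee:jndi-lookup id="resourceAdapter" jndi-name="openejb:Resource/ActiveMQResourceAdapter"/>

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpec">
        <bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
            <property name="destination" value="StartQueue"/>
            <property name="destinationType" value="javax.jms.Queue"/>
            <!-- Per-destination redelivery settings (assumed property names) -->
            <property name="maximumRedeliveries" value="5"/>
            <property name="initialRedeliveryDelay" value="1000"/>
            <property name="useExponentialBackOff" value="true"/>
            <property name="backOffMultiplier" value="5"/>
        </bean>
    </property>
    <!-- The referenced bean must implement javax.jms.MessageListener -->
    <property name="messageListener" ref="starter"/>
</bean>
```

Here the destination and listener bean names are taken from the question; the redelivery values mirror the ones in its activemq.xml.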
