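I have deployed a simple Spring application on TomEE 1.7.1 with ActiveMQ 5.10. My problem is that the redelivery policy I set seems to be mostly ignored, in particular the redelivery delay.
In my JMS listener I throw an exception immediately, to test automatic retries.
My activemq.xml looks like this: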
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
">
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<amq:broker xmlns="http://activemq.apache.org/schema/core" useShutdownHook="false" enableStatistics="true" brokerName="bima" useJmx="true" tmpDataDirectory="${catalina.base}/activemq-data/localhost/tmp_storage" populateJMSXUserID="true" useAuthenticatedPrincipalForJMSXUserID="true" schedulerSupport="true">
<amq:connectionFactory id="jmsRedeliverConnectionFactory" brokerURL="vm://localhost">
<amq:redeliveryPolicy>
<amq:redeliveryPolicy maximumRedeliveries="5" initialRedeliveryDelay="1000" redeliveryUseExponentialBackOff ="true" backOffMultiplier="5" />
</amq:redeliveryPolicy>
</amq:connectionFactory>
<amq:persistenceAdapter>
<amq:kahaDB directory="${catalina.base}/activemq-data/kahadb" checkForCorruptJournalFiles="true" checksumJournalFiles="true" journalMaxFileLength="32mb"/>
</amq:persistenceAdapter>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
</amq:broker>
</beans>
My tomee.xml looks like this:
<tomee>
<Resource id="ActiveMQResourceAdapter" type="ActiveMQResourceAdapter">
BrokerXmlConfig=broker:(vm://localhost)
<!-- ServerUrl=vm://localhost -->
</Resource>
<!-- see http://tomee.apache.org/containers-and-resources.html -->
<Resource id="resources/jms/ConnectionFactory" type="javax.jms.ConnectionFactory">
ResourceAdapter = ActiveMQResourceAdapter
BrokerURL = vm://localhost
maximumRedeliveries 3
redeliveryBackOffMultiplier 2
redeliveryUseExponentialBackOff true
initialRedeliveryDelay 5000
<!-- BrokerUrl = tcp://localhost:61616 -->
</Resource>
<Resource id="resources/jms/XAConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
BrokerURL = vm://localhost
</Resource>
<Resource id="resources/jms/PrintQueue" type="javax.jms.Queue"/>
<Resource id="testxa" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
Url jdbc:mysql://localhost:3306/test?autoReconnect=true
User root
</Resource>
<Resource id="movieDatabase" type="DataSource">
XaDataSource testxa
DataSourceCreator dbcp
UserName root
</Resource>
<!-- activate next line to be able to deploy applications in apps -->
<Deployments dir="apps" />
</tomee>
The JMS beans in my application context look like this:
<bean id="StartQueue" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref local="jmsFactory" />
</property>
<property name="defaultDestinationName" value="StartQueue" />
<property name="deliveryPersistent" value="true"/>
<property name="explicitQosEnabled" value="true"/>
</bean>
<bean id="starter" class="org.superbiz.mdb.Start"/>
<jms:listener-container container-type="default" connection-factory="jmsFactory" cache="none" acknowledge="auto" transaction-manager="transactionManager" concurrency="1" >
<jms:listener destination="StartQueue" ref="starter" />
</jms:listener-container>
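As soon as I added maximumRedeliveries 3 to tomee.xml, redelivery started working, but all the redeliveries happen immediately. So something is ignoring useExponentialBackOff and initialRedeliveryDelay, or have I missed setting them somewhere?
Edit:
I then configured TomEE to use an external ActiveMQ 5.10.0 broker, with this broker configuration: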
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<plugins>
<redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
<redeliveryPolicyMap>
<redeliveryPolicyMap>
<defaultEntry>
<redeliveryPolicy maximumRedeliveries="5" initialRedeliveryDelay="1000" redeliveryDelay="1000" useExponentialBackOff ="true" backOffMultiplier="5"/>
</defaultEntry>
</redeliveryPolicyMap>
</redeliveryPolicyMap>
</redeliveryPlugin>
</plugins>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb" checkForCorruptJournalFiles="true" checksumJournalFiles="true" journalMaxFileLength="32mb"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<import resource="jetty.xml"/>
</beans>
<!-- END SNIPPET: example -->
and this tomee.xml:
<tomee>
<Resource id="ActiveMQResourceAdapter" type="ActiveMQResourceAdapter">
# Do not start the embedded ActiveMQ broker
BrokerXmlConfig =
ServerUrl = tcp://10.81.1.28:61617
</Resource>
<Resource id="resources/jms/ConnectionFactory" type="javax.jms.ConnectionFactory">
ResourceAdapter = ActiveMQResourceAdapter
maximumRedeliveries 1
</Resource>
<Resource id="testxa" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
Url jdbc:mysql://localhost:3306/test?autoReconnect=true
User root
</Resource>
<Resource id="movieDatabase" type="DataSource">
XaDataSource testxa
DataSourceCreator dbcp
UserName root
</Resource>
<!-- activate next line to be able to deploy applications in apps -->
<Deployments dir="apps" />
</tomee>
This works. However, when I let TomEE start ActiveMQ itself, using the same activemq.xml and the tomee.xml below, messages are redelivered in rapid succession:
<tomee>
<Resource id="ActiveMQResourceAdapter" type="ActiveMQResourceAdapter">
BrokerXmlConfig=broker:(vm://localhost)
BrokerURL = vm://localhost
</Resource>
<Resource id="resources/jms/ConnectionFactory" type="javax.jms.ConnectionFactory">
ResourceAdapter = ActiveMQResourceAdapter
maximumRedeliveries 1
</Resource>
<Resource id="testxa" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
Url jdbc:mysql://localhost:3306/test?autoReconnect=true
User root
</Resource>
<Resource id="movieDatabase" type="DataSource">
XaDataSource testxa
DataSourceCreator dbcp
UserName root
</Resource>
<!-- activate next line to be able to deploy applications in apps -->
<Deployments dir="apps" />
</tomee>
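The configuration is ignored and the redelivery policy is not applied. Is there something else I need to do so that TomEE uses the redelivery policy?
I think the property is redeliveryUseExponentialBackOff rather than useExponentialBackOff: http://activemq.apache.org/maven/5.10.0/apidocs/org/apache/activemq/ra/ActiveMQManagedConnectionFactory.html
Basically, use the names of the setters.
By the way, why not set it on the resource adapter?
By default TomEE disables the scheduler service, so this may not work with the embedded broker.
Have you tried adding the extra jars needed to support xbean XML configuration of the broker? (http://tomee.apache.org/jms-resources-and-mdb-container.html)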
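To make the last comment concrete, here is a minimal sketch of a resource adapter entry in tomee.xml that points the embedded broker at an XML file instead of the broker:(vm://localhost) URI. The file path is a placeholder, the ServerUrl simply reuses the openwire port from the edited activemq.xml in the question, and the sketch assumes the extra jars described on the linked TomEE page are already in TomEE's lib directory. It has not been tested against TomEE 1.7.1.

```xml
<tomee>
  <Resource id="ActiveMQResourceAdapter" type="ActiveMQResourceAdapter">
    # Assumption: placeholder path to the broker XML; adjust to your installation
    BrokerXmlConfig = xbean:file:/path/to/activemq.xml
    # Assumption: must match a transportConnector defined in that activemq.xml
    ServerUrl = tcp://localhost:61617
  </Resource>
</tomee>
```

With the broker:(vm://localhost) form used in the question, the broker is built from the URI alone, so settings in activemq.xml such as schedulerSupport="true" and the redeliveryPlugin would presumably never be read.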
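Answering this old question in case someone else benefits from it (I ran into a lot of similar questions). I had a similar problem where the RedeliveryPolicy was not applied correctly. With maximumRedeliveries set to -1 the message was retried indefinitely, but the delays were ignored. It turned out that redelivery was not working correctly because of the default value of the cacheLevel field on Spring's DefaultMessageListenerContainer. Setting this level to CACHE_CONSUMER solved the problem!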
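Applied to the listener container from the question, that change might look like the sketch below. It assumes the same bean names as in the question (jmsFactory, transactionManager, starter) and only swaps cache="none" for cache="consumer", which is the XML namespace equivalent of CACHE_CONSUMER.

```xml
<!-- Sketch only: the question's container with the consumer cached between receives -->
<jms:listener-container container-type="default"
                        connection-factory="jmsFactory"
                        cache="consumer"
                        acknowledge="auto"
                        transaction-manager="transactionManager"
                        concurrency="1">
    <jms:listener destination="StartQueue" ref="starter" />
</jms:listener-container>
```

When an external transaction manager is configured, DefaultMessageListenerContainer defaults to CACHE_NONE, so a fresh consumer is created for each receive attempt. Since the ActiveMQ client applies the redelivery delay inside the consumer, that would explain why the delay is lost. Whether caching the consumer is safe together with a JTA transaction manager depends on the environment, so treat this as something to verify rather than a guaranteed fix.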
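Adding to this old question. TomEE performs redelivery on the server side, using the connection factory obtained from the resource adapter. Spring's DefaultMessageListenerContainer, on the other hand, performs redelivery on the client.
After some trial and error, the only way I got redelivery to work with Spring JMS and TomEE was to use the JCA message endpoint described here: https://docs.spring.io/spring-framework/reference/integration/jms/jca-message-endpoint-manager.html You can look up the JCA connector through JNDI. You can then define redelivery either on the adapter in TomEE or per destination, using the activation spec.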
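A rough sketch of what such a configuration could look like, following the pattern in the Spring documentation linked above. The JNDI name of the resource adapter is a guess and needs to be checked against your TomEE installation, the jee namespace must be declared in the application context, and the redelivery values are simply the ones from the question. The starter bean is assumed to implement javax.jms.MessageListener.

```xml
<!-- Assumption: hypothetical JNDI name; look up the actual name TomEE binds the adapter under -->
<jee:jndi-lookup id="resourceAdapter" jndi-name="openejb:Resource/ActiveMQResourceAdapter" />

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
    <property name="resourceAdapter" ref="resourceAdapter" />
    <property name="activationSpec">
        <!-- Redelivery is configured per destination on the activation spec -->
        <bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
            <property name="destination" value="StartQueue" />
            <property name="destinationType" value="javax.jms.Queue" />
            <property name="maximumRedeliveries" value="5" />
            <property name="initialRedeliveryDelay" value="1000" />
            <property name="useExponentialBackOff" value="true" />
            <property name="backOffMultiplier" value="5" />
        </bean>
    </property>
    <property name="messageListener" ref="starter" />
</bean>
```

This replaces the jms:listener-container definition for that destination, so the two should not both listen on StartQueue.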