Mule jms使用者正在缓慢地选择消息

问题描述 投票:0回答:1

我正在实施m子ESB 3.9以使用来自活动mq的消息。

我实际上安排了约15个领域组件以到达最终目的地

我已经为jms配置了一个连接池,并且正在使用的消费者数量为32,接收者中的最大线程为500。

但是我看到直到每秒4条消息,jmeter发送的请求才可以。但是,如果我增加到每秒5 msgs,那么我发现jms发送的请求平均会缓慢地每秒大约3 msgs。]

关于如何配置使用者和接收方线程数的任何想法?

我的目标是每秒发送10条消息,而且我的所有15个组件每个最多仅花费30毫秒来处理。因此,这些组件的处理时间很好...但是我看到总吞吐量有时约为3秒,主要是因为未立即选择邮件。

请提出建议

Activemq xml


<destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                                                                                
                                                                                                
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="10000"/>
                  </pendingMessageLimitStrategy>
                                                                  
                                                                                <slowConsumerStrategy>
                                                                                                <abortSlowConsumerStrategy/>
                                                                                  </slowConsumerStrategy>
                                                                                </policyEntry>
                                                                
                                                                
                                                                                <!--Mahesh added start-->
                                                
                                
                                                                                                <policyEntry queue=">" >
                                                                                                                  <slowConsumerStrategy>
                                                                                                                                <abortSlowConsumerStrategy/>
                                                                                                                  </slowConsumerStrategy>
                                                                                                </policyEntry>
                                
                                <!--Mahesh added end-->
                                
                                
              </policyEntries>
            </policyMap>
        </destinationPolicy>

 <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>
        
         <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="90" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="5 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="5 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
        
           <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

有关Mule ESB的其他信息

  1. 我正在使用预取的1条消息
  2. 使用者数量为32,接收者线程为500

mule-config ---->

<spring:beans>
	<context:component-scan  base-package="com.test.esb.orm.db" />
	<spring:import resource="classpath*:applicationContext.xml" />
	<spring:import resource="classpath*:test-orm-beans.xml" />
	<spring:import resource="mule-domain-spring.xml"/>
</spring:beans>

<db:generic-config name="Generic_Database_Configuration" 
					dataSource-ref="testDataSource"
					doc:name="Generic Database Configuration"/>
					
<db:generic-config name="Generic_TrackTraceDatabase_Configuration" 
					dataSource-ref="testTrackTraceDataSource"
					doc:name="Generic Database Configuration for Track Trace"/>
					
<!-- ********************* ESB Gateway Configurations ******************************** -->
<http:listener-config name="test-Shared-http-listener" host="${domain.gateway.host}" port="${domain.gateway.port}" doc:name="HTTP Listener connector for ESB Gateway (From ui)">
	<http:worker-threading-profile maxThreadsActive="64"
						poolExhaustedAction="WAIT"
						threadWaitTimeout="30000" />
</http:listener-config>

   
<jms:activemq-connector name="com.test.esb.trans.jmsConnector"
                                            username="${domain.amq.user.id}" password="${domain.amq.user.password}"
                                            connectionFactory-ref="pooledConnectionFactory"
                                            numberOfConsumers="16" acknowledgementMode="AUTO_ACKNOWLEDGE"

                                            validateConnections="true"
                                            persistentDelivery="true"
                                            doc:name="AMQ Connector For Domain Routing Services"
                                            specification="1.1">
    <receiver-threading-profile maxThreadsActive="500" poolExhaustedAction="WAIT"/>
    <dispatcher-threading-profile maxThreadsActive="500" poolExhaustedAction="WAIT"/>
    <reconnect count="15" frequency="5000" blocking="true"/>
</jms:activemq-connector>


    <jms:activemq-connector name="com.test.esb.domain.jmsConnector" 
						username="${domain.amq.user.id}" password="${domain.amq.user.password}" 
					connectionFactory-ref="pooledConnectionFactory"
					numberOfConsumers="16" acknowledgementMode="AUTO_ACKNOWLEDGE"  
					
						validateConnections="true" 
						persistentDelivery="false" 
						doc:name="AMQ Connector For Domain Routing Services" 
						specification="1.1">
	<receiver-threading-profile maxThreadsActive="500" poolExhaustedAction="WAIT"/>
	<dispatcher-threading-profile maxThreadsActive="500" poolExhaustedAction="WAIT"/>
	<reconnect count="15" frequency="5000" blocking="true"/>
</jms:activemq-connector>

  
<jms:activemq-connector name="com.test.esb.domain.orchestrator.jmsConnector"
                                            username="${domain.amq.user.id}" password="${domain.amq.user.password}"
                                            connectionFactory-ref="pooledConnectionFactory"
                                            numberOfConsumers="16" acknowledgementMode="AUTO_ACKNOWLEDGE"

                                            validateConnections="true"
                                            persistentDelivery="true"
                                            doc:name="AMQ Connector For Domain Routing Services"
                                            specification="1.1">
    <receiver-threading-profile maxThreadsActive="500" poolExhaustedAction="WAIT"/>
    <dispatcher-threading-profile maxThreadsActive="500" poolExhaustedAction="WAIT"/>
    <reconnect count="15" frequency="5000" blocking="true"/>
</jms:activemq-connector>


   
<jms:activemq-connector name="com.test.esb.interface2.jmsConnector" 
						username="${domain.amq.user.id}" password="${domain.amq.user.password}" 
						connectionFactory-ref="pooledConnectionFactory"
						validateConnections="true" 
						numberOfConcurrentTransactedReceivers="16"
						persistentDelivery="false" 
						doc:name="AMQ Connector for interface2 Interface" 
						specification="1.1">
	<receiver-threading-profile maxThreadsActive="500" poolExhaustedAction="WAIT" threadWaitTimeout="30000"/>
	<dispatcher-threading-profile maxThreadsActive="500" poolExhaustedAction="WAIT"/>
	<reconnect count="15" frequency="5000" blocking="true"/>
</jms:activemq-connector>

   
<jms:activemq-connector name="com.test.esb.destination.router.jmsConnector" 
						username="${domain.amq.user.id}" password="${domain.amq.user.password}" 
						connectionFactory-ref="pooledConnectionFactory"
						validateConnections="true" 
						numberOfConcurrentTransactedReceivers="8"
						persistentDelivery="false" 
						doc:name="AMQ Connector for DestinationRouter" 
						specification="1.1">
	<receiver-threading-profile maxThreadsActive="500" poolExhaustedAction="WAIT"/>
	<dispatcher-threading-profile maxThreadsActive="500" poolExhaustedAction="WAIT"/>
	<reconnect count="15" frequency="5000" blocking="true"/>
</jms:activemq-connector>

   
<jms:activemq-connector name="com.test.esb.dlq.jmsConnector" 
						username="${domain.amq.user.id}" password="${domain.amq.user.password}" 
						connectionFactory-ref="pooledConnectionFactory"
						validateConnections="true" 
						numberOfConcurrentTransactedReceivers="16"
						persistentDelivery="false" 
						doc:name="AMQ Connector for Dead Letter Queue" 
						specification="1.1">
	<receiver-threading-profile maxThreadsActive="500" poolExhaustedAction="WAIT" threadWaitTimeout="30000"/>
	<reconnect count="15" frequency="5000" blocking="true"/>
</jms:activemq-connector>

mule-spring config ---->

<spring:bean id="domainRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
		<spring:property name="maximumRedeliveries" value="5" />
		<spring:property name="initialRedeliveryDelay" value="500" />
		<spring:property name="maximumRedeliveryDelay" value="10000" />
		<spring:property name="useExponentialBackOff" value="false" />
		<spring:property name="backOffMultiplier" value="3" />
	</spring:bean>
	
	
	

	
	<!-- ActiveMQ Connection factory -->
	<spring:bean id="domainConnectionFactory"
		class="org.apache.activemq.ActiveMQConnectionFactory" lazy-init="true">
		<spring:property name="brokerURL"
			value="tcp://192.0.0.0:61616?jms.prefetchPolicy.all=1" />
		<spring:property name="redeliveryPolicy" ref="domainRedeliveryPolicy" />
	</spring:bean>
	
	
		<!-- amqExceptionConnectionFactory Connection factory -->
	<spring:bean id="amqExceptionConnectionFactory"
		class="org.apache.activemq.ActiveMQConnectionFactory" lazy-init="true">
		<spring:property name="brokerURL"
			value="failover:(${domain.amq.failover.url}?jms.prefetchPolicy.all=50)" />
		<spring:property name="redeliveryPolicy" ref="domainRedeliveryPolicy" />
	</spring:bean>
	
	
	<spring:bean id="pooledConnectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory" >
		<spring:property name="connectionFactory" ref="domainConnectionFactory"/>
        	<spring:property name="maxConnections" value="10000" />
        	<spring:property name="maximumActiveSessionPerConnection" value="10000" />
    	</spring:bean>

	<spring:bean id="trackTraceRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
		<spring:property name="maximumRedeliveries" value="5" />
		<spring:property name="initialRedeliveryDelay" value="500" />
		<spring:property name="maximumRedeliveryDelay" value="10000" />
		<spring:property name="useExponentialBackOff" value="false" />
		<spring:property name="backOffMultiplier" value="3" />
	</spring:bean>


	<spring:bean id="trackTraceConnectionFactory"
		class="org.apache.activemq.ActiveMQConnectionFactory" lazy-init="true">
		<spring:property name="brokerURL"
		value="failover:(${domain.amq.failover.url})" />
			<!-- value="tcp://${domain.amq.host}:${domain.amq.port}" /> -->
			
		<spring:property name="redeliveryPolicy" ref="trackTraceRedeliveryPolicy" />
	</spring:bean>

	

	<spring:bean id="interface2ConnectionFactory"
		class="org.apache.activemq.ActiveMQConnectionFactory" lazy-init="true">
		<spring:property name="brokerURL"
		value="tcp://192.0.0.0:61616?jms.prefetchPolicy.all=1" />
			<!-- value="tcp://${domain.amq.host}:${domain.amq.port}" /> -->
		<spring:property name="redeliveryPolicy" ref="interface2RedeliveryPolicy" />
	</spring:bean>

	<spring:bean id="interface2RedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
		<spring:property name="maximumRedeliveries" value="5" />
		<spring:property name="initialRedeliveryDelay" value="500" />
		<spring:property name="maximumRedeliveryDelay" value="10000" />
		<spring:property name="useExponentialBackOff" value="false" />
		<spring:property name="backOffMultiplier" value="3" />
	</spring:bean>

	<!-- ********************* Destination Router Configurations ******************************** -->
	<!-- ActiveMQ Connection factory for destination Router -->
	<spring:bean id="destinationRouterConnectionFactory"
		class="org.apache.activemq.ActiveMQConnectionFactory" lazy-init="true">
		<spring:property name="brokerURL"
		value="tcp://192.0.0.0:61616?jms.prefetchPolicy.all=1" />
			<!-- value="tcp://${domain.amq.host}:${domain.amq.port}" /> -->
		<spring:property name="redeliveryPolicy"
			ref="destinationRouterRedeliveryPolicy" />
	</spring:bean>

	<spring:bean id="destinationRouterRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
		<spring:property name="maximumRedeliveries" value="5" />
		<spring:property name="initialRedeliveryDelay" value="500" />
		<spring:property name="maximumRedeliveryDelay" value="10000" />
		<spring:property name="useExponentialBackOff" value="false" />
		<spring:property name="backOffMultiplier" value="3" />
	</spring:bean>

	<!-- ********************* Dead Letter Queue (DLQ) Configurations ******************************** -->
	<!-- ActiveMQ Connection factory for Dead Letter Queue -->
	<spring:bean id="dlqConnectionFactory"
		class="org.apache.activemq.ActiveMQConnectionFactory" lazy-init="true">
		<spring:property name="brokerURL"
		value="failover:(${domain.amq.failover.url})" />
			
		<spring:property name="redeliveryPolicy" ref="dlqRedeliveryPolicy" />
	</spring:bean>

	<spring:bean id="dlqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
		<spring:property name="maximumRedeliveries" value="5" />
		<spring:property name="initialRedeliveryDelay" value="500" />
		<spring:property name="maximumRedeliveryDelay" value="10000" />
		<spring:property name="useExponentialBackOff" value="false" />
		<spring:property name="backOffMultiplier" value="3" />
	</spring:bean>
activemq mule-component mule-esb
1个回答
0
投票

numberOfConsumers是一个属性,用于确定如果内部处理正在使用事务,则客户机线程数。如果端点在事务中,则需要使用numberOfConcurrentTransactedReceivers

您确实应该删除<dispatcher-threading-profile><receiver-threading-profile>配置。每个JMS配置中有500个线程对于您的系统来说是很多的,如果创建它们,则将占用大量资源,并且根本无法解决该用例。仅应出于某些实际原因设置这些值。

尝试在处理缓慢时捕获线程转储以分析线程在做什么。这应该指出根本原因。

参考:https://help.mulesoft.com/s/article/JMS-Usage-of-numberOfConsumers-and-numberOfConcurrentTransactedReceivers

© www.soinside.com 2019 - 2024. All rights reserved.