我们正在使用ActiveMQ作为消息代理。
对于其中一个队列,我们发现生成消息的速率远高于消耗消息的速率。有时这会导致ActiveMQ崩溃。
因此,我们正在探索提高消耗率的选项。 (首要任务是增加现有消费者的数量,然后增加消费者吊舱/实例的数量)。
以下是当前侦听器配置
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("1-1");
factory.setMessageConverter(jacksonJmsMessageConverter());
return factory;
}
侦听器逻辑与数据库交互多次,可以被视为I / O绑定任务。因此,我们正在考虑并行处理多个消息。
我们找到了以下选项。
我们已同意1-1
的暗示concurrentConsumers
和maxConcurrentConsumers
为1,一次仅处理1条消息。这是我们可以增加为5-10
的配置,以便最少5个和最多10个使用者可以同时运行。
我们还发现我们也可以在侦听器工厂中设置TaskExecutor
。像设置threadPoolExecutor (corePoolSize = 5, maxPoolSize = 10, queueCapacity=50)
一样也将帮助我们同时处理消息。
我不确定采用哪个选项(1或2)
DefaultMessageListenerContainer.setTaskExecutor()
的JavaDoc这样说:
设置Spring
DefaultMessageListenerContainer.setTaskExecutor()
以用于运行侦听器线程。默认为
TaskExecutor
,根据指定的并发使用者数,启动多个新线程。指定用于与现有线程池集成的替代方法
SimpleAsyncTaskExecutor
。请注意,这仅在以特定方式(例如在Java EE环境中)管理线程时才真正增加价值。普通线程池不会增加太多价值,因为此侦听器容器在其整个生命周期内都会占用许多线程。
因此,设置任务执行程序的主要用例是与现有线程池集成。默认执行程序已经根据您配置的并发使用者的数量进行扩展。因此,我不建议您设置任务执行器。
我建议仅设置并发。
您需要通过仔细的基准测试和分析来确定最佳值。在开始该过程之前,我强烈建议您设置一个特定的性能目标,因为没有目标,基准测试和优化就可能成为无休止的任务。
您可能还考虑使用SimpleAsyncTaskExecutor
,以便队列首先不会变得很满。
您观察到的以msg / s和MB / s为单位的吞吐量是多少?
以我的经验,您将观察到使用直接JMS API通过PooledConnectionFactory进行批处理事务时,将观察到最快的吞吐量。 Spring JMS模板可能难以配置和调优,特别是带有多线程和事务的。它还可以按消息关闭诸如Consumer和Session之类的对象,这消除了代理端缓存和预取的好处。
当您直接进行JMS-API和批处理事务(JMS,而不是XA)时,您将获得ActiveMQ代理的服务器端缓存和预取的好处-这将带来巨大的不同。
我会同意之前的回答。
当您使用Spring DefaultMessageListenerContainer(DMLC)消耗消息的性能较差时,这几乎总是意味着对于每个读取的消息都将重新创建JMS使用者/会话/连接。
在非事务处理的情况下,可以让DMLC缓存使用者。 DMLC将使用单个JMS连接,并在该连接上启动多个JMS会话。每个会话将有一个JMS消息使用者。
在事务处理的情况下,DMLC中将没有缓存,并且必须使用缓存连接工厂以避免性能问题。查看TaskExecutor
或flow control for the producers以了解此功能。
关于您的问题,我也不会理会org.apache.activemq.pool.PooledConnectionFactory
。我建议从org.messaginghub.pooled.jms.JmsPoolConnectionFactory
开始并发。我发现具有最小和最大数量的消费者(只是拥有固定的池)在稳定性和性能方面具有优势。
关于ActiveMQ崩溃,这是由于ActiveMQ预取引起的。默认情况下,ActiveMQ的预取值为1000。JMS消息使用者请求一条消息,并且代理在预取中传递该消息和其他999条消息。如果随后关闭了使用者/会话,则999条消息在客户端被丢弃,然后在代理上重新排队。这对经纪人非常不当,处理得不好。
[另外,请注意,例如,如果有5个并发使用者,则第一个使用者将获得1000条消息,然后下一个使用者将获得1000条消息,依此类推。因此,如果队列中只有500条消息,则只有一个使用者是活动的。您需要在队列中有5000条消息才能激活所有5个使用者。
如果有疑问,请在客户端配置中禁用消息预取:
TaskExecutor