增加消息使用率

问题描述 投票:0回答:3

我们正在使用ActiveMQ作为消息代理。

对于其中一个队列,我们​​发现生成消息的速率远高于消耗消息的速率。有时这会导致ActiveMQ崩溃。

因此,我们正在探索提高消耗率的选项。 (首要任务是增加现有消费者的数量,然后增加消费者吊舱/实例的数量)。

以下是当前侦听器配置

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrency("1-1");

    factory.setMessageConverter(jacksonJmsMessageConverter());
    return factory;
}

侦听器逻辑与数据库交互多次,可以被视为I / O绑定任务。因此,我们正在考虑并行处理多个消息。

我们找到了以下选项。

  1. 我们已同意1-1的暗示concurrentConsumersmaxConcurrentConsumers为1,一次仅处理1条消息。这是我们可以增加为5-10的配置,以便最少5个和最多10个使用者可以同时运行。

  2. 我们还发现我们也可以在侦听器工厂中设置TaskExecutor。像设置threadPoolExecutor (corePoolSize = 5, maxPoolSize = 10, queueCapacity=50)一样也将帮助我们同时处理消息。

  3. 我不确定采用哪个选项(1或2)

  4. 如果将并发性提高到5-10,并且还设置了ThreadPoolExecutor,其行为和影响是什么?
  5. 是否有选择最佳参数的指针?
java jms activemq spring-jms
3个回答
2
投票

DefaultMessageListenerContainer.setTaskExecutor()的JavaDoc这样说:

设置Spring DefaultMessageListenerContainer.setTaskExecutor()以用于运行侦听器线程。

默认为TaskExecutor,根据指定的并发使用者数,启动多个新线程。

指定用于与现有线程池集成的替代方法SimpleAsyncTaskExecutor。请注意,这仅在以特定方式(例如在Java EE环境中)管理线程时才真正增加价值。普通线程池不会增加太多价值,因为此侦听器容器在其整个生命周期内都会占用许多线程。

因此,设置任务执行程序的主要用例是与现有线程池集成。默认执行程序已经根据您配置的并发使用者的数量进行扩展。因此,我不建议您设置任务执行器。

我建议仅设置并发。

您需要通过仔细的基准测试和分析来确定最佳值。在开始该过程之前,我强烈建议您设置一个特定的性能目标,因为没有目标,基准测试和优化就可能成为无休止的任务。

您可能还考虑使用SimpleAsyncTaskExecutor,以便队列首先不会变得很满。


1
投票

您观察到的以msg / s和MB / s为单位的吞吐量是多少?

以我的经验,您将观察到使用直接JMS API通过PooledConnectionFactory进行批处理事务时,将观察到最快的吞吐量。 Spring JMS模板可能难以配置和调优,特别是带有多线程和事务的。它还可以按消息关闭诸如Consumer和Session之类的对象,这消除了代理端缓存和预取的好处。

当您直接进行JMS-API和批处理事务(JMS,而不是XA)时,您将获得ActiveMQ代理的服务器端缓存和预取的好处-这将带来巨大的不同。


0
投票

我会同意之前的回答。

当您使用Spring DefaultMessageListenerContainer(DMLC)消耗消息的性能较差时,这几乎总是意味着对于每个读取的消息都将重新创建JMS使用者/会话/连接。

在非事务处理的情况下,可以让DMLC缓存使用者。 DMLC将使用单个JMS连接,并在该连接上启动多个JMS会话。每个会话将有一个JMS消息使用者。

在事务处理的情况下,DMLC中将没有缓存,并且必须使用缓存连接工厂以避免性能问题。查看TaskExecutorflow control for the producers以了解此功能。

关于您的问题,我也不会理会org.apache.activemq.pool.PooledConnectionFactory。我建议从org.messaginghub.pooled.jms.JmsPoolConnectionFactory开始并发。我发现具有最小和最大数量的消费者(只是拥有固定的池)在稳定性和性能方面具有优势。

关于ActiveMQ崩溃,这是由于ActiveMQ预取引起的。默认情况下,ActiveMQ的预取值为1000。JMS消息使用者请求一条消息,并且代理在预取中传递该消息和其他999条消息。如果随后关闭了使用者/会话,则999条消息在客户端被丢弃,然后在代理上重新排队。这对经纪人非常不当,处理得不好。

[另外,请注意,例如,如果有5个并发使用者,则第一个使用者将获得1000条消息,然后下一个使用者将获得1000条消息,依此类推。因此,如果队列中只有500条消息,则只有一个使用者是活动的。您需要在队列中有5000条消息才能激活所有5个使用者。

如果有疑问,请在客户端配置中禁用消息预取:

TaskExecutor

© www.soinside.com 2019 - 2024. All rights reserved.