如何使用带连接池的Spring的JmsTemplate?

问题描述 投票:0回答:2

据我所知,Spring 的

JmsTemplate
默认配置使用同步方法发送消息,这使得性能非常糟糕。这里有什么方法可以使用连接池吗?

public void insertIntoMQ(String kafkaMessage) {
    //log.info("Inside insertIntoMQ function");
    try {
        jMSConfiguration2.insertIntoMQ(kafkaMessage); 
        log.info("Message Inserted Successfully :\n" + kafkaMessage);
    } catch (Exception e) {
        log.error("Exception Inserting Message Into MQ ", e);
    }
}

上面的方法在不同的类中调用下面的

insertIntoMQ
方法来插入消息。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class JMSConfiguration2 {

    @Autowired
    private JmsTemplate jmsTemplate;
    
    private static final org.apache.logging.log4j.Logger log = org.apache.logging.log4j.LogManager
            .getLogger(JMSConfiguration2.class);

    @Value("${ibm.mq.queueName}")
    private String queueName;
    
    @Async
    public void insertIntoMQ(String kafkaMessage) {
        //log.info("Inside insertIntoMQ function");
        try{
                    
            jmsTemplate.convertAndSend(queueName, kafkaMessage); 
            log.info("Message Inserted Successfully :\n" + kafkaMessage);
        } catch (Exception e) {

            log.error("Exception Inserting Message Into MQ ", e);
        }
    }   
}

我添加了缓存连接工厂,但性能略好但达不到标准。 我能够达到 320 TPS,但我需要更多的 TPS。据我观察,在任何时间点只有 8 个线程/8 个任务在运行以将消息插入 MQ,但有大量消息等待插入但只有 8 个线程被创建以插入。所以,无论如何要增加它?

@Configuration
public class MQutil {

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        try{
            System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", "false");
            // Create an MQConnectionFactory with the MQ properties
            MQConnectionFactory mqConnectionFactory = new MQConnectionFactory();
            mqConnectionFactory.setHostName(".ca");
            mqConnectionFactory.setPort(1414);
            mqConnectionFactory.setQueueManager("TQMGR");
            mqConnectionFactory.setChannel("CHANNEL");
            mqConnectionFactory.setSSLCipherSuite("TLS_RSA_WITH_AES_128_CBC_SHA256");
            mqConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);

            UserCredentialsConnectionFactoryAdapter connectionFactoryAdapter=new UserCredentialsConnectionFactoryAdapter();
            connectionFactoryAdapter.setTargetConnectionFactory(mqConnectionFactory);
            connectionFactoryAdapter.setUsername("e");
            connectionFactoryAdapter.setPassword("es");

            cachingConnectionFactory.setTargetConnectionFactory(connectionFactoryAdapter);
            cachingConnectionFactory.setSessionCacheSize(10);

        } catch (Exception e) {

            System.out.println("Exception Inserting Message Into MQ "+ e);
        }
        return cachingConnectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setDefaultDestinationName("TEST.TOPIC.QUEUE2");
        return jmsTemplate;
    }
}
spring-boot jms spring-jms
2个回答
0
投票

通过微小的调整,我能够在我的环境中将您的代码加速大约 ~400 倍。您的代码很慢,因为它付出了很多代价:

  • 每次通话打开连接的成本
  • 创建 jms 会话的成本
  • 开队成本
  • 事务中的消息持久化成本

还有一些其他成本,例如创建 JMS MessageProducer,但它们并不占主导地位。在我的环境中,像您这样的代码能够发布大约 22-23 msg/sec

如果您使用默认设置的 spring CachingConnectionFactory,它将缓存连接和会话。在我的环境中,这样的代码能够发布大约 400 msg/sec。请注意 - 您不能在应用程序容器内使用 CachingConnectionFactory。在这种情况下,从 JNDI 获取 connectionFactory,连接缓存将由应用程序容器提供。

对于下一步,您可以将 QoS 属性设置为 NON_PERSISTEN 并将确认模式设置为 AUTO_ACKNOWLEDE。在我的环境中,这些步骤还将消息生成速度提高到大约 750 msg/sec

为了最终提高速度,您需要调整 MQ 目标上的目标设置。将 Default put response type 属性设置为 Asynchronous。在 MQSC 控制台下,此属性在您的队列中称为 DEFPRESP;或者您可以通过 MQ Explorer 进行设置:

有了这个最后的改变,我能够在我的环境中推送 ~8500 msg/sec

所有这些性能提升都来自:

  • 资源缓存
  • 发送、刷新和确认缓冲区;而不是个别消息。本质上就是 Kafka 的工作方式。

使用 JMSTemplate 的 executes 方法也可以使用替代方法,因为您可以在 Session 或 ProducerCallbacks 中控制许多更精细的范围/缓存细节。

一些免责声明: 在 IBM MQ 中,非持久性消息仍然保存到磁盘,但是当消息存储到内存缓冲区时而不是缓冲区(和消息)被刷新时,确认返回给发送者。这意味着在意外断电时,您将丢失未刷新的消息。

可以在 MQ 服务器级别进行一些其他调整,但这远远超出了 Java、Spring 和 JMS。

这是启用了所有优化的代码示例:

void sendItFast() throws Exception {
    System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", "false");

    MQConnectionFactory cf = new MQConnectionFactory();
    cf.setHostName("ca");
    cf.setPort(1414);
    cf.setQueueManager("ESB");
    cf.setChannel("CHANNEL");
    cf.setSSLCipherSuite("TLS_RSA_WITH_AES_128_CBC_SHA256");
    cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);

    CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
    ccf.afterPropertiesSet();

    QosSettings qosSettings = new QosSettings();
    qosSettings.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    JmsTemplate jt = new JmsTemplate(ccf);
    jt.setExplicitQosEnabled(true);
    jt.setQosSettings(qosSettings);
    jt.setSessionTransacted(false);
    jt.setSessionAcknowledgeMode(JMSContext.AUTO_ACKNOWLEDGE);
    jt.afterPropertiesSet();


    int msgCount = 20000;
    String message = "Hello world: ";

    long duration = - System.currentTimeMillis();

    for (int i = 0; i < msgCount; i++) {
        String payload = message + System.nanoTime();
        jt.convertAndSend("SPEED_TEST", payload);
    }

    duration += System.currentTimeMillis();
    float speed = msgCount * 1000 / duration;
    System.out.println("Sent " + msgCount + " in " + duration + " ms. " + speed  + " msg/sec");
}

© www.soinside.com 2019 - 2024. All rights reserved.