据我所知,Spring 的
JmsTemplate
默认配置使用同步方法发送消息,这使得性能非常糟糕。这里有什么方法可以使用连接池吗?
public void insertIntoMQ(String kafkaMessage) {
//log.info("Inside insertIntoMQ function");
try {
jMSConfiguration2.insertIntoMQ(kafkaMessage);
log.info("Message Inserted Successfully :\n" + kafkaMessage);
} catch (Exception e) {
log.error("Exception Inserting Message Into MQ ", e);
}
}
上面的方法在不同的类中调用下面的
insertIntoMQ
方法来插入消息。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class JMSConfiguration2 {
@Autowired
private JmsTemplate jmsTemplate;
private static final org.apache.logging.log4j.Logger log = org.apache.logging.log4j.LogManager
.getLogger(JMSConfiguration2.class);
@Value("${ibm.mq.queueName}")
private String queueName;
@Async
public void insertIntoMQ(String kafkaMessage) {
//log.info("Inside insertIntoMQ function");
try{
jmsTemplate.convertAndSend(queueName, kafkaMessage);
log.info("Message Inserted Successfully :\n" + kafkaMessage);
} catch (Exception e) {
log.error("Exception Inserting Message Into MQ ", e);
}
}
}
我添加了缓存连接工厂,但性能略好但达不到标准。 我能够达到 320 TPS,但我需要更多的 TPS。据我观察,在任何时间点只有 8 个线程/8 个任务在运行以将消息插入 MQ,但有大量消息等待插入但只有 8 个线程被创建以插入。所以,无论如何要增加它?
@Configuration
public class MQutil {
@Bean
public CachingConnectionFactory cachingConnectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
try{
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", "false");
// Create an MQConnectionFactory with the MQ properties
MQConnectionFactory mqConnectionFactory = new MQConnectionFactory();
mqConnectionFactory.setHostName(".ca");
mqConnectionFactory.setPort(1414);
mqConnectionFactory.setQueueManager("TQMGR");
mqConnectionFactory.setChannel("CHANNEL");
mqConnectionFactory.setSSLCipherSuite("TLS_RSA_WITH_AES_128_CBC_SHA256");
mqConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
UserCredentialsConnectionFactoryAdapter connectionFactoryAdapter=new UserCredentialsConnectionFactoryAdapter();
connectionFactoryAdapter.setTargetConnectionFactory(mqConnectionFactory);
connectionFactoryAdapter.setUsername("e");
connectionFactoryAdapter.setPassword("es");
cachingConnectionFactory.setTargetConnectionFactory(connectionFactoryAdapter);
cachingConnectionFactory.setSessionCacheSize(10);
} catch (Exception e) {
System.out.println("Exception Inserting Message Into MQ "+ e);
}
return cachingConnectionFactory;
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
jmsTemplate.setDefaultDestinationName("TEST.TOPIC.QUEUE2");
return jmsTemplate;
}
}
通过微小的调整,我能够在我的环境中将您的代码加速大约 ~400 倍。您的代码很慢,因为它付出了很多代价:
还有一些其他成本,例如创建 JMS MessageProducer,但它们并不占主导地位。在我的环境中,像您这样的代码能够发布大约 22-23 msg/sec。
如果您使用默认设置的 spring CachingConnectionFactory,它将缓存连接和会话。在我的环境中,这样的代码能够发布大约 400 msg/sec。请注意 - 您不能在应用程序容器内使用 CachingConnectionFactory。在这种情况下,从 JNDI 获取 connectionFactory,连接缓存将由应用程序容器提供。
对于下一步,您可以将 QoS 属性设置为 NON_PERSISTEN 并将确认模式设置为 AUTO_ACKNOWLEDE。在我的环境中,这些步骤还将消息生成速度提高到大约 750 msg/sec。
为了最终提高速度,您需要调整 MQ 目标上的目标设置。将 Default put response type 属性设置为 Asynchronous。在 MQSC 控制台下,此属性在您的队列中称为 DEFPRESP;或者您可以通过 MQ Explorer 进行设置:
有了这个最后的改变,我能够在我的环境中推送 ~8500 msg/sec。
所有这些性能提升都来自:
使用 JMSTemplate 的 executes 方法也可以使用替代方法,因为您可以在 Session 或 ProducerCallbacks 中控制许多更精细的范围/缓存细节。
一些免责声明: 在 IBM MQ 中,非持久性消息仍然保存到磁盘,但是当消息存储到内存缓冲区时而不是缓冲区(和消息)被刷新时,确认返回给发送者。这意味着在意外断电时,您将丢失未刷新的消息。
可以在 MQ 服务器级别进行一些其他调整,但这远远超出了 Java、Spring 和 JMS。
这是启用了所有优化的代码示例:
void sendItFast() throws Exception {
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", "false");
MQConnectionFactory cf = new MQConnectionFactory();
cf.setHostName("ca");
cf.setPort(1414);
cf.setQueueManager("ESB");
cf.setChannel("CHANNEL");
cf.setSSLCipherSuite("TLS_RSA_WITH_AES_128_CBC_SHA256");
cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
ccf.afterPropertiesSet();
QosSettings qosSettings = new QosSettings();
qosSettings.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
JmsTemplate jt = new JmsTemplate(ccf);
jt.setExplicitQosEnabled(true);
jt.setQosSettings(qosSettings);
jt.setSessionTransacted(false);
jt.setSessionAcknowledgeMode(JMSContext.AUTO_ACKNOWLEDGE);
jt.afterPropertiesSet();
int msgCount = 20000;
String message = "Hello world: ";
long duration = - System.currentTimeMillis();
for (int i = 0; i < msgCount; i++) {
String payload = message + System.nanoTime();
jt.convertAndSend("SPEED_TEST", payload);
}
duration += System.currentTimeMillis();
float speed = msgCount * 1000 / duration;
System.out.println("Sent " + msgCount + " in " + duration + " ms. " + speed + " msg/sec");
}