我有一个带有 activeMQ Artemis 队列的 kubernetes 集群,并且我正在使用 hpa 来自动扩展微服务。消息通过 QpidSender 发送并通过 JMSListener 接收。
消息传递有效,但我无法以某种方式配置队列/侦听器,使自动缩放按预期工作。
这是我的 Qpid 发送者
public static void send(String avroMessage, String task) throws JMSException, NamingException {
Connection connection = createConnection();
connection.start();
Session session = createSession(connection);
MessageProducer messageProducer = createProducer(session);
TextMessage message = session.createTextMessage(avroMessage);
message.setStringProperty("task", task);
messageProducer.send(
message,
DeliveryMode.NON_PERSISTENT,
Message.DEFAULT_PRIORITY,
Message.DEFAULT_TIME_TO_LIVE);
connection.close();
}
private static MessageProducer createProducer(Session session) throws JMSException {
Destination producerDestination =
session.createQueue("queue?consumer.prefetchSize=1&heartbeat='10000'");
return session.createProducer(producerDestination);
}
private static Session createSession(Connection connection) throws JMSException {
return connection.createSession(Session.AUTO_ACKNOWLEDGE);
}
private static Connection createConnection() throws NamingException, JMSException {
Hashtable<Object, Object> env = new Hashtable<>();
env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
env.put("connectionfactory.factoryLookup", amqUrl);
Context context = new javax.naming.InitialContext(env);
ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("factoryLookup");
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setConnectionFactory(connectionFactory);
pooledConnectionFactory.setMaxConnections(10);
return connectionFactory.createConnection(amqUsername, amqPassword);
}
这是我的监听器配置
@Bean
public JmsConnectionFactory jmsConnection() {
JmsConnectionFactory jmsConnection = new JmsConnectionFactory();
jmsConnection.setRemoteURI(this.amqUrl);
jmsConnection.setUsername(this.amqUsername);
jmsConnection.setPassword(this.amqPassword);
return jmsConnection;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(jmsConnection());
return factory;
}
这是我的听众
@JmsListener(
destination = "queue?consumer.prefetchSize=1&heartbeat='10000'",
selector = "task = 'myTask'"
)
public void receiveMsg(Message message) throws IOException, JMSException {
message.acknowledge();
doStuff();
}
我这样发送消息
QpidSender.send(avroMessage, "myTask");
此设置有效。我可以发送不同的消息,一旦消息数量超过 2 个,我的服务的第二个实例就会启动并使用该消息。如果稍后消息计数低于 2,则服务终止。
问题是:我不希望在 doStuff() 之前确认消息。因为如果出现问题或者服务在完成 doStuff() 之前终止,消息就会丢失(对吗?)。
但是如果我将其重新排序为
doStuff();
message.acknowledge();
只要第一个服务仍在
doStuff()
并且尚未确认消息,第二个实例就无法从代理接收消息。
如何以一种方式进行配置,以便多个实例可以使用队列中的消息,但如果服务终止或其他原因失败,消息不会丢失
doStuff()
?
使用
factory.setSessionTransacted(true)
。
请参阅 javadocs
DefaultMessageListenerContainer
:
* <p><b>It is strongly recommended to either set {@link #setSessionTransacted
* "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
* "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
* javadoc for details on acknowledge modes and native transaction options, as
* well as the {@link AbstractPollingMessageListenerContainer} javadoc for details
* on configuring an external transaction manager. Note that for the default
* "AUTO_ACKNOWLEDGE" mode, this container applies automatic message acknowledgment
* before listener execution, with no redelivery in case of an exception.