如何在 ActiveMQ 上配置 JmsListener 以使用 Qpid Sender 自动缩放

问题描述 投票:0回答:1

我有一个带有 activeMQ Artemis 队列的 kubernetes 集群,并且我正在使用 hpa 来自动扩展微服务。消息通过 QpidSender 发送并通过 JMSListener 接收。

消息传递有效,但我无法以某种方式配置队列/侦听器,使自动缩放按预期工作。

这是我的 Qpid 发送者

public static void send(String avroMessage, String task) throws JMSException, NamingException {
    Connection connection = createConnection();
    connection.start();

    Session session = createSession(connection);
    MessageProducer messageProducer = createProducer(session);

    TextMessage message = session.createTextMessage(avroMessage);
    message.setStringProperty("task", task);
    messageProducer.send(
        message, 
        DeliveryMode.NON_PERSISTENT, 
        Message.DEFAULT_PRIORITY, 
        Message.DEFAULT_TIME_TO_LIVE);

    connection.close();
}

private static MessageProducer createProducer(Session session) throws JMSException {
    Destination producerDestination = 
       session.createQueue("queue?consumer.prefetchSize=1&heartbeat='10000'");
    return session.createProducer(producerDestination);
}

private static Session createSession(Connection connection) throws JMSException {
    return connection.createSession(Session.AUTO_ACKNOWLEDGE);
}

private static Connection createConnection() throws NamingException, JMSException {
    Hashtable<Object, Object> env = new Hashtable<>();
    env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
    env.put("connectionfactory.factoryLookup", amqUrl);
    Context context = new javax.naming.InitialContext(env);

    ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("factoryLookup");
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setConnectionFactory(connectionFactory);
    pooledConnectionFactory.setMaxConnections(10);

    return connectionFactory.createConnection(amqUsername, amqPassword);
}

这是我的监听器配置

@Bean
public JmsConnectionFactory jmsConnection() {
    JmsConnectionFactory jmsConnection = new JmsConnectionFactory();
    jmsConnection.setRemoteURI(this.amqUrl);
    jmsConnection.setUsername(this.amqUsername);
    jmsConnection.setPassword(this.amqPassword);
    return jmsConnection;
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(jmsConnection());

    return factory;
}

这是我的听众

@JmsListener(
        destination = "queue?consumer.prefetchSize=1&heartbeat='10000'",
        selector = "task = 'myTask'"
)
public void receiveMsg(Message message) throws IOException, JMSException {
    message.acknowledge();
    doStuff();
}

我这样发送消息

QpidSender.send(avroMessage, "myTask");

此设置有效。我可以发送不同的消息,一旦消息数量超过 2 个,我的服务的第二个实例就会启动并使用该消息。如果稍后消息计数低于 2,则服务终止。

问题是:我不希望在 doStuff() 之前确认消息。因为如果出现问题或者服务在完成 doStuff() 之前终止,消息就会丢失(对吗?)。

但是如果我将其重新排序为

doStuff();
message.acknowledge();

只要第一个服务仍在

doStuff()
并且尚未确认消息,第二个实例就无法从代理接收消息。

如何以一种方式进行配置,以便多个实例可以使用队列中的消息,但如果服务终止或其他原因失败,消息不会丢失

doStuff()

spring-boot spring-jms activemq-artemis qpid
1个回答
1
投票

使用

factory.setSessionTransacted(true)

请参阅 javadocs

DefaultMessageListenerContainer

 * <p><b>It is strongly recommended to either set {@link #setSessionTransacted
 * "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
 * "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
 * javadoc for details on acknowledge modes and native transaction options, as
 * well as the {@link AbstractPollingMessageListenerContainer} javadoc for details
 * on configuring an external transaction manager. Note that for the default
 * "AUTO_ACKNOWLEDGE" mode, this container applies automatic message acknowledgment
 * before listener execution, with no redelivery in case of an exception.
© www.soinside.com 2019 - 2024. All rights reserved.