将ActiveMQ与多个使用者实例一起使用

问题描述 投票:0回答:1

在我们的架构中,我们为每个应用程序提供了2个或更多容器,以实现高可用性。我们正在使用activeMQ,我想实现以下行为。

  1. 推送消息进行排队。
  2. 此消息仅由* one *容器(第一个将读取它)消耗。
  3. 一旦消息成功处理,消费者将更新此消息可以被确认和忽略
  4. 我尝试使用提交和使用CLIENT_ACKNOWLEDGE的事务,但是在这两种情况下,两个消费者都得到了消息并对其进行了处理。

我只希望一个消费者根据可用性处理每条消息。我们的实现是Java。

请分享实施方式。

这是我的代码示例

 final Connection consumerConnection = connectionFactory.createConnection();
    consumerConnection.start();
    // Create a session.
    final Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

    final Destination consumerDestination = consumerSession.createQueue(queueName);


    // Create a message consumer from the session to the queue.
    final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
    // Begin to wait for messages.
    Queue queue = consumerSession.createQueue(queueName);
    QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
    Enumeration msgs = queueBrowser.getEnumeration();
    while (msgs.hasMoreElements()) {
        //do your things here

        ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();

        if (message == null)
            continue;

        //handle message
        System.out.println("Message received in : " + message);
        try {
            String text = message.getText();
            JSONObject messageJson = new JSONObject(text);
            consumer.receive(1000);

            String responseString = handleMessage(messageJson);

            message.acknowledge();

谢谢Moshe

java activemq
1个回答
1
投票

这里的问题是你要确认来自QueueBrowser实例的消息没有任何影响,因为浏览器只是用于浏览消息而不是消费消息。

...
QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();
while (msgs.hasMoreElements()) {
    ...
    ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();
    ...
    try {
        ...
        message.acknowledge(); // this does nothing

你实际上正在创建一个真正的消费和调用consumer.receive(1000),但你丢弃了Message返回的receive()实例。这是Message实例,您必须确认实际使用队列中的消息。

...
// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
...
while (msgs.hasMoreElements()) {
    ...
    try {
        ...
        Message actualMessage = consumer.receive(1000); // acknowledge this!
        ...

另一个重要注意事项......队列浏览器接收的消息不能保证是队列的静态快照。因此,假设您对consumer.receive(1000)的调用实际上是来自队列浏览器的相同消息是危险的。在我看来,你应该重新设计这个逻辑,只使用MessageConsumer并放下QueueBrowser

© www.soinside.com 2019 - 2024. All rights reserved.