ActiveMQ忽略优先级设置

问题描述 投票:0回答:1

使用此代码:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Minimal reproduction for message-priority handling on an embedded ActiveMQ broker.
 *
 * <p>The program starts a non-persistent broker whose default destination policy has
 * {@code prioritizedMessages} enabled, then performs ten rounds of: send one priority-1 message
 * ("Low"), send one priority-9 message ("High"), receive two messages, and print their texts in
 * the order they were received.
 *
 * <p>The consumer is intentionally registered <em>before</em> any message is sent; that ordering
 * is the scenario under investigation and must be kept as-is for the reproduction to stay valid.
 */
public class CompositeQueuePriority {

  /**
   * Runs the reproduction and prints one {@code "<first>, <second>"} line per round.
   *
   * @param args ignored
   * @throws Exception if the broker cannot be configured/started/stopped or any JMS call fails
   */
  public static void main(String[] args) throws Exception {
    String brokerUrl = "tcp://localhost:61616";

    BrokerService broker = new BrokerService();
    broker.addConnector(brokerUrl);
    broker.setPersistent(false);
    broker.setDestinationPolicy(policyMap());
    broker.start();

    try {
      // JMS 1.1 Connection is not AutoCloseable, so use try/finally instead of
      // try-with-resources to guarantee the connection is released.
      Connection connection = openConnection();
      try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Destination a =
            ActiveMQDestination.createDestination("queue", ActiveMQDestination.QUEUE_TYPE);

        MessageProducer lowProducer = session.createProducer(a);
        lowProducer.setPriority(1);

        MessageProducer highProducer = session.createProducer(a);
        highProducer.setPriority(9);

        // Registered before the first send on purpose (see class comment).
        MessageConsumer consumer = session.createConsumer(a);

        for (int i = 0; i < 10; i++) {
          lowProducer.send(session.createTextMessage("Low"));
          highProducer.send(session.createTextMessage("High"));

          // receive() without a timeout blocks until a message is available.
          String first = ((TextMessage) consumer.receive()).getText();
          String second = ((TextMessage) consumer.receive()).getText();

          System.out.println(first + ", " + second);
        }
      } finally {
        // Closing the connection also closes the session, producers and consumer created from it.
        connection.close();
      }
    } finally {
      // Always shut the embedded broker down, even if a JMS call above failed.
      broker.stop();
    }
  }

  /**
   * Creates a client connection using the {@code vm://localhost} URL.
   *
   * <p>The connection is returned un-started; the caller is responsible for calling
   * {@link Connection#start()} and, eventually, {@link Connection#close()}.
   *
   * @return a new, not yet started JMS connection
   * @throws JMSException if the connection cannot be created
   */
  private static Connection openConnection() throws JMSException {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
    return connectionFactory.createConnection();
  }

  /**
   * Builds the broker's destination policy map, using the priority-enabled entry as the default
   * entry.
   *
   * @return a policy map whose default entry is {@link #prioPolicyEntry()}
   */
  private static PolicyMap policyMap() {
    PolicyMap policyMap = new PolicyMap();
    policyMap.setDefaultEntry(prioPolicyEntry());
    return policyMap;
  }

  /**
   * Builds a policy entry with prioritized messages switched on.
   *
   * @return a new policy entry with {@code prioritizedMessages} set to {@code true}
   */
  private static PolicyEntry prioPolicyEntry() {
    PolicyEntry policyEntry = new PolicyEntry();
    policyEntry.setPrioritizedMessages(true);
    return policyEntry;
  }
}

输出为:

Low, High
Low, High
Low, High
Low, High
Low, High
Low, High
Low, High
Low, High
Low, High
Low, High

根据官方文档,ActiveMQ 自 5.4 版本起就支持消息优先级,而我使用的是 5.15。是我哪里做错了吗?

java activemq
1个回答
2
投票

我认为您的问题在于:发送消息时消费者已经创建好了,这意味着代理(broker)一收到消息就会立即把它分发给消费者,因此这些消息根本没有机会按优先级排序。

请先发送所有消息,然后再创建消费者。

© www.soinside.com 2019 - 2024. All rights reserved.