Using the _AMQ_SCHED_DELIVERY property in ActiveMQ to schedule message delivery


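I am trying to find a way to delay message delivery. I came across the RabbitMQ Delayed Message plugin, but since it is still experimental, I am looking for other options.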

I read here that we can set the _AMQ_SCHED_DELIVERY property to delay a message, but that does not seem to work.

This is the producer and consumer code (taken from here).

public static class HelloWorldProducer implements Runnable {
    public void run() {
        try {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

            // Create a Connection
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue("TEST.FOO");

            // Create a MessageProducer from the Session to the Topic or Queue
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // Create a message
            String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
            TextMessage message = session.createTextMessage(text);

            // Tell the producer to send the message
            System.out.println("Sent message At: " + new Date(System.currentTimeMillis()));
            message.setLongProperty("_AMQ_SCHED_DELIVERY", System.currentTimeMillis() + 10000);
            producer.send(message);

            // Clean up
            session.close();
            connection.close();
        }
        catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}

public static class HelloWorldConsumer implements Runnable, ExceptionListener {
    public void run() {
        try {

            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

            // Create a Connection
            Connection connection = connectionFactory.createConnection();
            connection.start();

            connection.setExceptionListener(this);

            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue("TEST.FOO");

            // Create a MessageConsumer from the Session to the Topic or Queue
            MessageConsumer consumer = session.createConsumer(destination);

            // Wait for a message
            Message message = consumer.receive(40000);

            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("Received: " + text + " at: " + new Date(System.currentTimeMillis()));
            } else {
                System.out.println("Received: " + message);
            }

            consumer.close();
            session.close();
            connection.close();
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
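    }

    // Required by ExceptionListener, which this class implements
    public synchronized void onException(JMSException ex) {
        System.out.println("JMS Exception occurred. Shutting down client.");
    }
}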

But the message is delivered immediately:

Sent message At: Mon Sep 14 17:37:01 IST 2015
Received: Hello world! From: Thread-0 : 746036857 at: Mon Sep 14 17:37:01 IST 2015

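From the link above, it appears that ActiveMQ supports scheduling individual messages with different delays, but I have not been able to find a proper way to do it.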

Also, any insight into how RabbitMQ's delayed message support compares with ActiveMQ's would be appreciated.

java rabbitmq activemq
1 Answer

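The ActiveMQ broker does support delayed message delivery, but it does not use the same properties as Rabbit, as a quick Google search of the documentation shows. For future reference, the documentation for this feature is here: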

http://activemq.apache.org/delay-and-schedule-message-delivery.html
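
As a rough illustration of what that page describes: for the ActiveMQ 5.x broker, the delay is given as a relative number of milliseconds in the AMQ_SCHEDULED_DELAY message property (also exposed as the constant org.apache.activemq.ScheduledMessage.AMQ_SCHEDULED_DELAY), and the broker needs schedulerSupport enabled. The code in the question instead passes an absolute timestamp under a different property name. The sketch below uses only the standard library so it runs without a broker; the class ScheduledDelayExample and its helper methods are hypothetical names introduced for illustration, not part of ActiveMQ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ScheduledDelayExample {
    // Property name documented for delayed delivery on ActiveMQ 5.x.
    static final String AMQ_SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY";

    // Converts an absolute delivery time into a relative delay in
    // milliseconds; a time in the past yields a delay of zero.
    static long delayUntil(long deliverAtMillis, long nowMillis) {
        return Math.max(0L, deliverAtMillis - nowMillis);
    }

    // Builds the long-valued properties that would be copied onto the
    // JMS message before sending (hypothetical helper).
    static Map<String, Long> schedulingProperties(long deliverAtMillis, long nowMillis) {
        Map<String, Long> props = new LinkedHashMap<>();
        props.put(AMQ_SCHEDULED_DELAY, delayUntil(deliverAtMillis, nowMillis));
        return props;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Same intent as the question: deliver 10 seconds from now.
        Map<String, Long> props = schedulingProperties(now + 10_000, now);
        System.out.println(props); // prints {AMQ_SCHEDULED_DELAY=10000}
        // In the producer, each entry would be applied with
        //   message.setLongProperty(name, value);
        // in place of the "_AMQ_SCHED_DELIVERY" line.
    }
}
```

In the producer from the question, the line that sets _AMQ_SCHED_DELIVERY would then become something like message.setLongProperty("AMQ_SCHEDULED_DELAY", 10000L). How schedulerSupport is switched on for the embedded vm://localhost broker is not shown here; the linked page documents the broker-side setting.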
