在我们的架构中,我们为每个应用程序提供了2个或更多容器,以实现高可用性。我们正在使用activeMQ,我想实现以下行为。
我只希望一个消费者根据可用性处理每条消息。我们的实现是Java。
请分享实施方式。
这是我的代码示例
final Connection consumerConnection = connectionFactory.createConnection();
consumerConnection.start();
// Create a session.
final Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final Destination consumerDestination = consumerSession.createQueue(queueName);
// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
// Begin to wait for messages.
Queue queue = consumerSession.createQueue(queueName);
QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();
while (msgs.hasMoreElements()) {
//do your things here
ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();
if (message == null)
continue;
//handle message
System.out.println("Message received in : " + message);
try {
String text = message.getText();
JSONObject messageJson = new JSONObject(text);
consumer.receive(1000);
String responseString = handleMessage(messageJson);
message.acknowledge();
谢谢Moshe
这里的问题是你要确认来自QueueBrowser
实例的消息没有任何影响,因为浏览器只是用于浏览消息而不是消费消息。
...
QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();
while (msgs.hasMoreElements()) {
...
ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();
...
try {
...
message.acknowledge(); // this does nothing
你实际上正在创建一个真正的消费和调用consumer.receive(1000)
,但你丢弃了Message
返回的receive()
实例。这是Message
实例,您必须确认实际使用队列中的消息。
...
// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
...
while (msgs.hasMoreElements()) {
...
try {
...
Message actualMessage = consumer.receive(1000); // acknowledge this!
...
另一个重要注意事项......队列浏览器接收的消息不能保证是队列的静态快照。因此,假设您对consumer.receive(1000)
的调用实际上是来自队列浏览器的相同消息是危险的。在我看来,你应该重新设计这个逻辑,只使用MessageConsumer
并放下QueueBrowser
。