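How do I set up WildFly JMS messaging in a clustered environment so that messages in the same group are delivered correctly?
I found an ActiveMQ property that solves this problem here, but I don't know how to apply it to WildFly's JMS subsystem. How can I configure a queue or an MDB to work in exclusive mode?
I tried setting the "consumer.exclusive=true" flag through the JMS producer, for example: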
@Inject
private JMSContext context;

// Attempt: append the ActiveMQ-style option to the JNDI name
@Resource(mappedName = "java:/jms/queue/TestQueue?consumer.exclusive=true")
private Queue testQueue;

public void pushToQueue(String messagePayload, String messageGroup) {
    try {
        Message message = context.createTextMessage(messagePayload);
        // Messages that share a JMSXGroupID belong to the same message group
        message.setStringProperty("JMSXGroupID", messageGroup);
        context.createProducer().send(testQueue, message);
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
and with other approaches, such as:
@Resource(name = "DefaultJMSConnectionFactory")
private ConnectionFactory connectionFactory;

// inside the producer method:
try {
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession();
    // Attempt: pass the option as part of the queue name
    Queue queue = session.createQueue("TestQueue?consumer.exclusive=true");
    Message message = session.createTextMessage(messagePayload);
    session.createProducer(queue).send(message);
} catch (JMSException e) {
    e.printStackTrace();
}
On the MDB side:
@MessageDriven(name = "queueMDB", activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "java:/jms/queue/TestQueue?consumer.exclusive=true"),
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
    @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "5"),
})
public class ConsumerMdb implements MessageListener {
    ...
}
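None of this worked. In these cases I think the queue-name string is not parsed for URL parameters; instead, the whole string is taken as the queue name (or JNDI name).
Then I tried using the "ActiveMQQueue" class to create the queue object (it implements the javax.jms.Destination interface). Code: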
public void pushToQueue(String messagePayload, String messageGroup) {
    try {
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession();
        ActiveMQQueue queue = new ActiveMQQueue("TestQueue?consumer.exclusive=true");
        //ActiveMQQueue queue = new ActiveMQQueue("jms.queue.TestQueue");
        Message message = session.createTextMessage(messagePayload);
        session.createProducer(queue).send(message);
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
The different queue names I tried:
ActiveMQQueue queue = new ActiveMQQueue("TestQueue");
ActiveMQQueue queue = new ActiveMQQueue("TestQueue?consumer.exclusive=true");
ActiveMQQueue queue = new ActiveMQQueue("java:/jms/queue/TestQueue");
ActiveMQQueue queue = new ActiveMQQueue("jms.queue.TestQueue");
None of these worked either. I got an InvalidDestinationException:
12:54:12,959 ERROR [stderr] (default task-37) javax.jms.InvalidDestinationException: Not an ActiveMQ Artemis Destination:queue://TestQueue
12:54:12,959 ERROR [stderr] (default task-37) at org.apache.activemq.artemis.jms.client.ActiveMQSession.createProducer(ActiveMQSession.java:293)
12:54:12,959 ERROR [stderr] (default task-37) at org.apache.activemq.artemis.ra.ActiveMQRASession.createProducer(ActiveMQRASession.java:1082)
12:54:12,959 ERROR [stderr] (default task-37) at si.teletech.test.eecluster.ejb.queue.Producer.pushToQueue(Producer.java:67)
12:54:12,960 ERROR [stderr] (default task-37) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
12:54:12,960 ERROR [stderr] (default task-37) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
12:54:12,960 ERROR [stderr] (default task-37) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
12:54:12,960 ERROR [stderr] (default task-37) at java.lang.reflect.Method.invoke(Method.java:498)
12:54:12,960 ERROR [stderr] (default task-37) at org.jboss.as.ee.component.ManagedReferenceMethodInterceptor.processInvocation(ManagedReferenceMethodInterceptor.java:52)
12:54:12,960 ERROR [stderr] (default task-37) at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:422)
12:54:12,960 ERROR [stderr] (default task-37) at org.jboss.invocation.InterceptorContext$Invocation.proceed(InterceptorContext.java:509)
12:54:12,960 ERROR [stderr] (default task-37) at org.jboss.as.weld.interceptors.Jsr299BindingsInterceptor.doMethodInterception(Jsr299BindingsInterceptor.java:90)
...
But in the exception I noticed that the queue name is correctly resolved as "Destination:queue://TestQueue", without the URL parameters.
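A possible explanation for the exception, offered as an assumption rather than a confirmed diagnosis: the message "Not an ActiveMQ Artemis Destination" suggests that the Artemis session rejects any Destination object that is not one of its own destination types, and the "queue://TestQueue" form looks like the string representation used by the classic ActiveMQ client rather than by Artemis. If the imported ActiveMQQueue came from a different client library, no queue name would help. The sketch below is a toy model of that kind of type check; every class and method name in it is hypothetical and none of it is Artemis code.

```java
/** Toy model of a provider-side destination type check. All names are hypothetical. */
public class DestinationCheckModel {

    /** Stands in for javax.jms.Destination. */
    public interface Destination {}

    /** Stands in for the destination class the provider itself ships. */
    public static class ProviderQueue implements Destination {
        private final String name;
        public ProviderQueue(String name) { this.name = name; }
        @Override public String toString() { return "ProviderQueue[" + name + "]"; }
    }

    /** Stands in for a similarly named class from another client library. */
    public static class ForeignQueue implements Destination {
        private final String name;
        public ForeignQueue(String name) { this.name = name; }
        @Override public String toString() { return "queue://" + name; }
    }

    /** Accepts only the provider's own destination type, whatever the queue name is. */
    public static String createProducer(Destination destination) {
        if (!(destination instanceof ProviderQueue)) {
            throw new IllegalArgumentException("Not a provider destination:" + destination);
        }
        return "producer for " + destination;
    }

    public static void main(String[] args) {
        System.out.println(createProducer(new ProviderQueue("TestQueue")));
        try {
            createProducer(new ForeignQueue("TestQueue"));
        } catch (IllegalArgumentException e) {
            // The name is fine; the object's type is what gets rejected.
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the check looks only at the object's class, which would be consistent with all four queue names above failing in the same way.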
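OK, I ended up solving this myself. Clustered singleton delivery is exactly what I needed. It works much like an exclusive consumer: the MDB is deployed on every node of the cluster, but it is active (consuming messages) on only one node. If that node fails, the MDB is started on the next node in the chain (i.e. failover).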
@MessageDriven(name = "queueMDB", activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "TestQueue"),
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
    @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "5"),
})
// org.jboss.ejb3.annotation.ClusteredSingleton: delivery is active on one node at a time
@ClusteredSingleton
public class ConsumerMdb implements MessageListener {
    // your MDB implementation
}
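To make the failover behaviour easier to picture, here is a minimal plain-Java sketch of it. It is only a toy model: the class name, the node names, and the "first live node in the list is active" rule are assumptions made for illustration, and WildFly's actual election policy may choose the active node differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Toy model of singleton delivery: the MDB is deployed everywhere, one node consumes. */
public class SingletonDeliveryModel {

    // Nodes that are still alive, in the (assumed) order they would take over.
    private final List<String> liveNodes;

    public SingletonDeliveryModel(List<String> nodesInTakeoverOrder) {
        this.liveNodes = new ArrayList<>(nodesInTakeoverOrder);
    }

    /** The single node whose MDB currently consumes messages, if any node is alive. */
    public Optional<String> activeNode() {
        return liveNodes.isEmpty() ? Optional.empty() : Optional.of(liveNodes.get(0));
    }

    /** Simulates a node failure; the next live node becomes the active one. */
    public void fail(String node) {
        liveNodes.remove(node);
    }

    public static void main(String[] args) {
        SingletonDeliveryModel cluster =
                new SingletonDeliveryModel(List.of("node1", "node2", "node3"));
        System.out.println(cluster.activeNode()); // prints Optional[node1]
        cluster.fail("node1");
        System.out.println(cluster.activeNode()); // prints Optional[node2]
    }
}
```

Because only one node consumes at any time, messages from the same group cannot be processed in parallel on different nodes, which is the property the question was after.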
Reference: https://docs.jboss.org/author/display/WFLY10/Message+Driven+Beans+Controlled+Delivery