如何使用独占消费者设置Wildfly集群jms(apachemq)队列

问题描述 投票:0回答:1

如何在集群环境中设置 Wildfly jms 消息传递以在同一组中正确传递消息?
我找到了 activeMq 的属性,它解决了这个问题here,但我现在不知道如何将其应用到 wildfly 的 jms 子系统。如何设置队列或Mdb以独占模式工作?

我尝试通过 jms 生产者设置标志“exclusive?=true”,例如:

@Inject
private JMSContext context;

@Resource(mappedName = "java:/jms/queue/TestQueue?consumer.exclusive=true")
private Queue testQueue;

public void pushToQueue(String messagePayload, String messageGroup) {
try {
    Message message = context.createTextMessage(messagePayload);
    message.setStringProperty("JMSXGroupID", messageGroup);
    context.createProducer().send(processQueue, message);
} catch (JMSException e) {
    e.printStackTrace();
}

以及其他方法,例如:

@Resource(name="DefaultJMSConnectionFactory")
private ConnectionFactory connectionFactory;

try {
    Connection connection = connectionFactory.createConnection();
    Session  session = connection.createSession();
    Queue queue = session.createQueue("TestQueue?consumer.exclusive=true");
    Message message = session.createTextMessage(messagePayload);
    session.createProducer(queue).send(message);
} catch (JMSException e) {
    e.printStackTrace();
}

在 Mdb 方面:

@MessageDriven(name = "queueMDB", activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "java:/jms/queue/TestQueue?consumer.exclusive=true"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(propertyName = "maxSession", propertyValue = "5"),
})
public class ConsumerMdb implements MessageListener {
    ...
}

一切都没有成功。在这种情况下,我认为字符串 pf 队列名称不会解析为 URl 参数,但整个字符串表示队列名称(或 JNI 名称)

比我尝试使用“ActiveMQQueue”类来创建队列对象(它实现 javax.jms.Destination 接口) 代码:

public void pushToQueue(String messagePayload, String messageGroup) {
    try {
        Connection connection = connectionFactory.createConnection();
        Session  session = connection.createSession();
        ActiveMQQueue queue = new ActiveMQQueue("TestQueue?consumer.exclusive=true");
        //ActiveMQQueue queue = new ActiveMQQueue("jms.queue.TestQueue");
        Message message = session.createTextMessage(messagePayload);
        session.createProducer(queue).send(message);
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

队列名称的不同可能情况:

ActiveMQQueue queue = new ActiveMQQueue("TestQueue");

ActiveMQQueue queue = new ActiveMQQueue("TestQueue?consumer.exclusive=true");

ActiveMQQueue queue = new ActiveMQQueue("java:/jms/queue/TestQueue");

ActiveMQQueue queue = new ActiveMQQueue("jms.queue.TestQueue");

我们也没有成功。我遇到了 InvalidDestinationException:

12:54:12,959 ERROR [stderr] (default task-37) javax.jms.InvalidDestinationException: Not an ActiveMQ Artemis Destination:queue://TestQueue
12:54:12,959 ERROR [stderr] (default task-37)   at org.apache.activemq.artemis.jms.client.ActiveMQSession.createProducer(ActiveMQSession.java:293)
12:54:12,959 ERROR [stderr] (default task-37)   at org.apache.activemq.artemis.ra.ActiveMQRASession.createProducer(ActiveMQRASession.java:1082)
12:54:12,959 ERROR [stderr] (default task-37)   at si.teletech.test.eecluster.ejb.queue.Producer.pushToQueue(Producer.java:67)
12:54:12,960 ERROR [stderr] (default task-37)   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
12:54:12,960 ERROR [stderr] (default task-37)   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
12:54:12,960 ERROR [stderr] (default task-37)   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
12:54:12,960 ERROR [stderr] (default task-37)   at java.lang.reflect.Method.invoke(Method.java:498)
12:54:12,960 ERROR [stderr] (default task-37)   at org.jboss.as.ee.component.ManagedReferenceMethodInterceptor.processInvocation(ManagedReferenceMethodInterceptor.java:52)
12:54:12,960 ERROR [stderr] (default task-37)   at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:422)
12:54:12,960 ERROR [stderr] (default task-37)   at org.jboss.invocation.InterceptorContext$Invocation.proceed(InterceptorContext.java:509)
12:54:12,960 ERROR [stderr] (default task-37)   at org.jboss.as.weld.interceptors.Jsr299BindingsInterceptor.doMethodInterception(Jsr299BindingsInterceptor.java:90)
...

但这里有例外,我注意到队列名称被正确解析为“Destination:queue://TestQueue”,没有 URL 参数。

java jms wildfly cluster-computing activemq-artemis
1个回答
0
投票

好吧,我自己用某种方式解决了。集群单例交付正是我所需要的。它的工作原理类似于独家消费者。 Mdb bean 部署在集群服务器的所有节点上,但只有一个节点处于活动状态(它消耗消息)。如果节点发生故障,Mdb 将在链中的下一个节点上启动(即故障转移)。

@MessageDriven(name = "queueMDB", activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "TestQueue"),
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
    @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "5"),
    })
@ClusteredSingleton
public class ConsumerMdb implements MessageListener {
    // your mdb implementation
}

参考:https://docs.jboss.org/author/display/WFLY10/Message+Driven+Beans+Controlled+Delivery

© www.soinside.com 2019 - 2024. All rights reserved.