使用JMS模板/消息订阅者的Spring中的JMS主题订阅者

问题描述 投票:0回答:1

我有一个简单的Spring应用程序,用于使用ActiveMQ的JMS Producer / Subscriber,配置如下:

应用程序上下文xml:

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616" />
    <property name="userName" value="user" />
    <property name="password" value="password" />
</bean>
<bean id="messageDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="messageQueue1" />
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE">
    </property>
</bean>

<bean id="springJmsProducer" class="SpringJmsProducer">
    <property name="destination" ref="messageDestination" />
    <property name="jmsTemplate" ref="jmsTemplate" />
</bean>

<bean id="springJmsConsumer" class="SpringJmsConsumer">
    <property name="destination" ref="messageDestination" />
    <property name="jmsTemplate" ref="jmsTemplate" />
</bean>

以下是Spring制作人

public class SpringJmsProducer {
private JmsTemplate jmsTemplate;
private Destination destination;

public JmsTemplate getJmsTemplate() {
    return jmsTemplate;
}

public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
}

public Destination getDestination() {
    return destination;
}

public void setDestination(Destination destination) {
    this.destination = destination;
}

public void sendMessage(final String msg) {
    jmsTemplate.send(destination, new MessageCreator() {
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(msg);
        }});        
 }
}

以下是春季消费者:

public class SpringJmsConsumer {
private JmsTemplate jmsTemplate;
private Destination destination;

public JmsTemplate getJmsTemplate() {
    return jmsTemplate;
}

public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
}

public Destination getDestination() {
    return destination;
}

public void setDestination(Destination destination) {
    this.destination = destination;
}

public String receiveMessage() throws JMSException {
    TextMessage textMessage =(TextMessage) jmsTemplate.receive(destination);        
    return textMessage.getText();
 }
}

问题:当我启动生产者和发布消息,然后我开始使用消费者时,消费者不会阅读旧消息,而只会阅读消费者启动后发布的消息。任何人都可以帮助我如何使这个持久的订阅者,以便消费者可以读取队列中未被确认的消息,并且我还需要实现Synchronous Consumer not Asynchronous。

我已经尝试了所有可能的解决方案但没有一个正常任何帮助都非常感谢

spring jms activemq jmstemplate
1个回答
1
投票

如果您希望消费者在开始之前收到发送给该主题的消息,您有两个选择:

1.使用Activemq追溯消费者

背景一个追溯消费者只是一个常规的JMS主题消费者,它表示在订阅开始时,应该使用每次尝试来回溯并发送消费者可能具有的任何旧消息(或者在该主题上发送的最后消息)错过。

有关更多详细信息,请参阅订阅恢复策略。

您将消费者标记为追溯,如下所示:

topic = new ActiveMQTopic("TEST.Topic?consumer.retroactive=true");

http://activemq.apache.org/retroactive-consumer.html

2.使用耐用订户:

请注意,持久订阅者在第二次运行开始之前会收到发送到主题的消息

http://activemq.apache.org/manage-durable-subscribers.html

这可以使用DefaultMessageListenerContainer异步进行

<bean id="jmsContainer" destroy-method="shutdown"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="messageDestination" />
    <property name="messageListener" ref="messageListenerAdapter" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
    <property name="subscriptionDurable" value="true" />
    <property name="clientId" value="UniqueClientId" />
</bean>

<bean id="messageListenerAdapter"
    class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg ref="springJmsConsumer" />
</bean>
<bean id="springJmsConsumer" class="SpringJmsConsumer">
</bean>

并更新您的消费者:

public class SpringJmsConsumer implements javax.jms.MessageListener {

    public void onMessage(javax.jms.Message message) {
        // treat message;
        message.acknowledge();
    }
}

更新使用

如果你想要一个Synchronous Durable Subscriber,一个例子

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

public class SpringJmsConsumer {

    private Connection conn;
    private TopicSubscriber topicSubscriber;

    public SpringJmsConsumer(ConnectionFactory connectionFactory, Topic destination ) {
        conn = connectionFactory.createConnection("user", "password");
        Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        topicSubscriber = session.createDurableSubscriber(destination, "UniqueClientId");
        conn.start();
    }

    public String receiveMessage() throws JMSException {
        TextMessage textMessage = (TextMessage) topicSubscriber.receive();
        return textMessage.getText();
    }
}

并更新spring Jms Consumer

<bean id="springJmsConsumer" class="SpringJmsConsumer">
    <constructor-arg ref="connectionFactory" />
    <constructor-arg ref="messageDestination" />
</bean>

请注意,此代码不管理连接失败。

© www.soinside.com 2019 - 2024. All rights reserved.