How can I use synchronization to keep a MessageConsumer alive in JMS / ActiveMQ?

Question (votes: 0, answers: 1)

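I have one MessageProducer and several MessageConsumers in ActiveMQ. I want the consumers to wait until the producer publishes something; after that, the consumers may terminate. I am trying to achieve this with Java synchronization, but it does not work. I can see that the producer sends a message, but the consumers never react to it.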

Here is my code:

Producer class:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;

public class TopicProducer extends Thread {
    private final String producerMessage;
    private ActiveMQConnection connection;
    private Session session;
    private Topic topic;
    final private Object lock;

    public TopicProducer(String producerMessage, Session session, Topic topic,
                         final Object lock) {
        this.producerMessage = producerMessage;
        this.session = session;
        this.topic = topic;
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (this.lock) {
                Message msg = this.session.createTextMessage(this.producerMessage);
                MessageProducer producer = this.session.createProducer(this.topic);
                System.out.println("TopicProducer: sending text:" + ((TextMessage) msg).getText());
                producer.send(msg);
                System.out.println("after publish");
                this.lock.notifyAll();
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

Consumer class:

import javax.jms.*;

public class TopicConsumer extends Thread {
    private Session session;
    private Topic topic;
    private String consumerName;
    final private Object lock;

    public TopicConsumer(Session session, Topic topic, String consumerName,
                         final Object lock) {
        this.session = session;
        this.topic = topic;
        this.consumerName = consumerName;
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (this.lock) {
                MessageConsumer consumer = this.session.createConsumer(this.topic);
                consumer.setMessageListener(new ConsumerMessageListener(this.consumerName));
                this.lock.wait();
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

Connection creator class:

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.log4j.BasicConfigurator;

public class InitConnection {
    public static String QUEUE_NAME = "MyQueue";
    public static String ACTIVEMQ_INITIAL_CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
    public static String ACTIVEMQ_PROVIDER_URL = "tcp://localhost:61616";
    public static String CONN_FACTORY = "ConnectionFactory";
    public static String TOPIC = "someTopic";

    private ActiveMQConnection connection;
    private ActiveMQQueue queue;
    private Session session;
    private Topic topic;

    public InitConnection() {
        try {
            this.init();
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    private void init() throws JMSException, NamingException {
        // Obtain a JNDI connection
        Properties props = new Properties();
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY, ACTIVEMQ_INITIAL_CONTEXT_FACTORY);
        props.setProperty(Context.PROVIDER_URL, ACTIVEMQ_PROVIDER_URL);
        InitialContext jndiContext = new InitialContext(props);

        // Look up a JMS connection factory
        ActiveMQConnectionFactory conFactory = (ActiveMQConnectionFactory) jndiContext
                .lookup(CONN_FACTORY);

        // Getting JMS connection from the server and starting it
        this.connection = (ActiveMQConnection) conFactory.createConnection();
        this.connection.start();
        // JMS messages are sent and received using a Session. We will
        // create here a non-transactional session object. If you want
        // to use transactions you should set the first parameter to 'true'
        this.session = this.connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        this.queue = new ActiveMQQueue(QUEUE_NAME);
        this.topic = session.createTopic(TOPIC);
    }

    public ActiveMQConnection getConnection() {
        return connection;
    }

    public ActiveMQQueue getQueue() {
        return queue;
    }

    public Session getSession() {
        return session;
    }


    public Topic getTopic() {
        return topic;
    }

    private void joinThreads(Thread[] threads) {
        try {
            for (int i = 0; i < threads.length; i++) {
                threads[i].join();
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    public static void main(String[] args) {
        BasicConfigurator.configure(); //logs config
        InitConnection conn = new InitConnection();
        final Object lock = new Object();

        TopicProducer tp = new TopicProducer("producerMessage",
                conn.getSession(), conn.getTopic(), lock);
        TopicConsumer tc1 = new TopicConsumer(conn.getSession(),
                conn.getTopic(), "consumer1", lock);
        TopicConsumer tc2 = new TopicConsumer(conn.getSession(),
                conn.getTopic(), "consumer2", lock);
        TopicConsumer tc3 = new TopicConsumer(conn.getSession(),
                conn.getTopic(), "consumer3", lock);

        tc1.start();
        tc2.start();
        tc3.start();
        tp.start();

        try {
            conn.getConnection().close();
        } catch (Exception e) {
            System.out.println(e);
        }

    }
}
java multithreading synchronization jms activemq
1 answer
1 vote

Don't use thread synchronization for this; it is completely the wrong approach.

You have implemented the consumer as a listener, which is asynchronous. Instead of using a listener, use the receive method: https://activemq.apache.org/maven/apidocs/org/apache/activemq/ActiveMQMessageConsumer.html#receive--

This method blocks until a message has been sent; it then returns that message and execution continues.
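
The blocking pattern described above can be tried without a broker. The following is only a rough sketch and does not use JMS: `BlockingReceiveDemo`, `InMemoryTopic` and `runDemo` are hypothetical names made up for illustration, and `BlockingQueue.poll(timeout, unit)` stands in for `MessageConsumer.receive(long timeout)`. Each consumer thread blocks until a message arrives and then terminates on its own, with no `wait()`/`notifyAll()` involved.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingReceiveDemo {

    /** Hypothetical in-memory stand-in for a topic: every subscriber gets its own copy. */
    static class InMemoryTopic {
        private final List<BlockingQueue<String>> subscribers = new CopyOnWriteArrayList<>();

        BlockingQueue<String> subscribe() {
            BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
            subscribers.add(inbox);
            return inbox;
        }

        void publish(String text) {
            for (BlockingQueue<String> inbox : subscribers) {
                inbox.add(text);
            }
        }
    }

    /** Subscribes all consumers, publishes once, and returns what each consumer received. */
    static List<String> runDemo(int consumerCount, String text) {
        InMemoryTopic topic = new InMemoryTopic();
        List<String> received = new CopyOnWriteArrayList<>();
        Thread[] consumers = new Thread[consumerCount];

        for (int i = 0; i < consumerCount; i++) {
            final String name = "consumer" + (i + 1);
            // Subscribe before anything is published so no message is missed.
            final BlockingQueue<String> inbox = topic.subscribe();
            consumers[i] = new Thread(() -> {
                try {
                    // Blocks until a message arrives; returns null if the timeout expires.
                    String msg = inbox.poll(5, TimeUnit.SECONDS);
                    if (msg != null) {
                        received.add(name + ":" + msg);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }

        topic.publish(text);

        // Wait for every consumer to finish before releasing shared resources.
        try {
            for (Thread consumer : consumers) {
                consumer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> received = runDemo(3, "producerMessage");
        System.out.println(received.size() + " consumers received the message");
    }
}
```

With real JMS the same shape should apply: create each MessageConsumer before the producer sends (a non-durable topic subscriber only sees messages published while it is subscribed), call `receive()` or `receive(timeout)` in the consumer thread, and join the threads before calling `connection.close()`. Giving each thread its own Session is also worth considering, since a JMS Session is meant to be used by one thread at a time.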
