ActiveMQ Java STOMP 客户端收到 SocketTimeoutException

问题描述 投票:0回答:2

CentOS 机器上有一个 ActiveMQ 服务器。我可以使用 OpenWire JMS 客户端通过 TCP 和 HTTP 连接和使用消息。但是,当我尝试使用 ActiveMQ 测试 STOMP 客户端时,它会在

connection.receive
;

上抛出此异常
java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.net.SocketInputStream.read(SocketInputStream.java:224)
    at java.io.DataInputStream.readByte(DataInputStream.java:265)
    at org.apache.activemq.transport.stomp.StompWireFormat.readHeaderLine(StompWireFormat.java:174)
    at org.apache.activemq.transport.stomp.StompWireFormat.readLine(StompWireFormat.java:167)
    at org.apache.activemq.transport.stomp.StompWireFormat.parseAction(StompWireFormat.java:200)
    at org.apache.activemq.transport.stomp.StompWireFormat.unmarshal(StompWireFormat.java:112)
    at org.apache.activemq.transport.stomp.StompConnection.receive(StompConnection.java:77)
    at tr.com.estherial.stomplistener.StompListener.main(StompListener.java:25)

监听器类(Listener class)

import java.net.SocketTimeoutException;
import java.util.Date;

import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
 
public class StompListener {

    public static void main(String[] args) { 

        StompConnection connection = new StompConnection();
        try {
            connection.open("host", 61613);
            connection.connect("admin", "admin", "test");
            connection.subscribe("TEST_TOPIC", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
            connection.begin("test"); 

            while (true) {
                try {
                    StompFrame message = connection.receive(10000); 
                    System.out.println(String.format("%s - Receiver: received '%s'", new Date(), message.getBody()));
                } catch (SocketTimeoutException e) {
                    // ignore
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

这是

activemq.xml
中的连接器:

<!-- Declares a single STOMP transport connector on port 61613.
     NOTE(review): the URI names "localhost"; presumably this binds the
     listener to the loopback interface only, so clients on other machines
     could not connect. Confirm, and use e.g. stomp://0.0.0.0:61613 if remote
     access is intended. -->
<transportConnectors>
    <transportConnector name="stomp" uri="stomp://localhost:61613"/>
</transportConnectors>

你以前遇到过类似的异常吗?

java activemq stomp consumer
2个回答
0
投票

当 STOMP 订阅者在指定的超时时间内没有收到消息时,预计会出现

java.net.SocketTimeoutException
。一旦客户端创建了订阅,您需要向该主题发送(send)消息。届时,客户端应该会收到消息,并通过您的
System.out.println
打印出来。

此外,在 ActiveMQ 5.x 中,当从 STOMP 客户端订阅目的地时,您需要在目的地名称前加上

/queue/ 或
/topic/
。您没有在您的应用程序中这样做。试试这个:

connection.subscribe("/topic/TEST_TOPIC", Stomp.Headers.Subscribe.AckModeValues.CLIENT);

最后,值得注意的是,您正在使用 ActiveMQ 代码库中的 test STOMP 客户端。 ActiveMQ 的内部测试套件使用此客户端来验证代理实现是否按预期工作。它不适合一般用途。此外,如果您使用的是 Java,那么最好使用性能更好、功能更全的客户端,例如 OpenWire JMS 客户端甚至 Qpid JMS 客户端。


0
投票
    import java.net.Socket;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.TimeZone;

import org.apache.activemq.transport.stomp.StompConnection;



/**
 * Example STOMP client that requests heart-beating, subscribes to a topic and
 * prints received messages.
 *
 * <p>Two helper threads are used: one periodically writes a heart-beat byte to
 * the connection's socket, the other subscribes and receives frames. The
 * program runs for as long as the subscriber thread is alive.
 */
public class StompExample 
{
    /**
     * Opens the connection, starts the helper threads and waits for the
     * subscriber to finish.
     *
     * @param args unused
     * @throws Exception if connecting, waiting or disconnecting fails
     */
    public static void main(String[] args) throws Exception {

        // Create a connection
        StompConnection connection = new StompConnection();
        connection.open("localhost", 61613);

        try {
            Socket socket = connection.getStompSocket();

            HashMap<String, String> headers = new HashMap<>();
            headers.put("login", "abcd");
            headers.put("passcode", "defghij");
            headers.put("heart-beat", "10000,10000");//heart-beat header newly added

            connection.connect(headers);

            HeartBeatThread heartBeatThread = new HeartBeatThread(socket);
            // The heart-beat loop never ends on its own; as a daemon it cannot
            // keep the JVM alive once the rest of the program is done.
            heartBeatThread.setDaemon(true);
            heartBeatThread.start();

            SubscribeThread subscribeThread = new SubscribeThread(connection,"/topic/output");
            subscribeThread.start();

            // Only the subscriber determines how long the program runs.
            subscribeThread.join();

            // Stop heart-beating before sending DISCONNECT so the two do not
            // write to the socket at the same time.
            heartBeatThread.interrupt();
            heartBeatThread.join();

            connection.disconnect();
        } finally {
            // Always release the socket, even when something above failed.
            connection.close();
        }
    }

    /**
     * Formats the current time as a UTC timestamp.
     *
     * @return the current instant in the form {@code yyyy-MM-dd'T'HH:mm:ss.SSS'Z'}
     */
    public static String getNowISO() {
        // A new formatter per call: SimpleDateFormat is not thread-safe and
        // this method is called from several threads.
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        df.setTimeZone(TimeZone.getTimeZone("UTC"));
        return df.format(new Date());
    }

}


import java.net.Socket;

/**
 * Background thread that periodically writes a STOMP heart-beat (a single
 * line-feed byte) to a socket.
 *
 * <p>The loop ends when the thread is interrupted or when writing to the
 * socket fails.
 *
 * <p>NOTE(review): the socket's output stream is written here without any
 * coordination with other writers of the same socket — confirm that no frame
 * is being sent concurrently from another thread.
 */
public class HeartBeatThread extends Thread{

    /** Heart-beat payload: one line-feed byte. */
    private static final int HEARTBEAT_BYTE = '\n';

    /** Pause between two heart-beats, in milliseconds. */
    private static final long HEARTBEAT_INTERVAL_MS = 10000;

    /** Socket the heart-beats are written to. */
    public Socket socket;

    /**
     * @param socket connected socket to write heart-beats to
     */
    public HeartBeatThread(Socket socket) {
        this.socket = socket;

    }

    /**
     * Sends a heart-beat, sleeps for the interval and repeats until the thread
     * is interrupted or an I/O error occurs.
     */
    @Override
    public void run() {
        System.out.println("HeartBeatThread started");
        try {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.printf("%s Sending Heartbeat message\r\n",StompExample.getNowISO());
                socket.getOutputStream().write(HEARTBEAT_BYTE);
                socket.getOutputStream().flush();

                Thread.sleep(HEARTBEAT_INTERVAL_MS);
            }
        } catch (InterruptedException e) {
            // Interruption is the regular way to stop this thread: restore the
            // flag for any caller that inspects it and leave without noise.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("HeartBeatThread finished");
    }

}

import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;

/**
 * Thread that subscribes to one destination on an already connected
 * {@code StompConnection} and prints the body of every frame it receives.
 */
public class SubscribeThread extends Thread{

    /** Connection used for subscribing and receiving. */
    public StompConnection connection;

    /** Destination name passed to the subscribe call. */
    public String subscribeTopic;

    /**
     * @param connection connection to subscribe and receive on
     * @param subscribeTopic destination to subscribe to
     */
    public SubscribeThread(StompConnection connection,String subscribeTopic) {
        this.subscribeTopic = subscribeTopic;
        this.connection = connection;
    }

    /**
     * Subscribes with automatic acknowledgement, then receives and prints
     * frames in an endless loop. Any exception is printed and ends the loop.
     */
    @Override
    public void run() {
        System.out.println("SubscribeThread started");
        try {
            // Register the subscription before entering the receive loop.
            connection.subscribe(subscribeTopic, Stomp.Headers.Subscribe.AckModeValues.AUTO);

            for (;;) {
                // A timeout argument of 0 is passed on every call.
                StompFrame frame = connection.receive(0);

                String timestamp = StompExample.getNowISO();
                String body = frame.getBody();
                System.out.printf("%s Received message:%s\r\n", timestamp, body);
            }
        } catch (Exception failure) {
            failure.printStackTrace();
        }
        System.out.println("SubscribeThread finished");
    }

}
© www.soinside.com 2019 - 2024. All rights reserved.