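In Java, I can connect to a data push service and subscribe to it. I believe it uses the STOMP or OpenWire protocol. The point is that this Java code works fine (it is only an excerpt):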
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
ConnectionFactory factory = new ActiveMQConnectionFactory(API_KEY, API_KEY, "ssl://api.something.com:61616");
Connection connection = factory.createConnection();
connection.setClientID(clientId);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
messageConsumer = session.createDurableSubscriber(topic, subId);
JMSMessageListener listener = new JMSMessageListener();
messageConsumer.setMessageListener(listener);
connection.start();
But in Python I cannot get this to work; the error is stomp.exception.ConnectFailedException. The code:
import stomp

URL = "ssl://api.something.com"
PORT = 61616

class MyListener(stomp.ConnectionListener):
    def on_message(self, headers, message):
        # ...
        pass

    def on_error(self, headers, message):
        # ...
        pass

conn = stomp.Connection(host_and_ports=[(URL, PORT)], prefer_localhost=False)
conn.set_listener("", MyListener())
conn.start()  # error -> stomp.exception.ConnectFailedException
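What am I doing wrong? As you can see, the protocol, URL, and port are the same.
You are trying to connect a STOMP client to the OpenWire port, which will of course fail because they are two different protocols. You need to connect the STOMP client to a port served by a STOMP transport connector.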
<transportConnectors>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
</transportConnectors>
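For the setup in the question, a minimal sketch might look like the following. It assumes the provider exposes a STOMP-over-SSL connector at all; the port 61614 is a placeholder and not something known about api.something.com. The `client-id` and `activemq.subscriptionName` headers are the ones ActiveMQ's STOMP support uses for durable topic subscriptions, so they stand in for `setClientID` and `createDurableSubscriber` from the Java code. The function and helper names are made up for illustration.

```python
STOMP_HOST = "api.something.com"  # bare host name, no "ssl://" prefix
STOMP_PORT = 61614                # assumption: ask the provider for the real STOMP port


def connect_headers(client_id):
    """CONNECT headers; plays the role of connection.setClientID(clientId)."""
    return {"client-id": client_id}


def subscribe_headers(sub_name):
    """SUBSCRIBE headers; plays the role of createDurableSubscriber(topic, subId)."""
    return {"activemq.subscriptionName": sub_name}


def open_durable_subscription(api_key, client_id, topic_name, sub_name, listener):
    # Imported here so the helpers above can be used without stomp.py installed.
    import stomp

    conn = stomp.Connection(host_and_ports=[(STOMP_HOST, STOMP_PORT)])
    # TLS is switched on here instead of through an "ssl://" scheme in the host.
    conn.set_ssl(for_hosts=[(STOMP_HOST, STOMP_PORT)])
    conn.set_listener("", listener)
    # Older stomp.py releases need start() before connect(); newer ones dropped it.
    if hasattr(conn, "start"):
        conn.start()
    conn.connect(api_key, api_key, wait=True, headers=connect_headers(client_id))
    conn.subscribe(destination="/topic/" + topic_name, id="1", ack="auto",
                   headers=subscribe_headers(sub_name))
    return conn
```

Calling `open_durable_subscription(API_KEY, clientId, topicName, subId, MyListener())` would then mirror the Java excerpt, provided the broker really offers STOMP on that port.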
In my activemq.xml configuration:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
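If you have access to the broker's configuration, a quick way to see which port belongs to which protocol is to read the connector list programmatically. This is only a sketch: `connector_ports` is a made-up helper and `SAMPLE` is a shortened copy of a connector list like the one above.

```python
import xml.etree.ElementTree as ET
from urllib.parse import urlsplit


def connector_ports(xml_text):
    """Map each transportConnector name to the port in its uri attribute."""
    ports = {}
    for elem in ET.fromstring(xml_text).iter():
        # Strip any "{namespace}" prefix; a full activemq.xml usually declares namespaces.
        if elem.tag.rsplit("}", 1)[-1] == "transportConnector":
            ports[elem.get("name")] = urlsplit(elem.get("uri")).port
    return ports


SAMPLE = """
<transportConnectors>
  <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
"""

# The port a STOMP client such as stomp.py should connect to.
print(connector_ports(SAMPLE)["stomp"])
```

A STOMP client should use the port listed under the `stomp` connector, not the `openwire` one.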
That configuration works with this Python code, saved as send.py:
import stomp
import sys
import time

URL = "localhost"
PORT = 61613  # the STOMP connector port, not the OpenWire port 61616

class MyListener(stomp.ConnectionListener):
    def on_error(self, headers, message):
        print('received an error "%s"' % message)

    def on_message(self, headers, message):
        print('received a message "%s"' % message)

conn = stomp.Connection(host_and_ports=[(URL, PORT)], prefer_localhost=False)
conn.set_listener("", MyListener())
conn.start()  # needed on older stomp.py releases; newer releases no longer have start()
conn.connect('q_user', 'q_password', wait=True)  # change user/password to match your setup
conn.send(body='Test from Python', destination='/queue/SAMPLEQUEUE')  # change the queue name to match your setup
time.sleep(2)
conn.disconnect()
Then run: $ python send.py
I got the same error when trying to connect to ('tcp://....', 61616). I just removed the tcp:// prefix and it worked:
c = stomp.Connection([('v....ru', 61616)], prefer_localhost=False)
c.connect()
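If the broker address arrives as a Java-style URI, the scheme can be stripped before handing it to stomp.py, which expects a bare host name. A small sketch using only the standard library (`host_and_port` is a hypothetical helper, and the default port is an assumption):

```python
from urllib.parse import urlsplit


def host_and_port(broker_uri, default_port=61613):
    """Turn 'tcp://host:61613' or 'host:61613' into ('host', 61613)."""
    # Without "//" urlsplit would treat the whole string as a path.
    if "://" not in broker_uri:
        broker_uri = "//" + broker_uri
    parts = urlsplit(broker_uri)
    if parts.hostname is None:
        raise ValueError("no host name in %r" % broker_uri)
    return parts.hostname, parts.port or default_port
```

The result can be passed straight to `stomp.Connection([host_and_port(uri)])`. Note that removing the prefix only fixes the host lookup; the port still has to be one where the broker actually speaks STOMP.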