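When listening to a message queue over a durable connection, I get an error in my listener. I simulate this by pressing CTRL-Z to exit the program. Trying to reconnect then gives me an error saying: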
on_error! : "javax.jms.InvalidClientIDException: Broker: BMRSBROKER - Client: <Client-id> already connected from tcp://10.18.57.69:4241
at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:255)
at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:227)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:116)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.security.JaasAuthenticationBroker.addConnection(JaasAuthenticationBroker.java:75)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.plugin.AbstractRuntimeConfigurationBroker.addConnection(AbstractRuntimeConfigurationBroker.java:118)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:103)
at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:849)
at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)
at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:333)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:197)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:300)
at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:202)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompConnect(ProtocolConverter.java:774)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:265)
at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:108)
at org.apache.activemq.transport.stomp.StompSslTransportFactory$1$1.doConsume(StompSslTransportFactory.java:70)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:745)
"
I have tried unsubscribing and disconnecting with the following, but it does not disconnect.
import stomp


class MyListener(stomp.ConnectionListener):
    """This is a listener class that listens for new messages using the STOMP protocol"""

    def __init__(self, conn, client_id):
        self.conn = conn
        # client ID, also used as the subscription id when unsubscribing
        self.client_id = client_id

    def on_error(self, headers, message):
        self.disconnect()

    def on_message(self, headers, message):
        ...

    def on_disconnected(self):
        self.disconnect()

    def disconnect(self):
        try:
            self.conn.unsubscribe(
                destination="/topic/bmrsTopic",
                id=self.client_id
            )
        except Exception:
            print('unsubscribe failed')
        # first disconnect before trying to reconnect
        print('first disconnect before trying to reconnect')
        try:
            self.conn.disconnect()
        except Exception:
            print('disconnect failed')
How can I force the AMQ server to forget the previous connection?
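You can't ask the broker to drop another client's resources from a different connection. Instead, you should configure both the client and the broker with idle timeout values on the connection, so that each side can handle the remote end dropping even when the socket never detects the close.
You can also configure the broker's transport connector with a heartbeat grace period value for STOMP clients that don't send heartbeats; see the ActiveMQ broker STOMP documentation.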
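To make the idea of an idle timeout with a grace period concrete, here is a minimal sketch of the kind of check each side can run. It is not ActiveMQ's or stomp.py's implementation; the class name `IdleWatchdog`, the `grace_multiplier` parameter, and its default of 1.5 are assumptions made for illustration only.

```python
import time


class IdleWatchdog:
    """Flags a peer as dead when nothing has arrived from it for too long."""

    def __init__(self, expected_interval_ms, grace_multiplier=1.5,
                 clock=time.monotonic):
        # expected_interval_ms: how often the peer is expected to send something.
        # grace_multiplier: hypothetical tolerance factor for late heartbeats.
        self.timeout_s = expected_interval_ms * grace_multiplier / 1000.0
        self.clock = clock
        self.last_seen = clock()

    def touch(self):
        """Call whenever any frame or heartbeat arrives from the peer."""
        self.last_seen = self.clock()

    def is_dead(self):
        """True once the peer has been silent longer than the allowed window."""
        return self.clock() - self.last_seen > self.timeout_s
```

With a check like this on the broker side, a client that was suspended or killed without closing its socket is eventually treated as gone, its connection is cleaned up, and the client ID becomes available again.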
The STOMP specification outlines how heartbeats work:
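As a rough illustration of that negotiation, the sketch below computes the agreed intervals from the `heart-beat` headers of the CONNECT and CONNECTED frames, following the rule in STOMP 1.1/1.2 (each header is `<can send every N ms>,<wants to receive every N ms>`, and 0 means "none"). The function names are my own and do not come from any library.

```python
def parse_heart_beat(value):
    """Parse a 'heart-beat' header value such as '10000,5000' into two ints."""
    can_send, wants_receive = value.split(",")
    return int(can_send), int(wants_receive)


def negotiate(client_header, server_header):
    """Return (client_to_server_ms, server_to_client_ms); 0 means no heartbeats."""
    cx, cy = parse_heart_beat(client_header)   # from the CONNECT frame
    sx, sy = parse_heart_beat(server_header)   # from the CONNECTED frame
    # A direction is enabled only if the sender can send AND the receiver wants it;
    # the interval is then the larger of the two values.
    client_to_server = 0 if cx == 0 or sy == 0 else max(cx, sy)
    server_to_client = 0 if sx == 0 or cy == 0 else max(sx, cy)
    return client_to_server, server_to_client
```

For example, `negotiate("4000,4000", "5000,1000")` gives `(4000, 5000)`. In stomp.py the client side of this is requested by passing a `heartbeats=(cx, cy)` tuple when creating the `stomp.Connection`; the default of `(0, 0)` disables heartbeats in both directions, which is why a suspended client can linger on the broker.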