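I'm working on an application that uses ActiveMQ to exchange messages. Some of the messages are very large, and I want to discard those.
We use the ActiveMQ failover transport with two ActiveMQ instances (master/slave). The broker itself enforces a 100 MB frame size limit on messages.
The problem: if I try to send a message larger than 100 MB, the ActiveMQ server closes the connection. The failover transport then reconnects and sends the message again, which creates an infinite loop.
The client logs the following: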
2017-01-05 09:19:11.910 WARN 14680 --- [0.1:61616@57025] o.a.a.t.failover.FailoverTransport : Transport (tcp://localhost:61616) failed , attempting to automatically reconnect: {}
java.io.EOFException: null
at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_91]
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267) ~[activemq-client-5.13.4.jar:5.13.4]
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.13.4.jar:5.13.4]
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.13.4.jar:5.13.4]
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.13.4.jar:5.13.4]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
2017-01-05 09:19:11.921 INFO 14680 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully reconnected to tcp://localhost:61616
2017-01-05 09:19:11.923 WARN 14680 --- [0.1:61616@57026] o.a.a.t.failover.FailoverTransport : Transport (tcp://localhost:61616) failed , attempting to automatically reconnect: {}
java.io.EOFException: null
at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_91]
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267) ~[activemq-client-5.13.4.jar:5.13.4]
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.13.4.jar:5.13.4]
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.13.4.jar:5.13.4]
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.13.4.jar:5.13.4]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
And the ActiveMQ instance logs:
2017-01-05 09:19:11,909 | WARN | Transport Connection to: tcp://127.0.0.1:57025 failed: java.io.IOException: Frame size of 363 MB larger than max allowed 100 MB | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///127.0.0.1:57025@61616
2017-01-05 09:19:11,922 | WARN | Transport Connection to: tcp://127.0.0.1:57026 failed: java.io.IOException: Frame size of 363 MB larger than max allowed 100 MB | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///127.0.0.1:57026@61616
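I tried setting a TransportListener to see whether I could catch this situation, but I only receive a transportInterupted event, with nothing that identifies the cause.
I read the failover transport documentation (http://activemq.apache.org/failover-transport-reference.html). Maybe I could use maxReconnectAttempts, but I know it would have several drawbacks in more common situations (such as the server being temporarily unavailable).
How can I detect this situation and avoid the infinite reconnect loop between client and server?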
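As you say, the documentation states:
maxReconnectAttempts (default: -1 | 0): From ActiveMQ 5.6: the default is -1, retry forever. 0 disables reconnection, i.e. try to connect only once. Before ActiveMQ 5.6: the default is 0, retry forever. All ActiveMQ versions: a value > 0 is the maximum number of reconnect attempts before an error is sent back to the client.
So if you want your transport listener to be notified of the transport failure once the retries caused by the message size have failed, you need to set maxReconnectAttempts to a value > 0. When the maximum number of retries is reached, the transport listener's onException is called with an IOException as its argument. As you said, though, it is not easy to tell whether that exception was caused by the maximum size or by some other problem.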
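To make the option above concrete, here is a minimal sketch of how a bounded failover URI might be assembled. FailoverUriBuilder is a hypothetical helper, not part of ActiveMQ, and the host names, ports and the value 5 are placeholders; only the failover: scheme and the randomize and maxReconnectAttempts options come from the question and the documentation quoted above.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not an ActiveMQ class) that assembles a failover URI string. */
final class FailoverUriBuilder {
    private FailoverUriBuilder() {
    }

    /** Joins the broker URIs and caps the number of reconnect attempts. */
    static String build(List<String> brokerUris, int maxReconnectAttempts) {
        if (brokerUris.isEmpty()) {
            throw new IllegalArgumentException("at least one broker URI is required");
        }
        return "failover:(" + String.join(",", brokerUris) + ")"
                + "?randomize=false&maxReconnectAttempts=" + maxReconnectAttempts;
    }

    public static void main(String[] args) {
        // Placeholder hosts and ports; substitute your own master/slave pair.
        List<String> brokers = Arrays.asList("tcp://host1:61616", "tcp://host2:61616");
        System.out.println(build(brokers, 5));
    }
}
```

The resulting string could be passed to an ActiveMQConnectionFactory. Keep in mind the drawback raised in the question: the cap also applies when the broker is simply unavailable for a while.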
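If you want to check the message size before sending, you can read the maxFrameSize configured in the broker-side URI at runtime through JMX: obtain a BrokerViewMBean instance and call its getTransportConnectorByType method (http://activemq.apache.org/maven/apidocs/src-html/org/apache/activemq/broker/jmx/BrokerViewMBean.html#line.304). It returns the URI configured in activemq.xml, which you can parse to retrieve maxFrameSize.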
// "host" is a placeholder for the broker's JMX host.
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://host:1099/jmxrmi");
JMXConnector jmxc = JMXConnectorFactory.connect(url);
MBeanServerConnection conn = jmxc.getMBeanServerConnection();
// ObjectName keys are case-sensitive; this matches the name used in the full example further down.
ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class, true);
String uri = mbean.getTransportConnectorByType("tcp"); // or ("ssl")
String[] pairs = uri.split("&");
for (String pair : pairs) {
    // contains() rather than startsWith(): the first pair still carries the "tcp://host:port?" prefix.
    if (pair.contains("wireFormat.maxFrameSize")) {
        int idx = pair.indexOf("=");
        System.out.println(URLDecoder.decode(pair.substring(idx + 1), "UTF-8"));
    }
}
Alternatively, http://activemq.apache.org/maven/apidocs/org/apache/activemq/broker/jmx/BrokerViewMBean.html#getTransportConnectors-- returns a map with the transport names as keys and the URIs as values.
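The parsing step can be pulled out and exercised on its own, without a broker. The sketch below assumes the string returned over JMX has the same shape as the connector URI in activemq.xml; MaxFrameSizeParser is a hypothetical helper, and the sample URI is only an illustration.

```java
import java.util.OptionalLong;

/** Hypothetical helper (not an ActiveMQ class): extracts wireFormat.maxFrameSize from a connector URI. */
final class MaxFrameSizeParser {
    private static final String KEY = "wireFormat.maxFrameSize";

    private MaxFrameSizeParser() {
    }

    /** Returns the configured frame size in bytes, or empty if the option is absent or malformed. */
    static OptionalLong parse(String connectorUri) {
        int q = connectorUri.indexOf('?');
        if (q < 0) {
            return OptionalLong.empty(); // no query string at all
        }
        for (String pair : connectorUri.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && pair.substring(0, eq).equals(KEY)) {
                try {
                    return OptionalLong.of(Long.parseLong(pair.substring(eq + 1).trim()));
                } catch (NumberFormatException e) {
                    return OptionalLong.empty(); // value present but not a plain number
                }
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Sample URI for illustration only.
        String uri = "tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600";
        System.out.println(parse(uri));
    }
}
```

Stripping everything up to the "?" first means the option is found whether it is the first query parameter or a later one, and an empty result lets the caller decide on a fallback limit.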
To get a more accurate message size, you can marshal the message yourself:
OpenWireFormat opf = new OpenWireFormat();
opf.setTightEncodingEnabled(true);
ByteSequence tab = opf.marshal(message);
System.out.println(tab.length);
Your code would then have to look something like this:
import java.io.IOException;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.util.ByteSequence;

public class SimpleSenderMaxSizeManager {

    private static Connection conn = null;
    // Written by the transport thread, so it must be visible across threads.
    private static volatile boolean transportChanged;
    // Broker's maxFrameSize minus a safety margin; null until it has been read over JMX.
    private static volatile Long MAX_FRAME_SIZE;

    public static void main(String[] args) throws JMSException {
        try {
            SimpleSenderMaxSizeManager.updateMaxSize("host1");
            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
                    "failover:(tcp://host1:5670,tcp://host2:5671)?randomize=false");
            cf.setTransportListener(new TransportListener() {
                @Override
                public void transportResumed() {
                    // After a failover we may be talking to the other broker: re-read its limit.
                    if (transportChanged) {
                        transportChanged = false;
                        try {
                            SimpleSenderMaxSizeManager.updateMaxSize(null);
                        } catch (Exception e) {
                            // best effort: keep the previous limit
                        }
                    }
                }

                @Override
                public void transportInterupted() {
                    transportChanged = true;
                }

                @Override
                public void onException(IOException error) {
                }

                @Override
                public void onCommand(Object command) {
                }
            });
            conn = cf.createConnection();
            ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                    ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue("TEST"));
            conn.start();
            ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage("test");
            // Marshal the message locally to measure its size on the wire.
            OpenWireFormat opf = new OpenWireFormat();
            opf.setTightEncodingEnabled(true);
            ByteSequence tab = opf.marshal(msg);
            System.out.println(tab.length);
            if (MAX_FRAME_SIZE != null && tab.length >= MAX_FRAME_SIZE) {
                throw new RuntimeException(tab.length + ">=" + MAX_FRAME_SIZE);
            }
            producer.send(msg);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                    // ignored on shutdown
                }
            }
        }
    }

    protected static void updateMaxSize(String host) throws Exception {
        JMXConnector jmxc = null;
        try {
            String jmxHost = host;
            String scheme = null;
            if (conn == null) {
                scheme = "tcp";
            } else {
                // Unwrap the transport chain to reach the failover transport.
                TransportFilter responseCorrelator = (TransportFilter) ((ActiveMQConnection) conn).getTransport();
                TransportFilter mutexTransport = (TransportFilter) responseCorrelator.getNext();
                FailoverTransport failoverTransport = (FailoverTransport) mutexTransport.getNext();
                // Wait until the failover transport is connected to a broker.
                while (failoverTransport.getConnectedTransportURI() == null) {
                    try {
                        Thread.sleep(100);
                    } catch (Exception e) {
                        // keep waiting
                    }
                }
                scheme = failoverTransport.getConnectedTransportURI().getScheme();
                jmxHost = failoverTransport.getConnectedTransportURI().getHost();
            }
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + jmxHost + ":1099/jmxrmi");
            Map<String, String[]> env = new HashMap<>();
            String[] creds = { "admin", "admin" };
            env.put(JMXConnector.CREDENTIALS, creds);
            jmxc = JMXConnectorFactory.connect(url, env);
            // Named mbsc so that it does not shadow the static JMS connection field "conn".
            MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
            ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
            BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(mbsc, activeMq,
                    BrokerViewMBean.class, true);
            String value = mbean.getTransportConnectorByType(scheme);
            String[] pairs = value.split("&");
            for (String pair : pairs) {
                if (pair.contains("wireFormat.maxFrameSize")) {
                    int idx = pair.indexOf("=");
                    long size = Long.parseLong(URLDecoder.decode(pair.substring(idx + 1), "UTF-8"));
                    System.out.println(size);
                    // Safety margin for the JMS headers the session adds on sending.
                    MAX_FRAME_SIZE = size - 1000;
                }
            }
        } finally {
            if (jmxc != null) {
                try {
                    jmxc.close();
                } catch (Exception e) {
                    // ignored on cleanup
                }
            }
        }
    }
}
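I don't believe this is possible. You are trying to classify errors in your error handling, but that exception is not propagated through the failover: transport. The same kind of exception could also occur if the maximum client connection count were exceeded.
Checking the message size before sending sounds like a viable option.
Is there a reason a size check would not meet your requirements?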
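// MAX_ALLOWED, producer and session are assumed to be fields of the enclosing class.
public void mySendMessage(String body) throws JMSException {
    // ....
    // Note: String.length() counts characters, not encoded bytes.
    if (body.length() > MAX_ALLOWED) {
        // throw an exception, or log, or handle it some other way
        throw new IllegalArgumentException("Message too large: " + body.length());
    } else {
        producer.send(session.createTextMessage(body));
    }
}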
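One caveat with a character-count check is that multi-byte characters make the encoded message larger than body.length() suggests. A minimal sketch of a byte-based check follows, assuming the UTF-8 length is a reasonable approximation of the payload size and that a fixed headroom covers headers. MessageSizeGuard and both of its parameters are hypothetical names, not ActiveMQ API.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical pre-send check (not an ActiveMQ class) based on the encoded byte length. */
final class MessageSizeGuard {
    private MessageSizeGuard() {
    }

    /**
     * Returns true if the UTF-8 encoding of body fits within maxFrameSize
     * after reserving headroomBytes for headers and framing.
     */
    static boolean fits(String body, long maxFrameSize, long headroomBytes) {
        // Encoding allocates a copy of the payload; acceptable for a sketch.
        long payloadBytes = body.getBytes(StandardCharsets.UTF_8).length;
        return payloadBytes <= maxFrameSize - headroomBytes;
    }

    public static void main(String[] args) {
        long maxFrameSize = 100L * 1024 * 1024; // the 100 MB limit from the question
        long headroom = 1000;                   // same margin as the answer above, an assumption
        System.out.println(fits("test", maxFrameSize, headroom));
    }
}
```

A caller would send only when fits(...) returns true and otherwise discard or log the message, which avoids triggering the reconnect loop in the first place.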