在我的应用程序中,我有异步 SQS 消费者,这是来自官方文档的示例:
class MyListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
// Cast the received message as TextMessage and print the text to screen.
System.out.println("Received: " + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
这是我如何启动我的消费者的片段:
// Create the connection factory based on the config
SQSConnectionFactory connFactory = new SQSConnectionFactory(new ProviderConfiguration(),
AmazonSQSClientBuilder.defaultClient());
try {
// Create the connection
SQSConnection sqsConn = connFactory.createConnection();
// Create the session
Session session = sqsConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(pdfResponseQueueName));
MyListener sqsMessageListener = new MyListener();
consumer.setMessageListener(sqsMessageListener);
sqsConn.start();
} catch (JMSException e) {
LOG.error("Can't start SQSMessageListener.", e);
}
最近,我注意到我的消费者长时间坐在那里没有处理任何消息(我看到它们堆积在 AWS 仪表板上的队列中),似乎连接中断了,消费者只是默默地闲置。
我仔细阅读了文档,发现 SQS 是 JMS 的包装器。并且,因此它必须实施标准。
我看了
Connection
界面:https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html#setExceptionListener-javax.jms.ExceptionListener-
如果 JMS 提供程序检测到连接存在严重问题,它会通知连接的 ExceptionListener(如果已注册的话)。它通过调用侦听器的 onException 方法,将描述问题的 JMSException 对象传递给它来做到这一点。
实现了
Connection
接口,在java文档中有这样一句话:
不支持连接异常侦听器。
但是方法看起来是这样的:
@Override
public void setExceptionListener(ExceptionListener listener) throws JMSException {
checkClosing();
actionOnConnectionTaken = true;
this.exceptionListener = listener;
}
不扔
UnsupportedOperationException
或类似的东西。
那么,在 SQS 异步侦听器中检测连接问题并解决它的正确方法是什么?