SQS 异步消费者,连接问题

问题描述 投票:0回答:0

在我的应用程序中,我有异步 SQS 消费者,这是来自官方文档的示例:

class MyListener implements MessageListener {
 
    @Override
    public void onMessage(Message message) {
        try {
            // Cast the received message as TextMessage and print the text to screen.
            System.out.println("Received: " + ((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

这是我如何启动我的消费者的片段:

        // Create the connection factory based on the config
        SQSConnectionFactory connFactory = new SQSConnectionFactory(new ProviderConfiguration(),
                AmazonSQSClientBuilder.defaultClient());
        try {
            // Create the connection
            SQSConnection sqsConn = connFactory.createConnection();

            // Create the session
            Session session = sqsConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue(pdfResponseQueueName));

            MyListener sqsMessageListener = new MyListener();
            consumer.setMessageListener(sqsMessageListener);

            sqsConn.start();
        } catch (JMSException e) {
            LOG.error("Can't start SQSMessageListener.", e);
        }

最近,我注意到我的消费者长时间坐在那里没有处理任何消息(我看到它们堆积在 AWS 仪表板上的队列中),似乎连接中断了,消费者只是默默地闲置。

我仔细阅读了文档,发现 SQS 是 JMS 的包装器。并且,因此它必须实施标准。

我看了

Connection
界面:https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html#setExceptionListener-javax.jms.ExceptionListener-

如果 JMS 提供程序检测到连接存在严重问题,它会通知连接的 ExceptionListener(如果已注册的话)。它通过调用侦听器的 onException 方法,将描述问题的 JMSException 对象传递给它来做到这一点。

然而,当我看着 https://github.com/awslabs/amazon-sqs-java-messaging-lib/blob/master/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java

实现了

Connection
接口,在java文档中有这样一句话:

不支持连接异常侦听器。

但是方法看起来是这样的:


    @Override
    public void setExceptionListener(ExceptionListener listener) throws JMSException {
        checkClosing();
        actionOnConnectionTaken = true;
        this.exceptionListener = listener;
    }

不扔

UnsupportedOperationException
或类似的东西。

那么,在 SQS 异步侦听器中检测连接问题并解决它的正确方法是什么?

amazon-sqs
© www.soinside.com 2019 - 2024. All rights reserved.