我有以下代码从SQS队列中检索消息。我正在使用AmazonSQSBufferedAsyncClient
从队列中检索消息。固定的延迟SingleThreadedExecutor
每5分钟唤醒一次,调用receiveMessage
。在队列中启用了长轮询]
@Service
public class AmazonQueueService
implements QueueService<String> {
@Autowired
private AmazonSQSBufferedAsyncClient sqsAsyncClient;
@Value("${aws.sqs.queueUrl}")
private String queueUrl;
@Override
public List<Message<String>> receiveMessage() {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
ReceiveMessageResult result = sqsAsyncClient.receiveMessage(receiveMessageRequest);
LOG.debug("Size=" + result.getMessages().size());
return Lists.transform(result.getMessages(), ......);
}
.....
}
问题是,当我检查AWS控制台时,该消息始终处于运行状态,但在应用程序中从未收到(大小始终打印为0)。看起来AmazonSQSBufferedAsyncClient
正在从队列中读取消息,但未在receiveMessage
调用中返回。
有什么想法吗?
最后想通了。该问题通过队列可见性超时(2分钟)和ScheduledExecutor延迟(5分钟)的组合来体现。
将可见性超时增加到15分钟解决了问题。
我的理论是-> AmazonSQSBufferedAsyncClient检索消息并将其保留在缓冲区中,以等待receiveMessage调用。由于执行程序延迟为5分钟,因此消息的可见性在调用receiveMessage之前超时,并且消息返回到队列中。看起来该消息几乎是立即从队列中拾取的。现在,无论出于何种原因,对receiveMessage的调用都不会接收到该消息。我认为,增加超时将使receiveMessage调用有机会在超时事件之前发生,从而解决了该问题。
还有其他可能的解释吗?
完成后,您必须从队列中删除消息。如果您不这样做,它将一直处于运行状态,直到超时,然后立即返回队列。它是通过这种方式设计的,因此您永远不会丢失消息。如果您的程序在完成处理消息并删除之前崩溃了,该消息将立即返回队列。
从基本的Java example(SampleDriver.java
):
QMessage message = messages.get(0);
System.out.println("\nMessage received");
System.out.println(" message id: " + message.getId());
System.out.println(" receipt handle: " + message.getReceiptHandle());
System.out.println(" message content: " + message.getContent());
testQueue.deleteMessage(message.getReceiptHandle()); // <===== here
对我来说,问题是甚至在我在控制台上看到它们之前,lambda都已读取我的消息。因此,结果是我无法从sqs控制台轮询消息。