是否可以延长正在传输的消息的可见时间。
参见:
http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html。
部分:更改消息的可见性超时。
总之,我希望能够延长正在运行的给定消息的第一组可见性超时。
例如,如果 15 秒过去了,我想将超时再延长 20 秒。上面的 java 文档中有更好的示例。
根据我对上面链接的理解,您可以在亚马逊方面执行此操作。
以下是我当前的设置;
SqsMessageDrivenChannelAdapter adapter =
new SqsMessageDrivenChannelAdapter(queue);
adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
adapter.setMaxNumberOfMessages(1);
adapter.setSendTimeout(2000);
adapter.setVisibilityTimeout(200);
adapter.setWaitTimeOut(20);
是否可以延长此超时时间?
Spring Cloud AWS 从 2.0 版本开始支持此功能。在 SQS 侦听器方法中注入
Visiblity
参数即可解决问题:
@SqsListener(value = "my-sqs-queue")
void onMessageReceived(@Payload String payload, Visibility visibility) {
...
var extension = visibility.extend(20);
...
}
注意,
extend
将异步工作并返回一个Future。因此,如果您想进一步确定处理过程,消息的可见性确实在 AWS 方面得到了扩展,可以使用 extension.get()
阻止 Future 或使用 extension.isDone()
查询 Future
好的。看来我明白你的意思了。
我们可以使用 API 更改特定消息的可见性:
AmazonSQS.changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout)
为此,在下游流程中,您必须访问(注入)
AmazonSQS
bean 并从Message
中提取特殊标头:
@Autowired
AmazonSQS amazonSqs;
@Autowired
ResourceIdResolver resourceIdResolver;
...
MessageHeaders headers = message.getHeaders();
DestinationResolver destinationResolver = new DynamicQueueUrlDestinationResolver(this.amazonSqs, this.resourceIdResolver);
String queueUrl = destinationResolver.resolveDestination(headers.get(AwsHeaders.QUEUE));
String receiptHandle = headers.get(AwsHeaders.RECEIPT_HANDLE);
amazonSqs.changeMessageVisibility(queueUrl, receiptHandle, YOUR_DESIRED_VISIBILITY_TIMEOUT);
但是,呃,我同意我们应该在这个问题上提供一些开箱即用的功能。这甚至可能类似于
QueueMessageAcknowledgment
作为新标题。或者甚至只是除此之外的另一种 changeMessageVisibility
方法。
请就此事向 Spring Cloud AWS 项目提出 GH 问题,并链接到此 SO 主题。
TLDR:是的,您可以使用 SQS 侦听器方法中的 Visibility 参数更改 Spring Cloud AWS 中 SQS 消息的可见性超时。 可见性扩展方法不会延长默认的可见性超时;它会延长您向 SQS 发送可见性请求的时间。在内部,此可见性方法调用 sqs.changeMessageVisibilityAsync,它将队列中指定消息的可见性超时更改为新值。
示例: 如果您的默认超时为 300 秒,并且您的消息由于 100 秒内的某些暂时性错误而失败,并且您想重试该消息,则可以使用extend(10)。这意味着您的听众将在接下来的 10 秒内再次阅读该消息,而不是达到 300 秒。 从消息侦听器中调用点起,可见性超时会更改 10 秒。
import org.springframework.cloud.aws.messaging.listener.Visibility;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
@Component
public class SqsMessageListener {
@SqsListener(value = "my-sqs-queue")
public void onMessageReceived(@Payload String payload, Acknowledgment acknowledgment, Visibility visibility) {
// Your message processing logic here
try {
// If processing succeeds, acknowledge the message
processMessage(payload);
// Acknowledge the message
acknowledgment.acknowledge();
} catch (Exception e) {
if (e instanceof CustomTransientException) {
// If you need to change visibility, call visibility.extend()
// Example: Changing visibility by 20 seconds from now onwards so this message will be re-consumed from the queue after 20 seconds.
visibility.extend(20);
} else {
acknowledgment.acknowledge();
}
// Handle exceptions
}
}
}
确认可用于在成功处理消息后将其从队列中丢弃。
可见性扩展方法不会延长默认可见性超时;它会延长您向 SQS 发送此请求的时间。在内部,此可见性方法调用 sqs.changeMessageVisibilityAsync,它将队列中指定消息的可见性超时更改为新值。
如果发生异常,不需要等到可见性超时;您可以设置更短的时间间隔来再次检索消息。