AWS 集成 spring:延长可见性超时

问题描述 投票:0回答:3

是否可以延长正在传输的消息的可见时间。

参见:

http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html

部分:更改消息的可见性超时。

http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/sqs/AmazonSQSClient.html#changeMessageVisibility-com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest-

总之,我希望能够延长正在运行的给定消息的第一组可见性超时。

例如,如果 15 秒过去了,我想将超时再延长 20 秒。上面的 java 文档中有更好的示例。

根据我对上面链接的理解,您可以在亚马逊方面执行此操作。

以下是我当前的设置;

  SqsMessageDrivenChannelAdapter adapter =
  new SqsMessageDrivenChannelAdapter(queue);
  adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
  adapter.setMaxNumberOfMessages(1);
  adapter.setSendTimeout(2000);
  adapter.setVisibilityTimeout(200);
  adapter.setWaitTimeOut(20);

是否可以延长此超时时间?

spring spring-integration aws-sdk spring-cloud
3个回答
6
投票

Spring Cloud AWS 从 2.0 版本开始支持此功能。在 SQS 侦听器方法中注入

Visiblity
参数即可解决问题:

  @SqsListener(value = "my-sqs-queue")
  void onMessageReceived(@Payload String payload, Visibility visibility) {
    ...
    var extension = visibility.extend(20);
    ...
  }

注意,

extend
将异步工作并返回一个Future。因此,如果您想进一步确定处理过程,消息的可见性确实在 AWS 方面得到了扩展,可以使用
extension.get()
阻止 Future 或使用
extension.isDone()

查询 Future

5
投票

好的。看来我明白你的意思了。

我们可以使用 API 更改特定消息的可见性:

AmazonSQS.changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout)

为此,在下游流程中,您必须访问(注入)

AmazonSQS
bean 并从
Message
中提取特殊标头:

@Autowired
AmazonSQS amazonSqs;

@Autowired
ResourceIdResolver resourceIdResolver;
...


MessageHeaders headers = message.getHeaders();

DestinationResolver destinationResolver = new DynamicQueueUrlDestinationResolver(this.amazonSqs, this.resourceIdResolver);

String queueUrl = destinationResolver.resolveDestination(headers.get(AwsHeaders.QUEUE));

String receiptHandle = headers.get(AwsHeaders.RECEIPT_HANDLE);

amazonSqs.changeMessageVisibility(queueUrl, receiptHandle, YOUR_DESIRED_VISIBILITY_TIMEOUT);

但是,呃,我同意我们应该在这个问题上提供一些开箱即用的功能。这甚至可能类似于

QueueMessageAcknowledgment
作为新标题。或者甚至只是除此之外的另一种
changeMessageVisibility
方法。

请就此事向 Spring Cloud AWS 项目提出 GH 问题,并链接到此 SO 主题。


0
投票

TLDR:是的,您可以使用 SQS 侦听器方法中的 Visibility 参数更改 Spring Cloud AWS 中 SQS 消息的可见性超时。 可见性扩展方法不会延长默认的可见性超时;它会延长您向 SQS 发送可见性请求的时间。在内部,此可见性方法调用 sqs.changeMessageVisibilityAsync,它将队列中指定消息的可见性超时更改为新值。

示例: 如果您的默认超时为 300 秒,并且您的消息由于 100 秒内的某些暂时性错误而失败,并且您想重试该消息,则可以使用extend(10)。这意味着您的听众将在接下来的 10 秒内再次阅读该消息,而不是达到 300 秒。 从消息侦听器中调用点起,可见性超时会更改 10 秒。

import org.springframework.cloud.aws.messaging.listener.Visibility;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;

@Component
public class SqsMessageListener {

    @SqsListener(value = "my-sqs-queue")
    public void onMessageReceived(@Payload String payload, Acknowledgment acknowledgment, Visibility visibility) {
        // Your message processing logic here
        try {
            // If processing succeeds, acknowledge the message
            processMessage(payload);
            // Acknowledge the message
            acknowledgment.acknowledge();
        } catch (Exception e) {
            if (e instanceof CustomTransientException) {
                // If you need to change visibility, call visibility.extend()
                // Example: Changing visibility by 20 seconds from now onwards so this message will be re-consumed from the queue after 20 seconds.
                visibility.extend(20);
            } else {
                acknowledgment.acknowledge();
            }
            // Handle exceptions
        }
    }
}
  • 确认可用于在成功处理消息后将其从队列中丢弃。

  • 可见性扩展方法不会延长默认可见性超时;它会延长您向 SQS 发送此请求的时间。在内部,此可见性方法调用 sqs.changeMessageVisibilityAsync,它将队列中指定消息的可见性超时更改为新值。

  • 如果发生异常,不需要等到可见性超时;您可以设置更短的时间间隔来再次检索消息。

© www.soinside.com 2019 - 2024. All rights reserved.