Spring cloud SQS - 轮询间隔

问题描述 投票:6回答:1

使用Spring云监听AWS SQS队列,如下所示:

@SqsListener(value = "${queue.name}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void queueListener(String message, @Headers Map<String, Object> sqsHeaders) {
    // code
}

Spring配置:

<aws-messaging:annotation-driven-queue-listener
    max-number-of-messages="10" wait-time-out="20" visibility-timeout="3600"
    amazon-sqs="awsSqsClient" />

Aws SqsClient:

@Bean
public com.amazonaws.services.sqs.AmazonSQSAsyncClient awsSqsClient() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    return new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain(), executorService);
}

这很好用。

在SQS客户端中配置了10个线程来处理这些消息,如上所示。这也工作正常,在任何时间点处理最多10条消息。

问题是,我无法弄清楚控制轮询间隔的方法。默认情况下,一旦所有线程都空闲,弹出轮询。

即考虑以下示例

  1. 大约有3条消息被发送到Queue
  2. Spring轮询队列并获得3条消息
  3. 处理每条消息的3条消息大约需要20分钟左右

与此同时,大约有25条消息被送到队列。在完成之前传递的所有3条消息之前,Spring不会轮询队列。从上面的例子来看,只有20分钟之后春季民意调查,尽管有7个线程仍然免费!

知道我们如何控制这种民意调查吗?即如果有任何线程空闲,则应该开始轮询,并且不应该等到所有线程都空闲

amazon-web-services message-queue aws-sdk amazon-sqs spring-cloud
1个回答
0
投票

您的侦听器可以将消息加载到Spring应用程序中,并将其与Acknowledgment和Visibility对象一起提交到另一个线程池(如果您想要同时控制它们)。将消息提交到此线程池后,侦听器可以加载更多数据。您可以通过调整线程池设置来控制并发性。

您的侦听器的方法签名将类似于以下内容:

@SqsListener(value = "${queueName}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(YourCustomPOJO pojo,
                   @Headers Map<String, Object> headers,
                   Acknowledgment acknowledgment,
                   Visibility visibility) throws Exception {
...... Send pojo to worker thread and return

然后,工作线程将确认成功处理

acknowledgment.acknowledge().get();

确保将消息可见性设置为大于最高处理时间的值(使用一些超时来限制执行时间)。

© www.soinside.com 2019 - 2024. All rights reserved.