我正致力于将Spring Integration与AWS SQS队列集成。
当我使用@ServiceActivator
注释的方法抛出异常时,我遇到了一个问题。似乎在这种情况下,消息无论如何都会从队列中删除。我已经在MessageDeletionPolicy
中将ON_SUCCESS
配置为SqsMessageDrivenChannelAdapter
。
我尝试使用@SqsListener
注释做同样的事情,并且不会按预期删除消息。
我在这里创建了一个迷你Spring Boot应用程序来演示这个问题:https://github.com/sdusza1/spring-integration-sqs
请帮忙 :)
您的配置如下:
@Bean
public MessageProducerSupport sqsMessageDrivenChannelAdapter() {
SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(amazonSqs, SQS_QUEUE_NAME);
adapter.setOutputChannel(inboundChannel());
adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
adapter.setVisibilityTimeout(RETRY_NOTIFICATION_AFTER);
return adapter;
}
inboundChannel
是这样的:
@Bean
public QueueChannel inboundChannel() {
return new QueueChannel();
}
所以,这是一个队列,因此async和来自该队列的消息由TaskScheduler
在一个单独的线程上处理,PollerMetadata
根据你的SqsMessageDrivenChannelAdapter
配置轮询这种类型的通道。在这种情况下,消费者中的任何错误也会被抛入该线程,并且不会达到预期的错误处理的@SqsListener
。
这在技术上完全不同于你直接在容器线程上调用的QueueChannel
体验,因此应用了它的错误处理。
或者您需要修改逻辑,以便在单独的线程中处理错误,或者只是在SqsMessageDrivenChannelAdapter
之后不使用@SqsListener
,并让它在qazxswpoi的情况下抛出并处理底层SQS侦听器容器中的错误。