Spring 将多个事务轮询器与单独的线程池集成

问题描述 投票:0回答:1

我正在使用 Spring Integration 实现一个处理消息/事件的项目,具有以下要求:进入系统的入站消息不应丢失,即使在应用程序崩溃的情况下也是如此。当处理成功完成或失败时,应将其记录下来以供审核。即使在应用程序崩溃的情况下,这些日志也永远不会丢失。它也应该具有合理的性能。

我按以下方式设置我的项目 - 我使用由 PostgreSQL 消息存储支持的三个队列通道。主队列用于存储传入消息,其事务轮询器将启动消息流。使用 RetryAdvice 在流程本身内部处理和重试瞬态错误,因此,如果流程在任何时候抛出异常,则不应回滚事务,而应将消息发送到同样持久的错误通道 Queue。错误通道和成功通道队列也以相同的方式设置事务轮询器。 另外,为了提高性能,每个事务轮询器都设置了固定的线程池。

此设置的问题是当应用程序运行时数据库连接超时:

Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30001ms.
    at ...

轮询器的配置如下:

Pollers.fixedDelay(50).maxMessagesPerPoll(-1)
            .advice(
                TransactionInterceptorBuilder()
                    .transactionManager(transactionManager)
                    .isolation(Isolation.READ_COMMITTED)
                    .propagation(Propagation.REQUIRED)
                    .transactionAttribute(object : DefaultTransactionAttribute() {
                        override fun rollbackOn(ex: Throwable): Boolean {
                            return false
                        }
                    })
                    .build()
            )
            .taskExecutor(Executors.newFixedThreadPool(10))

我尝试使用fixedRate和maxMessagesPerPoll,但这似乎并不能解决问题。当我删除任务执行器时,连接问题消失了,但至少可以说性能并不好。 我怀疑性能问题也与轮询器用来获取阻塞其他线程直到流程完成的新消息的

SELECT FOR UPDATE
查询有关。将轮询器配置为使用类似
SKIP LOCKED
作为解决方法是否可行?

java postgresql spring-integration
1个回答
0
投票

fixedDelay(50)
速度太快,无法与数据库交互。既然你已经
maxMessagesPerPoll(-1)
,就没有理由如此频繁地查看数据库。这么快的速度你真的会耗尽你的连接池,因此你会得到错误。

使用

maxMessagesPerPoll(-1)
PollingConsumer
的默认设置),所有记录均通过一个连接获取。

如果

SKIP LOCKED
适合您,那么确实最好使用它,因为所有正在处理的记录都不会被获取,并且没有事务会等待其
FOR UPDATE
被解锁。

您还可以重新考虑架构,不要针对同一个数据库使用如此多的

QeueueChannel
。您可能只有其中之一,并在一个事务和同一线程中处理数据。
maxMessagesPerPoll
可能是连接池的大小。

TaskExecutor
用于单个轮询周期,所有
maxMessagesPerPoll
都加载在一个线程中,但由于您使用事务,因此可能是一个新连接用于来自池的每条消息。

© www.soinside.com 2019 - 2024. All rights reserved.