我正在使用 Spring 集成流程从文件服务器读取文件。我的应用程序将在多个实例上运行。我想确保每个实例仅处理任何文件一次。这是我到目前为止使用的:
SFTP 入站通道适配器(基于轮询读取消息)
带有 JdbcMetadataStore(持久性存储)的 SftpPersistentAcceptOnceFileListFilter
Transformer(将 File 类型的消息转换为 JobLaunchRequest)
@ServiceActivator 使用 JobLaunchingGateway 启动 Spring Batch 作业
Spring 文档建议将事务管理器与 JdbcMetadataStore 一起使用,因为它使用事务性 putIfAbsent 查询。
因此,我在 Java DSL Spring 集成流程中提供 PlatformTransactionManager bean。我还在 Spring Batch 步骤(chunk() 方法)中使用相同的事务管理器。
问题:我收到以下错误“在 JobRepository 中检测到现有事务。请修复此问题并重试”
是否有更好的方法来修复/设计此流程或对此错误进行简单的修复?
它可以与 Redis 元数据存储一起使用,因为它是非事务性的。我的总体问题是,使用 JdbcMetadataStore,当我的多个实例在完全相同的秒进行轮询并最终将相同的文件放入数据库的元数据存储表中时,我会遇到独特的约束冲突。
顺便说一句,我正在使用 Postgres db。
有两种选择。
参见
AbstractJobRepositoryFactoryBean
:
/**
* Flag to determine whether to check for an existing transaction when a JobExecution
* is created. Defaults to true because it is usually a mistake, and leads to problems
* with restartability and also to deadlocks in multi-threaded steps.
* @param validateTransactionState the flag to set
*/
public void setValidateTransactionState(boolean validateTransactionState
(但不确定如何通过注释进行设置)。
您可以将输入通道类型从
DirectChannel
更改为 QueueChannel
或 ExecutorChannel
。这样,文件轮询上的事务将在消息到达之前提交JobLaunchingGateway
。