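I want my code to pick up a file through a poller once it has been created in a remote directory, then insert its contents into the database, and then move the file to another remote directory.
So far the file is polled and the database insert succeeds, but the remote file is deleted from the source directory and is not moved to the other remote directory. I cannot figure out what is going wrong here.
Spring Boot version: 3.2.1. Database: Postgres.
My Java configuration: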
@Bean
public DefaultFtpsSessionFactory ftpsSessionFactory() {
    DefaultFtpsSessionFactory sessionFactory = new DefaultFtpsSessionFactory();
    sessionFactory.setHost(ftpHost);
    sessionFactory.setUsername(ftpUsername);
    sessionFactory.setPassword(ftpPassword);
    sessionFactory.setPort(ftpPort);
    sessionFactory.setControlEncoding(StandardCharsets.UTF_8.name());
    sessionFactory.setImplicit(true);
    sessionFactory.setProtocol("TLS");
    sessionFactory.setClientMode(FTPSClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
    FTPClientConfig conf = new FTPClientConfig(FTPClientConfig.SYST_UNIX);
    sessionFactory.setConfig(conf);
    return sessionFactory;
}

@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
    FtpInboundFileSynchronizer fileSynchronizer =
            new FtpInboundFileSynchronizer(ftpsSessionFactory());
    fileSynchronizer.setDeleteRemoteFiles(true);
    fileSynchronizer.setRemoteDirectory(ftpRemoteDirectoryDownload);
    fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.out"));
    return fileSynchronizer;
}

@Bean
@InboundChannelAdapter(channel = "ftpFileDownloadInboundChannel",
        poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> ftpMessageSource() {
    FtpInboundFileSynchronizingMessageSource source =
            new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
    source.setLocalDirectory(new File("./downloads/"));
    source.setAutoCreateLocalDirectory(true);
    source.setLocalFilter(new AcceptOnceFileListFilter<>());
    source.setMaxFetchSize(Integer.MIN_VALUE);
    return source;
}

@Bean
public PollerMetadata pollerMetadata() {
    return Pollers.fixedDelay(ftpPollerDelay)
            .advice(transactionInterceptor())
            .transactionSynchronizationFactory(transactionSynchronizationFactory())
            .getObject();
}

@Bean
public JpaTransactionManager transactionManager() {
    return new JpaTransactionManager();
}

@Bean
public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder()
            .transactionManager(transactionManager())
            .build();
}

public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionEvaluatingTransactionSynchronizationProcessor processor =
            new ExpressionEvaluatingTransactionSynchronizationProcessor();
    // move file to archive directory after successful db save transaction
    SpelExpressionParser spelParser = new SpelExpressionParser();
    processor.setAfterCommitExpression(
            spelParser.parseExpression("payload.renameTo(new java.io.File"
                    + "(#systemProperties.get('ftp.directory.archive' ) + "
                    + "T(java.io.File).separator + payload.name))"));
    return new DefaultTransactionSynchronizationFactory(processor);
}

@Bean
@ServiceActivator(inputChannel = "ftpFileDownloadInboundChannel")
public MessageHandler ftpInboundMessageHandler() {
    return message -> {
        Object payload = message.getPayload();
        if (payload instanceof File file) {
            log.info("Received fpim po update file {}", file.getName());
            // store PO details in database
            PurchaseOrder purchaseOrder =
                    PurchaseOrderMapper.INSTANCE.purchaseOrderNotificationToPurchaseOrderEntity(file);
            purchaseOrderRepository.saveAndFlush(purchaseOrder);
            // the {} placeholder is needed for the entity to appear in the log line
            log.info("Purchase order details saved in db: {}", purchaseOrder);
        } else {
            log.error("Received invalid message payload {}", message.getPayload());
        }
    };
}
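
A note on the afterCommitExpression in the configuration above: File.renameTo reports failure only through its boolean return value, and nothing in that expression checks it, so a failed move can pass without any trace in the logs. The following is a minimal, JDK-only sketch of that behaviour, not the code from the question. The class name MoveFailureSketch, the file name order.out and the missing-archive directory are made up for illustration. It contrasts renameTo with java.nio.file.Files.move, which raises an IOException when the move cannot be done.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class MoveFailureSketch {

    /** Tries File.renameTo into a directory that does not exist and returns its result. */
    static boolean renameIntoMissingDirectory() throws IOException {
        Path workDir = Files.createTempDirectory("rename-sketch");
        File source = Files.createFile(workDir.resolve("order.out")).toFile();
        // The parent directory "missing-archive" is deliberately never created.
        File target = workDir.resolve("missing-archive").resolve("order.out").toFile();
        return source.renameTo(target);
    }

    /** Tries Files.move into a directory that does not exist and describes the outcome. */
    static String moveIntoMissingDirectory() throws IOException {
        Path workDir = Files.createTempDirectory("move-sketch");
        Path source = Files.createFile(workDir.resolve("order.out"));
        Path target = workDir.resolve("missing-archive").resolve("order.out");
        try {
            Files.move(source, target);
            return "moved";
        } catch (IOException e) {
            // Unlike renameTo, the failure carries a type and a message.
            return "failed: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("renameTo returned: " + renameIntoMissingDirectory());
        System.out.println("Files.move result: " + moveIntoMissingDirectory());
    }
}
```

In this sketch renameTo simply returns false, while Files.move ends up in the catch block. If the archive directory is missing or not writable, the second form at least produces an exception that can show up in the logs.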
I solved the problem by changing the interceptor bean name:
@Bean
public TransactionInterceptor jpaTransactionInterceptor() {
    return new TransactionInterceptorBuilder()
            .transactionManager(transactionManager)
            .build();
}

@Bean
public PollerMetadata pollerMetadata() {
    return Pollers.fixedDelay(ftpPollerDelay)
            .advice(jpaTransactionInterceptor())
            .transactionSynchronizationFactory(transactionSynchronizationFactory())
            .getObject();
}
I also supplied a beanFactory to the ExpressionEvaluatingTransactionSynchronizationProcessor, to avoid the warning about a missing bean factory that was logged while the TransactionSynchronizationFactory bean was being created.
@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionEvaluatingTransactionSynchronizationProcessor processor =
            new ExpressionEvaluatingTransactionSynchronizationProcessor();
    processor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
    // move file to archive directory after successful db save transaction
    SpelExpressionParser spelParser = new SpelExpressionParser();
    processor.setAfterCommitExpression(
            spelParser.parseExpression("T(java.nio.file.Files).move(payload.toPath(), " +
                    "T(java.nio.file.Path).of(@environment.getProperty('ftp.directory.archive'), payload.name))"));
    return new DefaultTransactionSynchronizationFactory(processor);
}
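
Why the bean factory matters for this expression can be shown outside of Spring Integration. In SpEL, @environment is a bean reference, and a bean reference can only be evaluated when the evaluation context has a BeanResolver. The sketch below assumes spring-expression is on the classpath (it is in a Spring Boot application). The class name BeanReferenceSketch, the Properties object standing in for the real Environment bean, and the /data/archive value are all hypothetical. In the real application the resolver is presumably derived from the BeanFactory handed to the processor, not from a lambda.

```java
import java.util.Properties;

import org.springframework.expression.Expression;
import org.springframework.expression.spel.SpelEvaluationException;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;

public class BeanReferenceSketch {

    // Same bean-reference syntax as in the afterCommit expression above.
    private static final Expression ARCHIVE_DIR = new SpelExpressionParser()
            .parseExpression("@environment.getProperty('ftp.directory.archive')");

    /** Evaluates the expression against a context that has no BeanResolver. */
    static boolean resolvesWithoutResolver() {
        try {
            ARCHIVE_DIR.getValue(new StandardEvaluationContext(), String.class);
            return true;
        } catch (SpelEvaluationException e) {
            // "@environment" cannot be looked up without a resolver.
            return false;
        }
    }

    /** Evaluates the expression with a stand-in resolver that knows one bean. */
    static String resolveWithResolver() {
        // Hypothetical stand-in for the Environment bean; only getProperty is used.
        Properties fakeEnvironment = new Properties();
        fakeEnvironment.setProperty("ftp.directory.archive", "/data/archive");

        StandardEvaluationContext context = new StandardEvaluationContext();
        context.setBeanResolver((ctx, beanName) ->
                "environment".equals(beanName) ? fakeEnvironment : null);
        return ARCHIVE_DIR.getValue(context, String.class);
    }

    public static void main(String[] args) {
        System.out.println("without resolver: " + resolvesWithoutResolver());
        System.out.println("with resolver: " + resolveWithResolver());
    }
}
```

The first call fails because nothing can resolve the bean name, and the second returns the configured directory. That is consistent with the move only working once the processor was given a bean factory.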