transactionSynchronizationFactory 与数据库事务结合不起作用

问题描述 投票:0回答:1

我希望我的代码在远程目录中创建文件后能够基于轮询器接收文件,然后执行数据库插入,然后将文件移动到另一个远程目录。

到目前为止,文件已被轮询并且数据库插入成功,但远程文件已从源目录中删除,并且未移动到远程目录。我无法弄清楚这里出了什么问题。

Spring启动版本:3.2.1 数据库:Postgres

我的Java配置:

  @Bean
  public DefaultFtpsSessionFactory ftpsSessionFactory() {
    DefaultFtpsSessionFactory sessionFactory = new DefaultFtpsSessionFactory();
    sessionFactory.setHost(ftpHost);
    sessionFactory.setUsername(ftpUsername);
    sessionFactory.setPassword(ftpPassword);
    sessionFactory.setPort(ftpPort);
    sessionFactory.setControlEncoding(StandardCharsets.UTF_8.name());
    sessionFactory.setImplicit(true);
    sessionFactory.setProtocol("TLS");
    sessionFactory.setClientMode(FTPSClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
    FTPClientConfig conf = new FTPClientConfig(FTPClientConfig.SYST_UNIX);
    sessionFactory.setConfig(conf);
    return sessionFactory;
  }

  @Bean
  public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
    FtpInboundFileSynchronizer fileSynchronizer =
        new FtpInboundFileSynchronizer(ftpsSessionFactory());

    fileSynchronizer.setDeleteRemoteFiles(true);
    fileSynchronizer.setRemoteDirectory(ftpRemoteDirectoryDownload);
    fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.out"));
    return fileSynchronizer;
  }

  @Bean
  @InboundChannelAdapter(channel = "ftpFileDownloadInboundChannel",
      poller = @Poller(value = "pollerMetadata"))
  public MessageSource<File> ftpMessageSource() {
    FtpInboundFileSynchronizingMessageSource source =
        new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
    source.setLocalDirectory(new File("./downloads/"));
    source.setAutoCreateLocalDirectory(true);
    source.setLocalFilter(new AcceptOnceFileListFilter<>());
    source.setMaxFetchSize(Integer.MIN_VALUE);
    return source;
  }

  @Bean
  public PollerMetadata pollerMetadata() {
    return Pollers.fixedDelay(ftpPollerDelay)
        .advice(transactionInterceptor())
        .transactionSynchronizationFactory(transactionSynchronizationFactory())
        .getObject();
  }

  @Bean
  public JpaTransactionManager transactionManager() {
    return new JpaTransactionManager();
  }

  @Bean
  public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder()
        .transactionManager(transactionManager())
        .build();
  }

  public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionEvaluatingTransactionSynchronizationProcessor processor =
        new ExpressionEvaluatingTransactionSynchronizationProcessor();
    //move file to archive directory after successful db save transaction
    SpelExpressionParser spelParser = new SpelExpressionParser();
    processor.setAfterCommitExpression(
        spelParser.parseExpression("payload.renameTo(new java.io.File"
            + "(#systemProperties.get('ftp.directory.archive' )  + "
            + "T(java.io.File).separator + payload.name))"));
    return new DefaultTransactionSynchronizationFactory(processor);
  }

  @Bean
  @ServiceActivator(inputChannel = "ftpFileDownloadInboundChannel")
  public MessageHandler ftpInboundMessageHandler() {
    return message -> {
      Object payload = message.getPayload();
      if (payload instanceof File file) {
        log.info("Received fpim po update file {}", file.getName());
        // store PO details in database
        PurchaseOrder purchaseOrder =
            PurchaseOrderMapper.INSTANCE.purchaseOrderNotificationToPurchaseOrderEntity(file);
        purchaseOrderRepository.saveAndFlush(purchaseOrder);
        log.info("Purchase order details saved in db", purchaseOrder);
      } else {
        log.error("Received invalid message payload {}", message.getPayload());
      }
    };
  }

java spring-boot ftps spring-integration-sftp
1个回答
0
投票

我通过更改拦截器 bean 名称解决了这个问题

  @Bean
  public TransactionInterceptor jpaTransactionInterceptor() {
    return new TransactionInterceptorBuilder()
        .transactionManager(transactionManager)
        .build();
  }

 @Bean
  public PollerMetadata pollerMetadata() {

    return Pollers.fixedDelay(ftpPollerDelay)
        .advice(jpaTransactionInterceptor())
        .transactionSynchronizationFactory(transactionSynchronizationFactory())
        .getObject();
  }

并向ExpressionEvaluatingTransactionSynchronizationProcessor提供beanFactory,以避免在TransactionSynchronizationFactory bean创建期间出现关于没有bean工厂的警告消息。

  @Bean
  public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionEvaluatingTransactionSynchronizationProcessor processor =
        new ExpressionEvaluatingTransactionSynchronizationProcessor();
    processor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
    //move file to archive directory after successful db save transaction
    SpelExpressionParser spelParser = new SpelExpressionParser();
    processor.setAfterCommitExpression(
        spelParser.parseExpression("T(java.nio.file.Files).move(payload.toPath(), " +
            "T(java.nio.file.Path).of(@environment.getProperty('ftp.directory.archive'), payload.name))"));
    return new DefaultTransactionSynchronizationFactory(processor);
  }
© www.soinside.com 2019 - 2024. All rights reserved.