如何在从 Sftp.InboundStreamingAdapter 处理文件后删除文件

问题描述 投票:0回答:1

我目前正在处理来自远程 SFTP 服务器的 .csv 文件,转换它们并将它们放置在 Kafka 主题上。我知道我们可以在使用

Sftp.inboundAdapter
处理文件后将其删除。我很好奇是否有办法可以通过
Sftp.InboundStreamingAdapter
获得此功能?是否需要添加另一个流程,或者有什么方法可以使用提供的属性删除?

如果需要,我可以提供整个流程的更多代码。 谢谢

    @Bean
    public IntegrationFlow sftpFileTransferFlow(SessionFactory<SftpClient.DirEntry> engageSftpSessionFactory,
                                                IntegrationFlowProperties engageProperties,
                                                MessageChannel inboundFilesMessageChannel) {

        return IntegrationFlow
                .from(Sftp.inboundStreamingAdapter(new RemoteFileTemplate<>(engageSftpSessionFactory))
                          .filter(new SftpRegexPatternFileListFilter(engageProperties.getRemoteFilePattern()))
                          .remoteDirectory(engageProperties.getRemoteDirectory()),
                      e -> e.id("sftpInboundAdapter")
                            .autoStartup(true)
                            .poller(Pollers.fixedRate(5000)))
                .log(LoggingHandler.Level.DEBUG, "AcousticEngageDataSftpToKafkaIntegrationFlow",
                     "headers['file_remoteDirectory'] + + T(java.io.File).separator  + headers['file_remoteFile']")
                .channel(inboundFilesMessageChannel)
                .get();
    }
spring-integration spring-dsl
1个回答
0
投票

AbstractInboundFileSynchronizingMessageSource
分两个阶段工作:将远程文件复制到本地目录,然后为这些本地文件发出消息。
deleteRemoteFiles
选项用于从远程复制到本地阶段。这就是我们不再需要远程文件的地方,我们只处理下游的本地副本。

流媒体源会为远程文件打开一个

InputStream
,并保持这种状态,直到您手动将其关闭。这就是为什么我们没有明确的选项来删除远程文件。

您可以使用

SftpOutboundGateway
rm
命令执行远程文件删除。当您将其设置为
inboundFilesMessageChannel
并且您已从消息标头中关闭该
PublishSubscribeChannel
时,此网关可以作为该
IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
的第二个订阅者。

但是我认为我们可以在

removeFileOnClose
上提出类似
AbstractRemoteFileStreamingMessageSource
的选项,并在
rm
关闭时执行
IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE

© www.soinside.com 2019 - 2024. All rights reserved.