我目前正在处理来自远程 SFTP 服务器的 .csv 文件,转换它们并将它们放置在 Kafka 主题上。我知道我们可以在使用
Sftp.inboundAdapter
处理文件后将其删除。我很好奇是否有办法可以通过 Sftp.InboundStreamingAdapter
获得此功能?是否需要添加另一个流程,或者有什么方法可以使用提供的属性删除?
如果需要,我可以提供整个流程的更多代码。 谢谢
@Bean
public IntegrationFlow sftpFileTransferFlow(SessionFactory<SftpClient.DirEntry> engageSftpSessionFactory,
IntegrationFlowProperties engageProperties,
MessageChannel inboundFilesMessageChannel) {
return IntegrationFlow
.from(Sftp.inboundStreamingAdapter(new RemoteFileTemplate<>(engageSftpSessionFactory))
.filter(new SftpRegexPatternFileListFilter(engageProperties.getRemoteFilePattern()))
.remoteDirectory(engageProperties.getRemoteDirectory()),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedRate(5000)))
.log(LoggingHandler.Level.DEBUG, "AcousticEngageDataSftpToKafkaIntegrationFlow",
"headers['file_remoteDirectory'] + + T(java.io.File).separator + headers['file_remoteFile']")
.channel(inboundFilesMessageChannel)
.get();
}
AbstractInboundFileSynchronizingMessageSource
分两个阶段工作:将远程文件复制到本地目录,然后为这些本地文件发出消息。 deleteRemoteFiles
选项用于从远程复制到本地阶段。这就是我们不再需要远程文件的地方,我们只处理下游的本地副本。
流媒体源会为远程文件打开一个
InputStream
,并保持这种状态,直到您手动将其关闭。这就是为什么我们没有明确的选项来删除远程文件。
您可以使用
SftpOutboundGateway
和 rm
命令执行远程文件删除。当您将其设置为 inboundFilesMessageChannel
并且您已从消息标头中关闭该 PublishSubscribeChannel
时,此网关可以作为该 IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
的第二个订阅者。
但是我认为我们可以在
removeFileOnClose
上提出类似 AbstractRemoteFileStreamingMessageSource
的选项,并在 rm
关闭时执行 IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
。