我有一个
StandardIntegrationFlow
,它从 SFTP 服务器上的一堆文件夹中读取一些文件,进行过滤,然后处理通过过滤器的文件。我的文件结构是这样的:/home/{userId}/files/...
@Bean
public StandardIntegrationFlow readingFilesAndProcess(
SessionFactory<ChannelSftp.LsEntry> sessionFactory,
PollerMetadata sftpStreamingPoller,
RecursiveSftpRemoteFileTemplate recursiveReceiptsRemoteFileTemplate,
FileeValidator fileValidator) {
return IntegrationFlows.from(
Sftp.inboundStreamingAdapter(recursiveReceiptsRemoteFileTemplate)
.remoteDirectory("/home"),
e -> e.poller(sftpStreamingPoller).autoStartup(false))
.filter(Message.class, fileValidator::isFileValid)
.handle(
message -> {
log.info("Handling message at: {}", LocalDateTime.now());
handlingFile(message);
})
.get();
}
我创建了一个递归 RemoteFileTemplate,我可以成功解析所有用户的所有文件。
@Bean
public RecursiveSftpRemoteFileTemplate recursiveReceiptsRemoteFileTemplate(
SessionFactory<ChannelSftp.LsEntry> sessionFactory,
SftpPersistentAcceptOnceFileListFilter sftpPersistentAcceptOnceFileListFilter) {
final RecursiveSftpRemoteFileTemplate template =
new RecursiveSftpRemoteFileTemplate(
sessionFactory,
1,
file -> sftpPersistentAcceptOnceFileListFilter.accept(file),
List.of("files"));
template.setRemoteDirectoryExpression(new LiteralExpression("/home"));
return template;
}
为了触发集成流程,我正在使用此函数:
@Scheduled(cron = "0 0 0 * * *", zone = "ECT")
public void triggeringFlow(){
log.info("Triggering the IntegrationFlow");
readingFilesAndProcess.start();
}
我的问题是,第一次触发readingFilesAndProcess 流程时会解析所有文件,但第二次、第三次......时它不会执行任何操作。 (检查日志)。
有没有办法在解析所有文件时“停止”流程,以便下次触发流程时它将再次遍历所有文件?
您不需要那种
@Scheduled
方法,也不需要停止或重新启动您的入站通道适配器。只需配置 poller
即可让相应的 cron 表达式每天运行一次。